diff --git a/cds_migrator_kit/rdm/cli.py b/cds_migrator_kit/rdm/cli.py index bcdf811d..1ab8c02a 100644 --- a/cds_migrator_kit/rdm/cli.py +++ b/cds_migrator_kit/rdm/cli.py @@ -22,6 +22,7 @@ CommentsStreamDefinition, ) from cds_migrator_kit.rdm.records.streams import ( # UserStreamDefinition, + RecordEPApprovalStreamDefinition, RecordStreamDefinition, ) from cds_migrator_kit.rdm.stats.runner import RecordStatsRunner @@ -69,12 +70,20 @@ def migration(): "Can also be set per-collection in streams.yaml under transform.workers." ), ) +@click.option( + "--ep-approval", + is_flag=True, + help="Use the EP approval load stream (pre-EP draft snapshots without legacy minting).", +) @with_appcontext -def run(collection, dry_run=False, keep_logs=False, workers=None): +def run(collection, dry_run=False, keep_logs=False, workers=None, ep_approval=False): """Run.""" stream_config = current_app.config["CDS_MIGRATOR_KIT_STREAM_CONFIG"] + stream_definition = ( + RecordEPApprovalStreamDefinition if ep_approval else RecordStreamDefinition + ) runner = Runner( - stream_definitions=[RecordStreamDefinition], + stream_definitions=[stream_definition], # stream_definitions=[UserStreamDefinition], config_filepath=Path(stream_config).absolute(), dry_run=dry_run, diff --git a/cds_migrator_kit/rdm/migration_config.py b/cds_migrator_kit/rdm/migration_config.py index ed243d76..f1e1c50b 100644 --- a/cds_migrator_kit/rdm/migration_config.py +++ b/cds_migrator_kit/rdm/migration_config.py @@ -359,6 +359,11 @@ def _(x): # needed to avoid start time failure with lazy strings "validator": always_valid, "datacite": "CDS", }, + "apprn": { + "label": _("Approval Report Number"), + "validator": schemes.is_approval_report_number, + "datacite": "CDS", + }, "aleph": { "label": _("Aleph number"), "validator": schemes.is_aleph, @@ -518,3 +523,19 @@ def resolve_record_pid(pid): # don't generate logs for migration AUDIT_LOGS_ENABLED = False + +### EP Approval configuration +# =========================== 
+CDS_CERN_SCIENTIFIC_COMMUNITY_ID = "8c42db1e-1230-42f0-9d99-5880bbee5326" +"""The id of the CERN Scientific community.""" + +CDS_EP_APPROVAL_COMMUNITIES = { + # Map community UUID → workflow config. + # UUIDs are used (not slugs) because slugs can be renamed. + # + "8dfea666-5758-4614-bbc1-56209565c78a": { + "label": "EP approval", # shown in UI buttons/headings + "referee_group": "cds-ph-ep-publication", # CERN e-group slug + "report_number_pattern": "CERN-EP-{year}-{seq:03d}", + }, +} diff --git a/cds_migrator_kit/rdm/records/load/__init__.py b/cds_migrator_kit/rdm/records/load/__init__.py index 38f8fc5e..4ce3126c 100644 --- a/cds_migrator_kit/rdm/records/load/__init__.py +++ b/cds_migrator_kit/rdm/records/load/__init__.py @@ -7,6 +7,7 @@ """CDS-RDM Migration load package.""" +from .ep_approval_load import CDSEPApprovalRecordServiceLoad from .load import CDSRecordServiceLoad -__all__ = ("CDSRecordServiceLoad",) +__all__ = ("CDSEPApprovalRecordServiceLoad", "CDSRecordServiceLoad") diff --git a/cds_migrator_kit/rdm/records/load/ep_approval_load.py b/cds_migrator_kit/rdm/records/load/ep_approval_load.py new file mode 100644 index 00000000..46a49fe8 --- /dev/null +++ b/cds_migrator_kit/rdm/records/load/ep_approval_load.py @@ -0,0 +1,766 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2026 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. 
+ +"""CDS-RDM migration load module for records with EP approval.""" +import re +from collections import OrderedDict +from copy import deepcopy +from datetime import datetime, timezone +from pathlib import Path + +from cds_rdm.requests.ep_approval import APPRN_PID_TYPE, EPApprovalRequest +from flask import current_app +from invenio_access.permissions import system_identity +from invenio_accounts.models import User +from invenio_db import db +from invenio_db.uow import UnitOfWork +from invenio_drafts_resources.services.records.uow import ParentRecordCommitOp +from invenio_pidstore.errors import PIDAlreadyExists +from invenio_pidstore.models import PersistentIdentifier, PIDStatus +from invenio_rdm_records.proxies import current_rdm_records_service +from invenio_rdm_records.records.api import RDMParent +from invenio_records_resources.services.uow import RecordCommitOp +from invenio_requests.customizations.event_types import LogEventType +from invenio_requests.proxies import current_events_service, current_requests_service +from invenio_requests.resolvers.registry import ResolverRegistry + +from cds_migrator_kit.errors import ManualImportRequired, UnexpectedValue +from cds_migrator_kit.rdm.migration_config import CDS_CERN_SCIENTIFIC_COMMUNITY_ID + +from .load import CDSRecordServiceLoad + +EPPHAPP_FILE_TYPE = "EPPHAPP_FILE" +EP_APPROVAL_WAITING_STATUS = "waiting" +EP_APPROVAL_APPROVED_STATUS = "approved" +EP_APPROVAL_REPORT_NUMBER_PREFIX = "CERN-EP" +EP_APPROVAL_REPORT_NUMBER_RE = re.compile(r"^CERN-EP-\d{4}-\d{3}$") + + +class CDSEPApprovalRecordServiceLoad(CDSRecordServiceLoad): + """Load records with EP approval. 
+ + Splits a legacy record into two RDM records before load: + - a public record with non-EPPHAPP files + - a restricted record with restricted EPPHAPP files + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.ep_approval_metadata = { + "title": None, + "experiment": None, + "resource_type": None, + "report_number": None, + } + self._ep_approval_parsed = None + self._load_flags = self._public_load_flags() + + def _public_load_flags(self): + """Load flags for the public migrated record.""" + return { + "mint_pids": True, + "mint_legacy_recid": True, + "save_original_dump": True, + "clc_sync": True, + "record_state": True, + } + + def _restricted_load_flags(self): + """Load flags for the restricted EPPHAPP snapshot record.""" + return { + "mint_pids": False, + "mint_legacy_recid": False, + "save_original_dump": False, + "clc_sync": False, + "record_state": False, + } + + def _should_log_record_state(self): + return self._load_flags["record_state"] + + def _after_publish_mint_recid(self, record, entry, version): + if self._load_flags["mint_legacy_recid"]: + super()._after_publish_mint_recid(record, entry, version) + + def _after_publish_update_dois(self, identity, record, entry, uow): + if self._load_flags["mint_pids"]: + return super()._after_publish_update_dois(identity, record, entry, uow) + return None + + def _assign_rep_numbers(self, draft): + if self._load_flags["mint_pids"]: + super()._assign_rep_numbers(draft) + + def _save_original_dumped_record(self, entry, recid_state): + if self._load_flags["save_original_dump"]: + super()._save_original_dumped_record(entry, recid_state) + + def _after_load_clc_sync(self, record_state): + if self._load_flags["clc_sync"]: + super()._after_load_clc_sync(record_state) + + def _load(self, entry): + """ + Load the record with EP approval. 
+ Configure the 2 records by separating the files and call the load method for each one + After creation completed of these 2 record, create EP approval request with metadata from the entry + Connect 2 records with related_identifiers and EP approval request + """ + if not entry: + return + try: + recid = entry.get("record", {}).get("recid") + ep_approval = entry.get("record", {}).get("ep_approval") + if not ep_approval: + raise UnexpectedValue( + message="EP approval request not found", + stage="load", + recid=recid, + priority="critical", + ) + record_json = entry.get("record", {}).get("json", {}) + metadata = record_json.get("metadata", {}) + + # Set the EP approval metadata + self.ep_approval_metadata["resource_type"] = metadata.get("resource_type") + self.ep_approval_metadata["title"] = metadata.get("title") + + # TODO: What if there are multiple experiments? + experiments = record_json.get("custom_fields", {}).get( + "cern:experiments", [] + ) + self.ep_approval_metadata["experiment"] = ( + experiments[0]["id"] if experiments else None + ) + + if not self.ep_approval_metadata["experiment"]: + raise UnexpectedValue( + message="Record is missing experiment it's required for EP approval", + stage="load", + recid=recid, + priority="critical", + ) + + # Validate the EP approval data + self._validate_ep_approval(ep_approval, recid) + + # Split the metadata and files + public_entry = self._split_entry(entry, include_epphapp=False) + restricted_entry = self._split_entry(entry, include_epphapp=True) + + # Load the records + restricted_state = self._load_split_record( + restricted_entry, self._restricted_load_flags(), finalise=False + ) + public_state = self._load_split_record( + public_entry, + self._public_load_flags(), + finalise=True, + ) + + # Load the EP approval request + self._load_ep_approval(restricted_state, public_state, legacy_recid=recid) + except (UnexpectedValue, ManualImportRequired) as e: + self.migration_logger.add_log(e, record=entry) + except 
Exception as e: + exc = ManualImportRequired( + message=str(e), + field="validation", + stage="load", + recid=recid, + priority="warning", + ) + self.migration_logger.add_log(exc, record=entry) + + def _load_split_record(self, entry, load_flags, finalise): + """Load the record.""" + self._load_flags = load_flags + self._finalise_on_load = finalise + return super()._load(entry) + + def _validate_ep_approval(self, ep_approval, legacy_recid): + """Validate EP approval data before creating any records.""" + waiting_entry, approved_entry, report_number = self._parse_ep_approval_history( + ep_approval + ) + + existing = self._existing_ep_approval_request(legacy_recid) + if existing: + raise ManualImportRequired( + message=f"EP approval request {existing['id']} already exists", + stage="load", + priority="critical", + ) + if self._exists_apprn_pid(report_number): + raise ManualImportRequired( + message=f"APPRN PID {report_number} already exists", + stage="load", + priority="critical", + ) + + self._ep_approval_parsed = (waiting_entry, approved_entry, report_number) + + def _load_ep_approval(self, restricted_state, public_state, legacy_recid): + """Create EP approval request and link public/restricted records.""" + waiting_entry, approved_entry, report_number = self._ep_approval_parsed + + publication_title = self.ep_approval_metadata["title"] + approval_datetime = self._parse_legacy_datetime(approved_entry.get("date")) + approval_iso = approval_datetime.isoformat() + + if not self.dry_run: + if not restricted_state or not public_state: + raise UnexpectedValue( + message="Both public and restricted records are required for EP approval.", + stage="load", + recid=legacy_recid, + priority="critical", + ) + restricted_recid = restricted_state["latest_version"] + public_recid = public_state["latest_version"] + restricted_parent = RDMParent.get_record( + restricted_state["parent_object_uuid"] + ) + public_parent = RDMParent.get_record(public_state["parent_object_uuid"]) + # 
def _parse_ep_approval_history(self, ep_approval):
    """Return the waiting/approved history entries and the report number.

    Also stores the report number in ``self.ep_approval_metadata`` and
    checks that both the submitter and the approver resolve to users.

    :param ep_approval: legacy approval history, expected to hold exactly
        one ``waiting`` and one ``approved`` entry.
    :returns: tuple ``(waiting_entry, approved_entry, report_number)``.
    :raises UnexpectedValue: if the history is missing or malformed, the
        report numbers disagree, a user cannot be resolved, a timestamp is
        missing, or the approval happened after the deadline.
    """
    # Normalise first, so a missing history is reported as an
    # ``UnexpectedValue`` instead of a ``TypeError`` from ``len(None)``.
    history = ep_approval or []
    if len(history) != 2:
        raise UnexpectedValue(
            message="EP approval history has more/less than 2 entries",
            stage="load",
            priority="critical",
        )

    def _first_with_status(status):
        """Return the first history entry with the given status, if any."""
        return next(
            (item for item in history if item.get("status") == status),
            None,
        )

    waiting = _first_with_status(EP_APPROVAL_WAITING_STATUS)
    approved = _first_with_status(EP_APPROVAL_APPROVED_STATUS)
    if not waiting:
        raise UnexpectedValue(
            message="EP approval history has no waiting entry",
            stage="load",
            priority="critical",
        )
    if not approved:
        raise UnexpectedValue(
            message="EP approval history has no approved entry",
            stage="load",
            priority="critical",
        )

    report_number = approved.get("ep_report_number")
    if not report_number:
        raise UnexpectedValue(
            message="EP approval approved entry is missing ep_report_number",
            stage="load",
            priority="critical",
        )
    if waiting.get("ep_report_number") != report_number:
        raise UnexpectedValue(
            message="EP approval waiting entry has different ep_report_number than approved entry",
            stage="load",
            priority="critical",
        )
    self.ep_approval_metadata["report_number"] = report_number

    # Check that the submitter and the approver exist (raises otherwise).
    self._resolve_user_by_email(waiting.get("submitted_by"), "submitter")
    self._resolve_user_by_email(approved.get("submitted_by"), "approver")

    # All three timestamps are required before comparing them.
    waiting_deadline = self._parse_legacy_datetime(waiting.get("deadline"))
    approved_date = self._parse_legacy_datetime(approved.get("date"))
    created_at = self._parse_legacy_datetime(waiting.get("date"))
    if not created_at or not approved_date or not waiting_deadline:
        raise UnexpectedValue(
            message="EP approval history has missing timestamps",
            stage="load",
            priority="critical",
        )
    # Both values are known to be set here, so compare them directly.
    if approved_date > waiting_deadline:
        raise UnexpectedValue(
            message="Record approved after the deadline",
            stage="load",
            priority="critical",
        )
    return waiting, approved, report_number
def _mint_apprn_pid(self, report_number, restricted_version_uuid):
    """Mint the APPRN PID for the approved report number.

    Creates a ``REGISTERED`` persistent identifier of type
    ``APPRN_PID_TYPE`` whose value is the report number and whose object is
    the given record version.

    :param report_number: EP approval report number used as the PID value.
    :param restricted_version_uuid: UUID of the restricted record version
        the PID is attached to.
    :raises ManualImportRequired: if a PID with this value already exists.
    """
    try:
        PersistentIdentifier.create(
            pid_type=APPRN_PID_TYPE,
            pid_value=report_number,
            object_type="rec",
            object_uuid=str(restricted_version_uuid),
            status=PIDStatus.REGISTERED,
        )
    except PIDAlreadyExists as exc:
        # Chain the original error so the traceback keeps the real cause.
        raise ManualImportRequired(
            message=f"APPRN PID {report_number} already exists",
            stage="load",
            priority="critical",
        ) from exc
def _create_ep_approval_request(
    self,
    legacy_recid,
    restricted_recid,
    restricted_parent,
    waiting_entry,
    approved_entry,
    report_number,
    publication_title,
):
    """Create the request from the waiting entry, then mark it as accepted.

    The request is created through the requests service with the legacy
    submitter as creator, the referee group as receiver and the restricted
    record as topic. Its number, status and creation date are then
    overwritten from the legacy data, the approved entry is applied, and
    everything is committed in a single unit of work.

    :param legacy_recid: legacy record id, used to build the request number.
    :param restricted_recid: id of the restricted record (request topic).
    :param restricted_parent: parent of the restricted record, used to look
        up the referee group.
    :param waiting_entry: legacy history entry with status ``waiting``.
    :param approved_entry: legacy history entry with status ``approved``.
    :param report_number: approved EP report number.
    :param publication_title: title used in the request title and payload.
    """
    # The legacy deadline becomes the expiration date of the request.
    expires_at = self._parse_legacy_datetime(waiting_entry.get("deadline"))

    referee_group = self._get_ep_referee_group(restricted_parent)
    submitted_payload = {
        "experiment": self.ep_approval_metadata["experiment"],
        "submitted_by": waiting_entry.get("submitted_by", "unknown"),
        "role": "Submitter",
        "publication_title": publication_title,
        "submitted_version_id": restricted_recid,
    }

    with UnitOfWork() as uow:
        request_item = current_requests_service.create(
            system_identity,
            data={
                "title": f'EP approval for "{publication_title}"',
                "payload": submitted_payload,
            },
            request_type=EPApprovalRequest,
            receiver={"group": referee_group},
            creator=self._resolve_user_by_email(
                waiting_entry.get("submitted_by"), "submitter"
            ),
            topic={"record": restricted_recid},
            expires_at=expires_at,
            uow=uow,
        )
        # Work on the underlying record to set fields directly.
        request = request_item._record
        # Deterministic number derived from the legacy recid.
        request.number = f"lrecid:{legacy_recid}:ep-approval"
        request.status = "submitted"

        # Keep the legacy submission date as the creation date.
        submitted_at = self._parse_legacy_datetime(waiting_entry.get("date"))
        if submitted_at:
            request.model.created = submitted_at

        # Switches the request to "accepted" and registers the accept event.
        self._apply_approved_entry_to_request(
            request, approved_entry, report_number, uow
        )

        # Registered after all in-memory changes so they are all committed.
        uow.register(
            RecordCommitOp(request, indexer=current_requests_service.indexer)
        )
        uow.commit()
republish.""" + draft = current_rdm_records_service.edit(system_identity, id_=public_recid) + data = draft.data + metadata = data.setdefault("metadata", {}) + metadata.setdefault("identifiers", []).append( + {"scheme": "apprn", "identifier": report_number} + ) + current_rdm_records_service.update_draft( + system_identity, id_=draft.id, data=data + ) + current_rdm_records_service.publish(system_identity, id_=draft.id) + + def _add_cern_scientific_community(self, entry): + """Add the CERN Scientific community to the public record parent.""" + communities = entry.get("parent", {}).get("json", {}).get("communities", {}) + ids = list(communities.get("ids", [])) + if CDS_CERN_SCIENTIFIC_COMMUNITY_ID not in ids: + ids.append(CDS_CERN_SCIENTIFIC_COMMUNITY_ID) + communities["ids"] = ids + entry.setdefault("parent", {}).setdefault("json", {})[ + "communities" + ] = communities + + def _should_remove_ep_report_number(self, identifier, public_split): + """Return whether an EP report number should be stripped from metadata.""" + if not identifier.startswith(EP_APPROVAL_REPORT_NUMBER_PREFIX): + return False + if public_split: + return True + if EP_APPROVAL_REPORT_NUMBER_RE.match(identifier): + if identifier != self.ep_approval_metadata["report_number"]: + raise UnexpectedValue( + message="EP report number is not the same as the approved entry", + stage="load", + priority="critical", + ) + return True + return False + + def _remove_ep_report_numbers_from_metadata(self, entry, include_epphapp): + """Strip EP report numbers from split record metadata before load.""" + recid = entry.get("record", {}).get("recid") + metadata = entry.get("record", {}).get("json", {}).get("metadata", {}) + identifiers = metadata.get("identifiers", []) + if not identifiers: + return + + kept = [] + removed = [] + for id_entry in identifiers: + if id_entry.get("scheme") != "cdsrn": + kept.append(id_entry) + continue + identifier = id_entry.get("identifier", "") + if 
self._should_remove_ep_report_number(identifier, not include_epphapp): + removed.append(identifier) + else: + kept.append(id_entry) + + if not removed: + return + + metadata["identifiers"] = kept + split_type = "restricted" if include_epphapp else "public" + self.migration_logger.add_information( + recid, + { + "message": ( + f"Removed EP approval report number(s) from {split_type} " "record." + ), + "value": removed, + }, + ) + + def _remove_doi_pid_from_metadata(self, entry, include_epphapp): + """Strip DOI PID from restricted EPPHAPP split record metadata before load.""" + if not include_epphapp: + return + + recid = entry.get("record", {}).get("recid") + record_json = entry.get("record", {}).get("json", {}) + pids = record_json.get("pids") + + if not pids or "doi" not in pids: + return + + removed = pids.pop("doi") + + self.migration_logger.add_information( + recid, + { + "message": "Removed DOI PID from restricted record.", + "value": removed, + }, + ) + + def _split_entry(self, entry, include_epphapp): + """Return a load entry for the public or restricted EP approval split.""" + split = deepcopy(entry) + split["record"].pop("ep_approval", None) + + new_versions = OrderedDict() + versioned_files = OrderedDict() + previous_signature = None + + for _, version_data in split.get("versions", {}).items(): + current_version_files = OrderedDict() + + for key, file_data in version_data.get("files", {}).items(): + is_epphapp = file_data.get("type") == EPPHAPP_FILE_TYPE + + if include_epphapp != is_epphapp: + continue + + if not include_epphapp and file_data.get("access"): + raise UnexpectedValue( + message=( + "Public split contains restricted files after excluding " + f"EPPHAPP files: {[key]}" + ), + stage="load", + recid=split["record"]["recid"], + priority="critical", + ) + + current_version_files[key] = deepcopy(file_data) + + if not current_version_files: + continue + + versioned_files.update(current_version_files) + + signature = tuple( + sorted( + ( + key, + 
file_data.get("checksum"), + file_data.get("id_bibdoc"), + file_data.get("version"), + file_data.get("type"), + file_data.get("access"), + ) + for key, file_data in versioned_files.items() + ) + ) + # If the signature is the same, skip the version + if signature == previous_signature: + continue + + previous_signature = signature + + version_access = deepcopy(version_data.get("access", {})) + access_obj = deepcopy(version_access.get("access_obj", {})) + + if include_epphapp: + access_obj["record"] = "restricted" + access_obj["files"] = "restricted" + else: + access_obj["record"] = "public" + access_obj["files"] = "public" + # Remove the meta field from the public version access + version_access.pop("meta", None) + + version_access["access_obj"] = access_obj + + new_version_data = deepcopy(version_data) + new_version_data["files"] = deepcopy(versioned_files) + new_version_data["access"] = version_access + + new_versions[len(new_versions) + 1] = new_version_data + + if not new_versions: + raise UnexpectedValue( + message=( + "No EPPHAPP files found to load for EP approval restricted split" + if include_epphapp + else "No public files found to load for EP approval public split" + ), + stage="load", + recid=split["record"]["recid"], + priority="critical", + ) + + if not include_epphapp: + split["record"]["access"] = "public" + self._add_cern_scientific_community(split) + + split["versions"] = new_versions + self._remove_ep_report_numbers_from_metadata(split, include_epphapp) + self._remove_doi_pid_from_metadata(split, include_epphapp) + + return split diff --git a/cds_migrator_kit/rdm/records/load/load.py b/cds_migrator_kit/rdm/records/load/load.py index a1a1bed9..db12c202 100644 --- a/cds_migrator_kit/rdm/records/load/load.py +++ b/cds_migrator_kit/rdm/records/load/load.py @@ -35,6 +35,7 @@ CDSMigrationException, GrantCreationError, ManualImportRequired, + UnexpectedValue, ) @@ -69,6 +70,7 @@ def __init__( self.dry_run = dry_run self.legacy_pids_to_redirect = {} 
def _should_log_record_state(self):
    """Whether to persist record state for stats migration.

    Consulted by ``_load_versions`` before the computed record state is
    passed to ``record_state_logger.add_record_state``. This default
    implementation always returns ``True``.
    """
    return True
b/cds_migrator_kit/rdm/records/streams.py @@ -11,7 +11,7 @@ from cds_migrator_kit.extract.extract import LegacyExtract from cds_migrator_kit.rdm.records.transform.transform import CDSToRDMRecordTransform -from .load import CDSRecordServiceLoad +from .load import CDSEPApprovalRecordServiceLoad, CDSRecordServiceLoad RecordStreamDefinition = StreamDefinition( name="records", @@ -20,3 +20,11 @@ load_cls=CDSRecordServiceLoad, ) """ETL stream for CDS to RDM records.""" + +RecordEPApprovalStreamDefinition = StreamDefinition( + name="records", + extract_cls=LegacyExtract, + transform_cls=CDSToRDMRecordTransform, + load_cls=CDSEPApprovalRecordServiceLoad, +) +"""ETL stream for CDS to RDM records with EP approval.""" diff --git a/cds_migrator_kit/rdm/records/transform/transform.py b/cds_migrator_kit/rdm/records/transform/transform.py index e5439114..023a5ee6 100644 --- a/cds_migrator_kit/rdm/records/transform/transform.py +++ b/cds_migrator_kit/rdm/records/transform/transform.py @@ -33,6 +33,7 @@ from cds_migrator_kit.errors import ( ManualImportRequired, MissingRequiredField, + MultipleModelsMatched, RecordFlaggedCuration, RestrictedFileDetected, UnexpectedValue, @@ -144,6 +145,7 @@ def __init__( self.access_grants_view = access_grants_view self.migration_logger = migration_logger self.record_state_logger = record_state_logger + self.ep_approval_request = None super().__init__(partial) def _created(self, entry): @@ -519,7 +521,9 @@ def subjects(json_entry): if item in keys: keys.remove(item) - forgotten_keys = [key for key in keys if key not in list(metadata.keys())] + used_keys = list(metadata.keys()) + ["ep_approval"] + self.ep_approval_request = json_entry.get("ep_approval", []) + forgotten_keys = [key for key in keys if key not in used_keys] if forgotten_keys: raise ManualImportRequired("Unassigned metadata key", value=forgotten_keys) return {k: v for k, v in metadata.items() if v} @@ -758,6 +762,7 @@ def transform(self, entry): # keep the original extracted entry for 
@model.over("ep_approval", "^9031_")
@for_each_value
def ep_approval(self, key, value):
    """Translate one legacy EP approval history entry (field 9031_).

    Subfields read: ``s`` status, ``f`` submitter, ``d`` date, ``e``
    deadline, ``a`` description, ``b`` EP report number, ``g`` stamp info,
    ``c`` document type. All values are stripped; status and submitter are
    lower-cased.

    :returns: dict holding only the non-empty translated values.
    :raises UnexpectedValue: if the status is neither ``waiting`` nor
        ``approved``.
    """
    status = value.get("s", "").strip().lower()
    if status not in ("waiting", "approved"):
        # The status is read from subfield ``s``: report that subfield so
        # the migration log points at the value that is actually wrong.
        raise UnexpectedValue(subfield="s", field=key, value=value)

    translated = {
        "status": status,
        "submitted_by": value.get("f", "").strip().lower(),
        "date": value.get("d", "").strip(),
        "deadline": value.get("e", "").strip(),
        "description": value.get("a", "").strip(),
        "ep_report_number": value.get("b", "").strip(),
        "stamp_info": value.get("g", "").strip(),
        "doc_type": value.get("c", "").strip(),
    }
    # Empty subfields are dropped rather than stored as empty strings.
    return {name: text for name, text in translated.items() if text}