From 07bcd7d0182ebf0537c107fee50d601236e1a733 Mon Sep 17 00:00:00 2001 From: Francesca Date: Tue, 9 Jun 2026 21:37:13 +0200 Subject: [PATCH 1/3] Add advanced OpenAlex ETL pipeline --- scripts/run_advanced_etl.py | 205 ++++++++++++++++ www/services/api_retriever.py | 213 ++++++++++++++++ www/services/format_functions.py | 35 ++- www/services/mappings.py | 112 +++++++++ www/services/standardizer.py | 404 +++++++++++++++++++++++++++++++ www/services/validators.py | 140 +++++++++++ 6 files changed, 1106 insertions(+), 3 deletions(-) create mode 100644 scripts/run_advanced_etl.py create mode 100644 www/services/api_retriever.py create mode 100644 www/services/mappings.py create mode 100644 www/services/standardizer.py create mode 100644 www/services/validators.py diff --git a/scripts/run_advanced_etl.py b/scripts/run_advanced_etl.py new file mode 100644 index 000000000..81fe55347 --- /dev/null +++ b/scripts/run_advanced_etl.py @@ -0,0 +1,205 @@ +""" +Command-line runner for the Advanced ETL pipeline. + +This script executes the full OpenAlex ETL workflow: + +1. EXTRACT: retrieve raw records from OpenAlex through the API. +2. TRANSFORM: convert raw JSON records into the WoS-like schema. +3. CALCULATED FIELDS: generate SR through the existing Bibliometrix-Python formatter. +4. VALIDATION: check mandatory columns, null values and type contracts. +5. LOAD: export the standardized DataFrame to CSV. + +Example: + +python scripts/run_advanced_etl.py --platform openalex --query "machine learning" --max-results 50 --output standardized_openalex.csv +""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path +from typing import Any, Dict, List + +import pandas as pd + + +REPO_ROOT = Path(__file__).resolve().parents[1] +SERVICES_DIR = REPO_ROOT / "www" / "services" +sys.path.insert(0, str(SERVICES_DIR)) + + +from api_retriever import fetch_openalex_works +from mappings import MULTI_VALUE_COLUMNS +from standardizer import standardize_openalex_records +from validators import validate_standardized_dataframe + + +def extract_records( + platform: str, + query: str, + max_results: int, +) -> List[Dict[str, Any]]: + """ + Dispatcher for the EXTRACT phase. + + Args: + platform: Selected bibliographic platform. + query: Textual query provided by the user. + max_results: Maximum number of records to retrieve. + + Returns: + Raw records retrieved from the selected platform. + """ + platform = platform.lower().strip() + + if platform == "openalex": + return fetch_openalex_works( + query=query, + max_results=max_results, + ) + + raise ValueError(f"Unsupported platform: {platform}") + + +def transform_records( + platform: str, + records: List[Dict[str, Any]], +) -> pd.DataFrame: + """ + Dispatcher for the TRANSFORM phase. + + Args: + platform: Selected bibliographic platform. + records: Raw records retrieved during the extract phase. + + Returns: + Standardized WoS-like DataFrame. + """ + platform = platform.lower().strip() + + if platform == "openalex": + return standardize_openalex_records(records) + + raise ValueError(f"Unsupported platform: {platform}") + + +def serialize_lists_for_csv(df: pd.DataFrame) -> pd.DataFrame: + """ + Serialize list-valued columns into semicolon-delimited strings. + + During transformation and validation, multi-value fields are kept as list[str]. + For flat CSV export, they are converted to strings using ';' as delimiter. + """ + output_df = df.copy() + + for column in MULTI_VALUE_COLUMNS: + output_df[column] = output_df[column].apply( + lambda values: "; ".join(values) if isinstance(values, list) else str(values) + ) + + return output_df + + +def save_standardized_csv(df: pd.DataFrame, output_path: str) -> None: + """ + Save the standardized DataFrame as a UTF-8 CSV file. + """ + output_df = serialize_lists_for_csv(df) + output_df.to_csv(output_path, index=False, encoding="utf-8") + + +def run_pipeline( + platform: str, + query: str, + max_results: int, + output_path: str, +) -> pd.DataFrame: + """ + Run the complete Advanced ETL pipeline. + """ + print("=== ADVANCED BIBLIOMETRIX ETL ===") + print(f"Platform: {platform}") + print(f"Query: {query}") + print(f"Max results: {max_results}") + + print("\n[1/5] EXTRACT - Retrieving raw API records...") + raw_records = extract_records( + platform=platform, + query=query, + max_results=max_results, + ) + print(f"Raw records retrieved: {len(raw_records)}") + + print("\n[2/5] TRANSFORM - Standardizing records into WoS-like schema...") + standardized_df = transform_records( + platform=platform, + records=raw_records, + ) + print(f"Standardized rows: {len(standardized_df)}") + + print("\n[3/5] CALCULATED FIELDS - SR generated during standardization.") + print("SR sample:") + print(standardized_df[["TI", "SR"]].head(3).to_string()) + + print("\n[4/5] VALIDATION - Checking schema, null values and type contracts...") + validate_standardized_dataframe(standardized_df) + print("VALIDATION PASSED") + + print("\n[5/5] LOAD - Saving standardized CSV...") + save_standardized_csv(standardized_df, output_path) + print(f"Output saved to: {output_path}") + + print("\nNormalized preview:") + preview_columns = ["DB", "UT", "DI", "TI", "PY", "TC", "AU", "SO", "SR"] + print(standardized_df[preview_columns].head().to_string()) + + return standardized_df + + +def parse_arguments() -> argparse.Namespace: + """ + Parse command-line arguments. + """ + parser = argparse.ArgumentParser( + description="Run the Advanced Bibliometrix ETL pipeline." + ) + + parser.add_argument( + "--platform", + required=True, + choices=["openalex"], + help="Open-access platform to query.", + ) + + parser.add_argument( + "--query", + required=True, + help='Textual search query, for example "machine learning".', + ) + + parser.add_argument( + "--max-results", + type=int, + default=50, + help="Maximum number of records to retrieve.", + ) + + parser.add_argument( + "--output", + default="standardized_openalex.csv", + help="Output CSV path.", + ) + + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_arguments() + + run_pipeline( + platform=args.platform, + query=args.query, + max_results=args.max_results, + output_path=args.output, + ) \ No newline at end of file diff --git a/www/services/api_retriever.py b/www/services/api_retriever.py new file mode 100644 index 000000000..84b06a547 --- /dev/null +++ b/www/services/api_retriever.py @@ -0,0 +1,213 @@ +""" +API retriever module for the Advanced ETL pipeline. + +This module implements the EXTRACT phase. +It retrieves raw bibliographic records from OpenAlex using a textual query. + +The output is intentionally raw JSON-like data. +No standardization is performed here. +""" + +from __future__ import annotations + +import os +import time +from typing import Any, Dict, List, Optional + +import requests + + +OPENALEX_WORKS_URL = "https://api.openalex.org/works" + + +class OpenAlexAPIError(RuntimeError): + """Raised when the OpenAlex API request fails after all retries.""" + + +def _build_openalex_params( + query: str, + cursor: str, + per_page: int, + api_key: Optional[str] = None, + mailto: Optional[str] = None, +) -> Dict[str, Any]: + """ + Build the parameters for the OpenAlex Works API. + + Args: + query: Textual search query, for example "machine learning". + cursor: Cursor used for pagination. The first cursor must be "*". + per_page: Number of records requested in one API call. + api_key: Optional OpenAlex API key. + mailto: Optional email address for polite API usage. + + Returns: + A dictionary containing the request parameters. + """ + params: Dict[str, Any] = { + "search": query, + "cursor": cursor, + "per_page": per_page, + "select": ",".join( + [ + "id", + "doi", + "title", + "display_name", + "publication_year", + "publication_date", + "type", + "language", + "cited_by_count", + "authorships", + "primary_location", + "locations", + "abstract_inverted_index", + "concepts", + "keywords", + "referenced_works", + "biblio", + ] + ), + } + + if api_key: + params["api_key"] = api_key + + if mailto: + params["mailto"] = mailto + + return params + + +def _request_with_retries( + url: str, + params: Dict[str, Any], + max_retries: int = 3, + timeout: int = 30, + backoff_seconds: float = 2.0, +) -> Dict[str, Any]: + """ + Perform an HTTP GET request with retry logic. + + This function retries temporary failures, server errors, and rate-limit errors. + + Args: + url: API endpoint. + params: Request parameters. + max_retries: Maximum number of retry attempts. + timeout: Request timeout in seconds. + backoff_seconds: Initial waiting time between retries. + + Returns: + Parsed JSON response. + + Raises: + OpenAlexAPIError: If the request fails after all retries. + """ + last_error: Optional[Exception] = None + + for attempt in range(max_retries): + try: + response = requests.get(url, params=params, timeout=timeout) + + if response.status_code == 429: + retry_after = response.headers.get("Retry-After") + sleep_time = float(retry_after) if retry_after else backoff_seconds * (2 ** attempt) + time.sleep(sleep_time) + continue + + if 500 <= response.status_code < 600: + time.sleep(backoff_seconds * (2 ** attempt)) + continue + + response.raise_for_status() + return response.json() + + except requests.RequestException as exc: + last_error = exc + time.sleep(backoff_seconds * (2 ** attempt)) + + raise OpenAlexAPIError( + f"OpenAlex request failed after {max_retries} attempts: {last_error}" + ) + + +def fetch_openalex_works( + query: str, + max_results: int = 100, + per_page: int = 100, + api_key: Optional[str] = None, + mailto: Optional[str] = None, +) -> List[Dict[str, Any]]: + """ + Retrieve raw works from OpenAlex using cursor pagination. + + Args: + query: Textual query inserted by the user. + max_results: Maximum number of records to retrieve. + per_page: Number of records per API call. + api_key: Optional OpenAlex API key. If missing, the function reads OPENALEX_API_KEY. + mailto: Optional email address. If missing, the function reads OPENALEX_MAILTO. + + Returns: + A list of raw OpenAlex records. + """ + if not query or not query.strip(): + raise ValueError("Query cannot be empty.") + + if max_results <= 0: + raise ValueError("max_results must be greater than 0.") + + if per_page <= 0 or per_page > 100: + raise ValueError("per_page must be between 1 and 100.") + + api_key = api_key or os.getenv("OPENALEX_API_KEY") + mailto = mailto or os.getenv("OPENALEX_MAILTO") + + records: List[Dict[str, Any]] = [] + cursor = "*" + + while len(records) < max_results: + current_per_page = min(per_page, max_results - len(records)) + + params = _build_openalex_params( + query=query, + cursor=cursor, + per_page=current_per_page, + api_key=api_key, + mailto=mailto, + ) + + payload = _request_with_retries( + OPENALEX_WORKS_URL, + params=params, + ) + + page_results = payload.get("results", []) + + if not page_results: + break + + records.extend(page_results) + + meta = payload.get("meta", {}) + cursor = meta.get("next_cursor") + + if not cursor: + break + + time.sleep(0.2) + + return records[:max_results] + + +if __name__ == "__main__": + works = fetch_openalex_works("machine learning", max_results=5) + + print(f"Retrieved records: {len(works)}") + + for index, work in enumerate(works, start=1): + title = work.get("title") or work.get("display_name") or "No title" + year = work.get("publication_year") or "No year" + print(f"{index}. {title} ({year})") \ No newline at end of file diff --git a/www/services/format_functions.py b/www/services/format_functions.py index 1a8ee7af4..bca2d0a15 100644 --- a/www/services/format_functions.py +++ b/www/services/format_functions.py @@ -1,5 +1,14 @@ -from .utils import * -from .parsers import * +try: + from .utils import * +except ImportError: + pass + +try: + from .parsers import * +except ImportError: + pass + +import re import zipfile import tempfile import os @@ -1252,7 +1261,27 @@ def format_sr_column(entry, source, file_type): # Function for SR Column (forma publication_year = entry.get('YR', '') ta = entry.get('SO', '') sr = author + ', ' + publication_year + ', ' + ta - + elif source == 'OpenAlex': + authors = entry.get('AU', []) + if isinstance(authors, list) and len(authors) > 0: + first_author = str(authors[0]).strip() + else: + first_author = str(authors or '').strip() + + publication_year = str(entry.get('PY', '')).strip() + + journal = entry.get('SO', '') + if isinstance(journal, list): + journal = journal[0] if len(journal) > 0 else '' + journal = str(journal).strip() + + sr_parts = [ + part + for part in [first_author, publication_year, journal] + if part + ] + + sr = ', '.join(sr_parts) return sr diff --git a/www/services/mappings.py b/www/services/mappings.py new file mode 100644 index 000000000..07bc81221 --- /dev/null +++ b/www/services/mappings.py @@ -0,0 +1,112 @@ +""" +Mapping definitions for the Advanced ETL pipeline. + +This module contains the target WoS-like schema and the OpenAlex-to-WoS +mapping rules. No extraction or transformation logic is executed here. +Keeping mappings separated makes the ETL pipeline easier to maintain +and avoids hardcoded transformations inside analytical functions. +""" + +from __future__ import annotations + +from typing import Dict, List, Set + + +STANDARD_COLUMNS: List[str] = [ + "DB", + "UT", + "DI", + "PMID", + "TI", + "SO", + "JI", + "PY", + "DT", + "LA", + "TC", + "AU", + "AF", + "C1", + "RP", + "CR", + "DE", + "ID", + "AB", + "VL", + "IS", + "BP", + "EP", + "SR", +] + + +MULTI_VALUE_COLUMNS: Set[str] = { + "AU", + "AF", + "C1", + "CR", + "DE", + "ID", +} + + +STRING_COLUMNS: Set[str] = { + "DB", + "UT", + "DI", + "PMID", + "TI", + "SO", + "JI", + "PY", + "DT", + "LA", + "RP", + "AB", + "VL", + "IS", + "BP", + "EP", + "SR", +} + + +INTEGER_COLUMNS: Set[str] = { + "TC", +} + + +OPENALEX_TO_WOS_MAPPING: Dict[str, str] = { + "id": "UT", + "doi": "DI", + "title": "TI", + "display_name": "TI", + "publication_year": "PY", + "type": "DT", + "language": "LA", + "cited_by_count": "TC", + "referenced_works": "CR", +} + + +OPENALEX_DERIVED_FIELDS: Dict[str, str] = { + "database_source": "DB", + "source_name": "SO", + "source_abbreviation": "JI", + "authors_short": "AU", + "authors_full": "AF", + "author_affiliations": "C1", + "author_keywords": "DE", + "index_keywords": "ID", + "abstract_text": "AB", + "volume": "VL", + "issue": "IS", + "first_page": "BP", + "last_page": "EP", + "short_reference": "SR", +} + + +DATABASE_SOURCE_LABELS: Dict[str, str] = { + "openalex": "OPENALEX", +} \ No newline at end of file diff --git a/www/services/standardizer.py b/www/services/standardizer.py new file mode 100644 index 000000000..fc263d338 --- /dev/null +++ b/www/services/standardizer.py @@ -0,0 +1,404 @@ +""" +Standardization module for the Advanced ETL pipeline. + +This module implements the TRANSFORM phase. +It converts raw OpenAlex JSON records into the WoS-like schema required by +Bibliometrix-Python. + +The module does not retrieve data from APIs and does not save files. +It only transforms records and enforces basic type contracts. +""" + +from __future__ import annotations + +import re +from typing import Any, Dict, List, Optional + +import pandas as pd + +try: + from .mappings import ( + DATABASE_SOURCE_LABELS, + MULTI_VALUE_COLUMNS, + OPENALEX_TO_WOS_MAPPING, + STANDARD_COLUMNS, + ) +except ImportError: + from mappings import ( + DATABASE_SOURCE_LABELS, + MULTI_VALUE_COLUMNS, + OPENALEX_TO_WOS_MAPPING, + STANDARD_COLUMNS, + ) +try: + from .format_functions import format_sr_column +except Exception: + try: + from format_functions import format_sr_column + except Exception: + format_sr_column = None + +def is_missing(value: Any) -> bool: + """ + Check whether a value is missing. + + This prevents None, NaN, empty strings and textual null values from reaching + the final standardized DataFrame. + """ + if value is None: + return True + + try: + if pd.isna(value): + return True + except (TypeError, ValueError): + pass + + if isinstance(value, str) and value.strip().lower() in { + "", + "nan", + "none", + "null", + "na", + "n/a", + }: + return True + + return False + + +def clean_string(value: Any) -> str: + """ + Convert a scalar value into a clean string. + + Missing scalar values are replaced by an empty string, as required by the + project type contracts. + """ + if is_missing(value): + return "" + + if isinstance(value, list): + return "; ".join(clean_string(item) for item in value if not is_missing(item)) + + return str(value).strip() + + +def clean_doi(value: Any) -> str: + """ + Normalize DOI values. + + OpenAlex often returns DOI values as URLs, for example: + https://doi.org/10.xxxx/yyyy + + The standardized output keeps only the DOI code. + """ + doi = clean_string(value) + + if doi.lower().startswith("https://doi.org/"): + doi = doi[len("https://doi.org/") :] + + if doi.lower().startswith("http://doi.org/"): + doi = doi[len("http://doi.org/") :] + + return doi.strip() + + +def clean_year(value: Any) -> str: + """ + Extract a four-digit publication year. + """ + text = clean_string(value) + match = re.search(r"(19|20)\d{2}", text) + return match.group(0) if match else "" + + +def clean_integer(value: Any) -> int: + """ + Convert a value to integer. + + Invalid or missing values are converted to 0. + """ + if is_missing(value): + return 0 + + try: + return int(float(clean_string(value))) + except ValueError: + return 0 + + +def ensure_list_of_strings(value: Any) -> List[str]: + """ + Convert a value into list[str]. + + This is used for multi-value fields such as AU, AF, C1, CR, DE and ID. + """ + if is_missing(value): + return [] + + if isinstance(value, list): + cleaned_items: List[str] = [] + + for item in value: + if isinstance(item, list): + cleaned_items.extend(ensure_list_of_strings(item)) + elif isinstance(item, dict): + item_string = clean_string( + item.get("id") + or item.get("display_name") + or item.get("title") + or item + ) + if item_string: + cleaned_items.append(item_string) + else: + item_string = clean_string(item) + if item_string: + cleaned_items.append(item_string) + + return cleaned_items + + text = clean_string(value) + + if not text: + return [] + + parts = re.split(r"\s*;\s*|\s*\|\s*|\n+", text) + return [part.strip() for part in parts if part.strip()] + + +def reconstruct_abstract(abstract_inverted_index: Optional[Dict[str, List[int]]]) -> str: + """ + Reconstruct an OpenAlex abstract from its inverted-index format. + + OpenAlex stores abstracts as: + { + "machine": [0], + "learning": [1] + } + + This function reconstructs: + "machine learning" + """ + if not abstract_inverted_index or not isinstance(abstract_inverted_index, dict): + return "" + + positioned_words: Dict[int, str] = {} + + for word, positions in abstract_inverted_index.items(): + if not isinstance(positions, list): + continue + + for position in positions: + if isinstance(position, int): + positioned_words[position] = word + + if not positioned_words: + return "" + + return " ".join(positioned_words[index] for index in sorted(positioned_words)) + + +def get_source_info(record: Dict[str, Any]) -> Dict[str, str]: + """ + Extract journal/source information from an OpenAlex record. + """ + primary_location = record.get("primary_location") or {} + source = primary_location.get("source") or {} + + source_name = clean_string(source.get("display_name")) + source_abbreviation = clean_string(source.get("abbreviated_title")) + + return { + "SO": source_name, + "JI": source_abbreviation, + } + + +def get_biblio_info(record: Dict[str, Any]) -> Dict[str, str]: + """ + Extract volume, issue and page information from the OpenAlex biblio object. + """ + biblio = record.get("biblio") or {} + + return { + "VL": clean_string(biblio.get("volume")), + "IS": clean_string(biblio.get("issue")), + "BP": clean_string(biblio.get("first_page")), + "EP": clean_string(biblio.get("last_page")), + } + + +def get_author_info(record: Dict[str, Any]) -> Dict[str, List[str]]: + """ + Extract author names and affiliations from OpenAlex authorships. + """ + authorships = record.get("authorships") or [] + + authors_short: List[str] = [] + authors_full: List[str] = [] + affiliations: List[str] = [] + + for authorship in authorships: + author = authorship.get("author") or {} + author_name = clean_string(author.get("display_name")) + + if author_name: + authors_full.append(author_name) + authors_short.append(author_name) + + institutions = authorship.get("institutions") or [] + + for institution in institutions: + institution_name = clean_string(institution.get("display_name")) + country_code = clean_string(institution.get("country_code")) + + if institution_name and country_code: + affiliations.append(f"{institution_name}, {country_code}") + elif institution_name: + affiliations.append(institution_name) + + raw_affiliation_strings = authorship.get("raw_affiliation_strings") or [] + + for raw_affiliation in raw_affiliation_strings: + raw_affiliation = clean_string(raw_affiliation) + if raw_affiliation: + affiliations.append(raw_affiliation) + + return { + "AU": authors_short, + "AF": authors_full, + "C1": list(dict.fromkeys(affiliations)), + } + + +def get_keyword_info(record: Dict[str, Any]) -> Dict[str, List[str]]: + """ + Extract keywords and concepts from OpenAlex. + + In the standardized schema: + - DE contains author-like keywords where available. + - ID contains broader index/concept keywords. + """ + keywords = record.get("keywords") or [] + concepts = record.get("concepts") or [] + + author_keywords: List[str] = [] + index_keywords: List[str] = [] + + for keyword in keywords: + keyword_text = clean_string( + keyword.get("display_name") + or keyword.get("keyword") + or keyword.get("name") + ) + + if keyword_text: + author_keywords.append(keyword_text) + + for concept in concepts: + concept_name = clean_string(concept.get("display_name")) + + if concept_name: + index_keywords.append(concept_name) + + return { + "DE": list(dict.fromkeys(author_keywords)), + "ID": list(dict.fromkeys(index_keywords)), + } + + +def create_empty_standard_record() -> Dict[str, Any]: + """ + Create an empty record with all mandatory standard columns. + """ + record: Dict[str, Any] = {} + + for column in STANDARD_COLUMNS: + if column in MULTI_VALUE_COLUMNS: + record[column] = [] + elif column == "TC": + record[column] = 0 + else: + record[column] = "" + + return record + + +def standardize_openalex_record(record: Dict[str, Any]) -> Dict[str, Any]: + """ + Convert one raw OpenAlex record into the WoS-like target schema. + """ + standardized = create_empty_standard_record() + + standardized["DB"] = DATABASE_SOURCE_LABELS["openalex"] + + for openalex_field, wos_field in OPENALEX_TO_WOS_MAPPING.items(): + raw_value = record.get(openalex_field) + + if wos_field == "DI": + standardized[wos_field] = clean_doi(raw_value) + elif wos_field == "PY": + standardized[wos_field] = clean_year(raw_value) + elif wos_field == "TC": + standardized[wos_field] = clean_integer(raw_value) + elif wos_field in MULTI_VALUE_COLUMNS: + standardized[wos_field] = [ + clean_string(item) + for item in ensure_list_of_strings(raw_value) + if clean_string(item) + ] + else: + standardized[wos_field] = clean_string(raw_value) + + source_info = get_source_info(record) + standardized.update(source_info) + + biblio_info = get_biblio_info(record) + standardized.update(biblio_info) + + author_info = get_author_info(record) + standardized.update(author_info) + + keyword_info = get_keyword_info(record) + standardized.update(keyword_info) + + standardized["AB"] = reconstruct_abstract(record.get("abstract_inverted_index")) + + if format_sr_column is not None: + standardized["SR"] = clean_string( + format_sr_column( + standardized, + source="OpenAlex", + file_type=".json", + ) + ) + return standardized + + +def standardize_openalex_records(records: List[Dict[str, Any]]) -> pd.DataFrame: + """ + Convert a list of raw OpenAlex records into a standardized DataFrame. + """ + standardized_records = [ + standardize_openalex_record(record) + for record in records + ] + + df = pd.DataFrame(standardized_records) + + for column in STANDARD_COLUMNS: + if column not in df.columns: + if column in MULTI_VALUE_COLUMNS: + df[column] = [[] for _ in range(len(df))] + elif column == "TC": + df[column] = 0 + else: + df[column] = "" + + df = df[STANDARD_COLUMNS] + + for column in MULTI_VALUE_COLUMNS: + df[column] = df[column].apply(ensure_list_of_strings) + + return df \ No newline at end of file diff --git a/www/services/validators.py b/www/services/validators.py new file mode 100644 index 000000000..1f6e43f7e --- /dev/null +++ b/www/services/validators.py @@ -0,0 +1,140 @@ +""" +Validation module for the Advanced ETL pipeline. + +This module implements the VALIDATION phase required by the project. +It checks that the standardized DataFrame follows the WoS-like schema, +contains no null values, and respects the expected type contracts. +""" + +from __future__ import annotations + +from typing import List +from numbers import Integral + +import pandas as pd + +try: + from .mappings import ( + INTEGER_COLUMNS, + MULTI_VALUE_COLUMNS, + STANDARD_COLUMNS, + STRING_COLUMNS, + ) +except ImportError: + from mappings import ( + INTEGER_COLUMNS, + MULTI_VALUE_COLUMNS, + STANDARD_COLUMNS, + STRING_COLUMNS, + ) + + +class ValidationError(ValueError): + """Raised when the standardized DataFrame violates the target schema.""" + + +def validate_mandatory_columns(df: pd.DataFrame) -> None: + """ + Verify that all mandatory WoS-like columns exist in the DataFrame. + """ + missing_columns: List[str] = [ + column + for column in STANDARD_COLUMNS + if column not in df.columns + ] + + if missing_columns: + raise ValidationError( + f"Missing mandatory columns: {missing_columns}" + ) + + +def validate_no_null_values(df: pd.DataFrame) -> None: + """ + Verify that the DataFrame contains no pandas NaN values. + """ + if df.isna().any().any(): + columns_with_nulls = df.columns[df.isna().any()].tolist() + raise ValidationError( + f"Null values found in columns: {columns_with_nulls}" + ) + + for column in df.columns: + for index, value in df[column].items(): + if value is None: + raise ValidationError( + f"None value found at row {index}, column {column}" + ) + + +def validate_multi_value_columns(df: pd.DataFrame) -> None: + """ + Verify that multi-value fields are list[str]. + + Multi-value columns such as AU, AF, C1, CR, DE and ID must be Python lists. + Each element inside the list must be a string. + """ + for column in MULTI_VALUE_COLUMNS: + if column not in df.columns: + continue + + for index, value in df[column].items(): + if not isinstance(value, list): + raise ValidationError( + f"Column {column} at row {index} must be a Python list, " + f"got {type(value).__name__}" + ) + + for item in value: + if not isinstance(item, str): + raise ValidationError( + f"Column {column} at row {index} must contain only strings. " + f"Invalid item type: {type(item).__name__}. " + f"Invalid item value: {repr(item)}" + ) + + +def validate_string_columns(df: pd.DataFrame) -> None: + """ + Verify that scalar string fields are strings. + """ + for column in STRING_COLUMNS: + if column not in df.columns: + continue + + for index, value in df[column].items(): + if not isinstance(value, str): + raise ValidationError( + f"Column {column} at row {index} must be str, " + f"got {type(value).__name__}" + ) + + +def validate_integer_columns(df: pd.DataFrame) -> None: + """ + Verify that integer fields are integers. + """ + for column in INTEGER_COLUMNS: + if column not in df.columns: + continue + + for index, value in df[column].items(): + if not isinstance(value, int): + raise ValidationError( + f"Column {column} at row {index} must be int, " + f"got {type(value).__name__}" + ) + + +def validate_standardized_dataframe(df: pd.DataFrame) -> None: + """ + Run the complete validation suite on a standardized DataFrame. + + The function raises ValidationError if the DataFrame is invalid. + If no exception is raised, the DataFrame is valid. + """ + validate_mandatory_columns(df) + validate_no_null_values(df) + validate_multi_value_columns(df) + validate_string_columns(df) + validate_integer_columns(df) \ No newline at end of file From c3e746e0c430cbcb22bd537d6df4320a50cb49cb Mon Sep 17 00:00:00 2001 From: Francesca Date: Tue, 9 Jun 2026 21:39:17 +0200 Subject: [PATCH 2/3] Ignore generated ETL output --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 23b99e089..ede19691b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ __pycache__/ bibliovenv/ Bibenv/ -.idea/ \ No newline at end of file +.idea/ +standardized_openalex.csv \ No newline at end of file From 6337c4ac80c96db063c51e0d5168de8ad27a0ca4 Mon Sep 17 00:00:00 2001 From: Francesca Date: Sat, 13 Jun 2026 13:44:03 +0200 Subject: [PATCH 3/3] Ignore local dashboard test artifacts --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ede19691b..bdbb8030b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ __pycache__/ bibliovenv/ Bibenv/ .idea/ -standardized_openalex.csv \ No newline at end of file +standardized_openalex.csv +.venv311/ +standardized_openalex_preview.xlsx \ No newline at end of file