Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 125 additions & 2 deletions arch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,37 @@
import re
from collections import Counter
from dataclasses import asdict, dataclass, field
from datetime import date
from decimal import Decimal
from typing import Any

Scalar = str | int | float | bool | None

ALLOWED_PERIOD_TYPES = {"calendar_year", "tax_year", "fiscal_year", "month"}
ALLOWED_PERIOD_BASES = {
"calendar_year",
"tax_year",
"fiscal_year",
"us_federal_fiscal_year",
"uk_fiscal_year",
"state_fiscal_year",
"reference_month",
"benefit_month",
"payment_month",
"payment_date_fiscal_year",
"statistical_annual",
"projection_year",
}
ALLOWED_ACCOUNTING_BASES = {
"accrual",
"cash",
"cash_outlay",
"cash_payment",
"benefit_month",
"payment_date",
"statistical_total",
"projection",
}
ALLOWED_GEOGRAPHY_LEVELS = {
"country",
"region",
Expand Down Expand Up @@ -70,6 +95,12 @@ class PeriodDimension:

type: str
value: int | str
start_date: str | None = None
end_date: str | None = None
basis: str | None = None
authority: str | None = None
source_label: str | None = None
accounting_basis: str | None = None


@dataclass(frozen=True)
Expand Down Expand Up @@ -247,7 +278,9 @@ def build_label(fact: AggregateFact) -> str:
concept = _humanize(fact.measure.concept)
aggregation = _humanize(fact.aggregation.method)
entity = _humanize(fact.entity.name)
period = f"{fact.period.value} {_humanize(fact.period.type)}"
period = fact.period.source_label or (
f"{fact.period.value} {_humanize(fact.period.type)}"
)
geography = fact.geography.name or fact.geography.id
source = _source_label(fact.source)

Expand Down Expand Up @@ -373,6 +406,7 @@ def validate_fact(fact: AggregateFact) -> tuple[ValidationIssue, ...]:
errors.append(
_issue("missing_period", "Period value is required", "period.value")
)
_validate_period_semantics(errors, fact.period)

if fact.geography.level not in ALLOWED_GEOGRAPHY_LEVELS:
errors.append(
Expand Down Expand Up @@ -500,7 +534,7 @@ def fact_counts(facts: list[AggregateFact]) -> dict[str, dict[str, int]]:

def _canonical_key_payload(fact: AggregateFact) -> dict[str, Any]:
payload = {
"period": asdict(fact.period),
"period": _period_key_payload(fact.period),
"geography": {
"level": fact.geography.level,
"id": fact.geography.id,
Expand All @@ -527,6 +561,95 @@ def _canonical_key_payload(fact: AggregateFact) -> dict[str, Any]:
return payload


def _period_key_payload(period: PeriodDimension) -> dict[str, Any]:
return {key: value for key, value in asdict(period).items() if value is not None}


def _validate_period_semantics(
errors: list[ValidationIssue],
period: PeriodDimension,
) -> None:
if period.basis is not None and period.basis not in ALLOWED_PERIOD_BASES:
errors.append(
_issue(
"malformed_period",
f"Unsupported period basis: {period.basis!r}",
"period.basis",
)
)
if (
period.accounting_basis is not None
and period.accounting_basis not in ALLOWED_ACCOUNTING_BASES
):
errors.append(
_issue(
"malformed_period",
f"Unsupported accounting basis: {period.accounting_basis!r}",
"period.accounting_basis",
)
)
if period.authority is not None and not period.authority.strip():
errors.append(
_issue(
"missing_period",
"Period authority must be nonempty when provided",
"period.authority",
)
)
if period.source_label is not None and not period.source_label.strip():
errors.append(
_issue(
"missing_period",
"Period source label must be nonempty when provided",
"period.source_label",
)
)

parsed_start = _parse_iso_date(errors, period.start_date, "period.start_date")
parsed_end = _parse_iso_date(errors, period.end_date, "period.end_date")
if (
parsed_start is not None
and parsed_end is not None
and parsed_start > parsed_end
):
errors.append(
_issue(
"malformed_period",
"Period start_date must be on or before end_date",
"period.start_date",
)
)


def _parse_iso_date(
errors: list[ValidationIssue],
value: str | None,
field_name: str,
) -> date | None:
if value is None:
return None
if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", value):
errors.append(
_issue(
"malformed_period",
f"Period date must use ISO YYYY-MM-DD format: {value!r}",
field_name,
)
)
return None
try:
return date.fromisoformat(value)
except ValueError:
errors.append(
_issue(
"malformed_period",
f"Period date must use ISO YYYY-MM-DD format: {value!r}",
field_name,
)
)
return None


def _validate_value(errors: list[ValidationIssue], value: Any) -> None:
if isinstance(value, Decimal):
return
Expand Down
68 changes: 56 additions & 12 deletions arch/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
source_row_to_mapping,
)

ARCH_DB_SCHEMA_VERSION = "arch.relational.v1"
ARCH_DB_SCHEMA_VERSION = "arch.relational.v2"


@dataclass(frozen=True)
Expand Down Expand Up @@ -77,9 +77,7 @@ def build_arch_db(
columns = source_columns_from_source_rows(rows)
source_row_values_count = sum(len(row.values) for row in rows)
resolved_build_id = build_id or _build_id(facts, cells, rows)
fact_constraints = [
(fact, build_aggregate_constraints(fact)) for fact in facts
]
fact_constraints = [(fact, build_aggregate_constraints(fact)) for fact in facts]
source_record_ids = {
fact.source_record_id for fact in facts if fact.source_record_id is not None
}
Expand Down Expand Up @@ -117,9 +115,7 @@ def build_arch_db(
return ArchDbBuildReport(
build_id=resolved_build_id,
facts_count=len(facts),
constraints_count=sum(
len(constraints) for _, constraints in fact_constraints
),
constraints_count=sum(len(constraints) for _, constraints in fact_constraints),
source_records_count=len(source_record_ids),
source_rows_count=len(rows),
source_columns_count=len(columns),
Expand Down Expand Up @@ -244,13 +240,25 @@ def _create_schema(connection: sqlite3.Connection) -> None:
legal_vintage TEXT,
period_type TEXT,
period_value TEXT,
period_start_date TEXT,
period_end_date TEXT,
period_basis TEXT,
period_authority TEXT,
period_source_label TEXT,
period_accounting_basis TEXT,
PRIMARY KEY (
source_concept,
canonical_concept,
relation,
legal_vintage,
period_type,
period_value
period_value,
period_start_date,
period_end_date,
period_basis,
period_authority,
period_source_label,
period_accounting_basis
)
);

Expand Down Expand Up @@ -278,6 +286,12 @@ def _create_schema(connection: sqlite3.Connection) -> None:
value_numeric REAL,
period_type TEXT NOT NULL,
period_value TEXT NOT NULL,
period_start_date TEXT,
period_end_date TEXT,
period_basis TEXT,
period_authority TEXT,
period_source_label TEXT,
period_accounting_basis TEXT,
geography_level TEXT NOT NULL,
geography_id TEXT NOT NULL,
geography_vintage TEXT,
Expand Down Expand Up @@ -740,7 +754,7 @@ def _insert_concept_alignments(
facts: list[AggregateFact],
build_id: str,
) -> None:
seen: set[tuple[str, str, str, str | None, str, str]] = set()
seen: set[tuple[Any, ...]] = set()
for fact in facts:
measure = fact.measure
if not measure.source_concept or not measure.concept_relation:
Expand All @@ -752,6 +766,12 @@ def _insert_concept_alignments(
measure.legal_vintage,
fact.period.type,
str(fact.period.value),
fact.period.start_date,
fact.period.end_date,
fact.period.basis,
fact.period.authority,
fact.period.source_label,
fact.period.accounting_basis,
)
if key in seen:
continue
Expand All @@ -768,9 +788,15 @@ def _insert_concept_alignments(
evidence_notes,
legal_vintage,
period_type,
period_value
period_value,
period_start_date,
period_end_date,
period_basis,
period_authority,
period_source_label,
period_accounting_basis
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
measure.source_concept,
Expand All @@ -783,6 +809,12 @@ def _insert_concept_alignments(
measure.legal_vintage,
fact.period.type,
str(fact.period.value),
fact.period.start_date,
fact.period.end_date,
fact.period.basis,
fact.period.authority,
fact.period.source_label,
fact.period.accounting_basis,
),
)

Expand Down Expand Up @@ -820,6 +852,12 @@ def _insert_aggregate_fact(
value_numeric,
period_type,
period_value,
period_start_date,
period_end_date,
period_basis,
period_authority,
period_source_label,
period_accounting_basis,
geography_level,
geography_id,
geography_vintage,
Expand Down Expand Up @@ -849,7 +887,7 @@ def _insert_aggregate_fact(
source_extraction_method,
source_method_notes
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
fact_key,
Expand All @@ -875,6 +913,12 @@ def _insert_aggregate_fact(
_numeric_value(fact.value),
fact.period.type,
str(fact.period.value),
fact.period.start_date,
fact.period.end_date,
fact.period.basis,
fact.period.authority,
fact.period.source_label,
fact.period.accounting_basis,
fact.geography.level,
fact.geography.id,
fact.geography.vintage,
Expand Down
Loading
Loading