diff --git a/.github/workflows/pipeline-tests.yml b/.github/workflows/pipeline-tests.yml new file mode 100644 index 00000000..5fb01643 --- /dev/null +++ b/.github/workflows/pipeline-tests.yml @@ -0,0 +1,36 @@ +name: Derived-parquet pipeline tests + +# Fast, AI-free gate for the data pipeline. Runs the fixture-based unit tests +# (no network for data, no large parquet) whenever the pipeline code changes. +on: + pull_request: + paths: + - "scripts/build_frontend_derived.py" + - "scripts/validate_frontend_derived.py" + - "tests/test_frontend_derived.py" + - "scripts/requirements.txt" + - "Makefile" + - ".github/workflows/pipeline-tests.yml" + push: + branches: [main] + paths: + - "scripts/build_frontend_derived.py" + - "scripts/validate_frontend_derived.py" + - "tests/test_frontend_derived.py" + workflow_dispatch: + +jobs: + fixture-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - name: Install deps + run: pip install -r scripts/requirements.txt + - name: Run pipeline fixture tests + # builds tiny synthetic wides (WKB BLOB + DuckDB GEOMETRY), runs the real + # builder + algebraic validator, asserts the contract. Exits non-zero on + # any failure -> PR is blocked. + run: python -m pytest tests/test_frontend_derived.py -q diff --git a/DATA_PROVENANCE.md b/DATA_PROVENANCE.md new file mode 100644 index 00000000..4b795ce9 --- /dev/null +++ b/DATA_PROVENANCE.md @@ -0,0 +1,83 @@ +# iSamples Explorer — Data Provenance + +How every parquet file the explorer uses is generated, from root to publish. +*Reviewed 2026-06-02 (CC, via codebase audit). Complements `SERIALIZATIONS.md` (format/schema reference); this file is the end-to-end build chain + the automation gaps.* + +> **Load-bearing constraint:** the **root export cannot be regenerated.** It was produced from the iSamples Central Solr API (`central.isample.xyz`), **offline since Aug 2025**. The Zenodo-archived export is a **frozen root**. Any *new* data (e.g. concept URIs, thumbnails) therefore must come from a **per-source supplementary file merged into the base by `pid`** — the "sidecar" pattern (see Stage 3) — not from re-exporting. + +## Pipeline DAG + +``` +Source collections (SESAR · OpenContext · GEOME · Smithsonian) + │ iSamples Central Solr API ── OFFLINE since Aug 2025 (cannot re-run) ──┐ + ▼ │ +STAGE 0/1 export_client → JSONL → GeoParquet │ frozen + → isamples_export_*_geo.parquet (Export format; ~300MB, 6.7M; Zenodo doi:10.5281/zenodo.15278211) + ▼ +STAGE 2 pqg/pqg/sql_converter.py (export → base PQG; 7-stage DuckDB SQL) + → narrow (…_narrow.parquet, ~844MB, 106M rows) and wide (…_wide.parquet, ~282MB, 20M rows) + ▼ +STAGE 3 sidecar/enrichment merge (LEFT JOIN by pid) ← Eric's independently-maintained OC PQG (GCS) + scripts/enrich_wide_with_oc_thumbnails.py → isamples_202604_wide.parquet (+47K thumbnails) + ▼ +STAGE 4 wide → frontend derived files (NOW SCRIPTED: scripts/build_frontend_derived.py) + → wide_h3 · h3_summary_res4/6/8 · samples_map_lite · sample_facets_v2 · facet_summaries · facet_cross_filter + → vocab_labels (scripts/build_vocab_labels.py — built separately from SKOS TTLs) + → {tag}_manifest.json (build identity: input+output sha256, argv, git SHA, DuckDB/extension versions) + ▼ +STAGE 5 publish to R2 (bucket isamples-ry) + Cloudflare Worker (data.isamples.org, /current/ aliases) + ▼ +DuckDB-WASM in the browser (explorer.qmd; parquet URLs ~L767-781) +``` + +## Stages (script / command per step) + +| Stage | Input → Output | How (file:line) | Automated? | +|---|---|---|---| +| **0/1 Export** | Solr API → `isamples_export_*_geo.parquet` | `export_client` `ExportClient.perform_full_download()` (`export_client.py:423-469`) → `write_geoparquet_from_json_lines()`; schema `SOURCE_COLUMNS` (`duckdb_utilities.py:9-42`, incl. `keywords: STRUCT(keyword VARCHAR)[]` — **text only, no URI**, L17) | ❌ API offline; **frozen** | +| **2 Base PQG** | export → `*_narrow.parquet` / `*_wide.parquet` | `pqg/pqg/sql_converter.py` `convert_isamples_sql(input, output, wide=…)` (CLI `python pqg/sql_converter.py in.parquet out.parquet [--wide]`); 7 stages, decomposes nested structs → nodes+edges; site dedupe by rounded lat/lon+label | ✅ scripted (exact prod invocation not recorded — gap) | +| **3 Sidecar merge** | base wide + Eric's OC PQG → `isamples_202604_wide.parquet` | `scripts/enrich_wide_with_oc_thumbnails.py` — `LEFT JOIN` OC `(pid, thumbnail_url)` into wide (`COALESCE`). **This is the precedent for merging ANY per-source supplement (incl. concept URIs) by pid.** Drift check: `scripts/check_oc_pqg_drift.py` (detects only; no mirror) | ⚠️ merge scripted; OC mirror + R2 upload manual | +| **4 Frontend derived** | wide → 7 explorer files | The 6 map/facet files (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) ← **`scripts/build_frontend_derived.py`** (deterministic; geometry-agnostic; emits a manifest). `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs). Gated by `scripts/validate_frontend_derived.py` (algebraic + `--wide` semantic re-derivation) + `tests/test_frontend_derived.py` (fixtures, CI). | ✅ scripted; facet/map files semantic-tested; wide_h3 column-smoke-tested | +| **5 Publish** | files → R2 + Worker | Worker `workers/data-isamples-org/src/index.js` (`wrangler deploy`); immutable cache for `isamples_\d{6}_*.parquet`; `/current/.parquet` → 302 via `current/manifest.json`. Bucket `isamples-ry` | ⚠️ Worker scripted; **file upload + manifest update are manual** | + +## The sidecar/enrichment pattern (how new data gets in) + +Because the export is frozen, new per-source data is added by **merging a supplementary parquet keyed by `pid` into the base wide** — exactly what the thumbnail enrichment does: + +```sql +-- scripts/enrich_wide_with_oc_thumbnails.py (core) +CREATE TEMP TABLE oc_thumbs AS + SELECT DISTINCT pid, thumbnail_url FROM read_parquet('') WHERE thumbnail_url IS NOT NULL; +COPY (SELECT p.* REPLACE (COALESCE(oc.thumbnail_url, p.thumbnail_url) AS thumbnail_url) + FROM read_parquet('') p LEFT JOIN oc_thumbs oc ON p.pid = oc.pid) + TO '' (FORMAT PARQUET, COMPRESSION ZSTD); +``` + +Eric Kansa maintains OpenContext PQG **independently** on GCS (`storage.googleapis.com/opencontext-parquet/oc_isamples_pqg.parquet`), so it can carry data the frozen iSamples export lacks. This is the channel for **#263** (external concept URIs): Eric's OC PQG carries them → merged into wide by pid → flows to the derived files. *(Sidecar design endorsed 2026-04-17; the spec `project_isamples_sidecar_pattern.md` lives in the Obsidian vault, not a repo — gap.)* + +## Stage 4 builder contract (`scripts/build_frontend_derived.py`) + +- **Geometry-agnostic input.** The `geometry` column may be **WKB BLOB** (e.g. `isamples_202604_wide`) or DuckDB **GEOMETRY** (e.g. `isamples_202601_wide`, the Zenodo wide). The builder detects the type at runtime — earlier ad-hoc SQL assumed BLOB and threw `BinderException` on GEOMETRY wides. +- **Material selection (#265/#271).** `material` = the **first NON-ROOT** concept in `p__has_material_category` (the root `.../material/1.0/material` "Material" can sit at any array position). Samples tagged only at the root get `NULL` material (excluded from the facet). This is **NOT leaf/most-specific** selection — the arrays are not clean SKOS paths. `context`/`object_type` use `[1]`; their root-dropping is deferred. +- **Determinism.** Every COPY has `ORDER BY`; `dominant_source` ties break on source name (ASC); center lat/lng rounded to 6 dp. +- **Reproducibility & build identity.** Each run writes `{tag}_manifest.json` (input + per-output sha256, argv, git SHA, DuckDB + extension versions). DuckDB pinned in `scripts/requirements.txt`. +- **Tested.** `tests/test_frontend_derived.py` (fixtures, CI via `.github/workflows/pipeline-tests.yml`) + `scripts/validate_frontend_derived.py` (algebraic: `facet_summaries == GROUP BY sample_facets_v2`, `facet_cross_filter == conditional GROUP BY`, `facets.pid == map_lite.pid`, pid uniqueness, H3 sums). `make test` / `make all`. + +## Documentation / automation gaps (remaining) + +- ⚠️ **The deployed `202601` derived files are NOT reproducible** from any available wide. A rebuild yields **528,983** root-material rows (pre-#271); the deployed `sample_facets_v2` has **346,768** — so the live files came from a different/unrecorded Stage-4 process, *and* the data has since rolled (wide is now `202604`). Treat a fresh `build_frontend_derived.py` run as the new source of truth, not as a bit-for-bit reproduction of the deployed files. +- **Version skew:** the deployed derived files are `202601` while the wide they should derive from is `202604` (the popup reads `202604`). Rebuilding from `202604` resolves it (tracked in the pipeline epic). +- **No R2 upload automation** — file upload to bucket `isamples-ry` + `current/manifest.json` update are manual `wrangler`/dashboard steps. +- **No OC mirror script** — `check_oc_pqg_drift.py` detects GCS↔R2 drift but doesn't perform the mirror. +- **Stage-2 prod invocation** that produced `zenodo_narrow_2025-12-12` / `zenodo_wide_2026-01-09` from the Zenodo export is still unrecorded (dedupe options unknown). +- **`SERIALIZATIONS.md:80`** claims every file "can be rebuilt by a script" — now true for the Stage-4 files; still aspirational for Stage-2. +- **Sidecar spec** is in Obsidian only, not version-controlled with the code. + +## Key files +- `export_client/isamples_export_client/duckdb_utilities.py` — export schema (keywords narrowing @ L17) +- `pqg/pqg/sql_converter.py` — export→PQG engine; `pqg/docs/PQG_SPECIFICATION.md` — format spec +- `isamplesorg.github.io/scripts/enrich_wide_with_oc_thumbnails.py` — the sidecar-merge precedent +- `isamplesorg.github.io/scripts/build_vocab_labels.py` — the one scripted derived file +- `isamplesorg.github.io/scripts/check_oc_pqg_drift.py` — OC drift check +- `isamplesorg.github.io/workers/data-isamples-org/{src/index.js,wrangler.toml}` — Worker + R2 config +- `isamplesorg.github.io/SERIALIZATIONS.md` — format/schema reference (DAG companion to this file) diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..bde1ddd5 --- /dev/null +++ b/Makefile @@ -0,0 +1,46 @@ +# Frontend-derived parquet pipeline — reproducible, AI-free. +# +# make test # fast fixture tests (no network, no big data) — the CI gate +# make wide # download + checksum the canonical wide parquet +# make derived # build the derived files from $(WIDE) into $(OUTDIR) +# make validate # algebraic trust gate over the built files (non-zero exit on failure) +# make all # wide -> derived -> validate +# +# Override on the command line, e.g.: +# make all WIDE_URL=https://data.isamples.org/isamples_202604_wide.parquet TAG=isamples_202606 +# +# Requirements: python with `pip install -r scripts/requirements.txt`, plus +# network access on first run (DuckDB pulls the h3 community extension). + +PY ?= python +WIDE_URL ?= https://data.isamples.org/isamples_202604_wide.parquet +OUTDIR ?= build/derived +WIDE ?= $(OUTDIR)/wide.parquet +TAG ?= isamples_dev +BUILD := scripts/build_frontend_derived.py +VALIDATE := scripts/validate_frontend_derived.py + +.PHONY: help test wide derived validate all clean +help: + @grep -E '^# make' Makefile | sed 's/^# / /' + +# Fast, deterministic fixture tests — the gate a human (or CI) runs without any AI. +test: + $(PY) -m pytest tests/test_frontend_derived.py -q + +wide: $(WIDE) +$(WIDE): + @mkdir -p $(OUTDIR) + curl -fSL -o $(WIDE) "$(WIDE_URL)" + @echo "sha256: $$(shasum -a 256 $(WIDE) | cut -d' ' -f1) $(WIDE)" + +derived: $(WIDE) + $(PY) $(BUILD) --wide $(WIDE) --outdir $(OUTDIR) --tag $(TAG) --skip wide_h3 + +validate: + $(PY) $(VALIDATE) --dir $(OUTDIR) --tag $(TAG) + +all: wide derived validate + +clean: + rm -rf $(OUTDIR) diff --git a/SERIALIZATIONS.md b/SERIALIZATIONS.md index b4211495..b5a9be53 100644 --- a/SERIALIZATIONS.md +++ b/SERIALIZATIONS.md @@ -77,9 +77,12 @@ vocab_labels.parquet (58 KB, 537 SKOS concepts) └─► consumed by Search Explorer to render facet URIs as prefLabels ``` -Arrows indicate derivation, not containment. Every file in the left -column can be rebuilt from its parent by a script in -`isamples-python/` or `isamplesorg.github.io/scripts/`. +Arrows indicate derivation, not containment. The Stage-4 frontend-derived +files are rebuilt by `isamplesorg.github.io/scripts/build_frontend_derived.py` +(+ `build_vocab_labels.py`); the Stage-2 narrow/wide files are rebuilt by +`pqg/`. Note: the **currently deployed** `isamples_202601_*` files predate that +builder — a fresh build is NOT bit-for-bit identical to them (see +`DATA_PROVENANCE.md`, "deployed 202601 not reproducible"). ## 3. Catalog @@ -226,7 +229,7 @@ for the alias when you want "latest." ### 4.6 `isamples_202601_h3_summary_res{4,6,8}.parquet` - **Role**: Zoom-adaptive aggregates that back the Cesium progressive globe and the Python Explorer's "H3 tier" rendering mode. -- **Headline schema** (7 cols, identical across resolutions): `h3_cell` (BIGINT), `sample_count` (INT), `center_lat`, `center_lng` (DOUBLE), `dominant_source` (VARCHAR), `source_count` (INT), `resolution` (INT). +- **Headline schema** (7 cols, identical across resolutions): `h3_cell` (**UBIGINT** — H3 cells are unsigned 64-bit; a signed BIGINT would go negative for high-bit cells), `sample_count` (INT), `center_lat`, `center_lng` (DOUBLE, rounded 6 dp), `dominant_source` (VARCHAR; ties broken by source name ASC for determinism), `source_count` (INT), `resolution` (INT). - **Query pattern**: fetch the right resolution for the current zoom; no join needed. - **DuckDB**: ```sql @@ -247,8 +250,10 @@ for the alias when you want "latest." ### 4.8 `isamples_202601_sample_facets_v2.parquet` -- **Role**: Cross-dimension facet filtering — one row per sample, each facet column holds a single controlled-vocabulary URI (the leaf concept the sample is tagged with at that dimension). -- **Headline schema** (8 cols, all VARCHAR): `pid, source, material, context, object_type, label, description, place_name`. `material`/`context`/`object_type` are scalar URI strings, NOT arrays — the file's grain is one row per sample, so a sample tagged with multiple material URIs is represented by a single chosen URI (currently the first/leaf). For multi-material accuracy, JOIN back to `wide.p__has_material_category`. +> ⚠️ **Deployed-file caveat:** the live `isamples_202601_sample_facets_v2.parquet` still contains **346,768** bare-root "Material" rows — it predates the #271 selection rule below. The rule describes the **builder contract** for the next rebuild (verified to drop the root → 0), not the file currently served. + +- **Role**: Cross-dimension facet filtering — one row per sample, each facet column holds a single controlled-vocabulary URI. +- **Headline schema** (8 cols, all VARCHAR): `pid, source, material, context, object_type, label, description, place_name`. `material`/`context`/`object_type` are scalar URI strings, NOT arrays — one row per sample, so a sample tagged with multiple URIs is represented by a single chosen URI. **Selection rule:** `material` = the **first NON-ROOT** concept in the array (the broad root `.../material/1.0/material` is dropped — #265/#271); root-only samples → NULL material. This is **NOT** necessarily the leaf/most-specific concept (the arrays are not clean SKOS paths). `context`/`object_type` = the first array element (`[1]`). `place_name` is a VARCHAR cast of the wide's `VARCHAR[]` (note: `samples_map_lite` keeps `place_name` as `VARCHAR[]`). For multi-value accuracy, JOIN back to `wide.p__has_*_category`. - **Query pattern**: `WHERE material = ''` for exact match; `WHERE material ILIKE '%rock%'` to substring-match URI fragments. - **DuckDB**: ```sql @@ -272,7 +277,7 @@ for the alias when you want "latest." ### 4.10 `isamples_202601_facet_cross_filter.parquet` - **Role**: Cross-facet counts for the single-active-filter case (QUERY_SPEC §3.3 tier 2a). Avoids recomputing when one facet dimension is active. -- **Headline schema** (7 cols, 526 rows): `filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value, count`. Exactly one `filter_*` column is non-NULL per row. +- **Headline schema** (7 cols): `filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value, count`. Two row kinds: **baseline** rows have **all** `filter_*` NULL (these equal `facet_summaries`); **single-dimension** rows have **exactly one** `filter_*` non-NULL. Single-dimension rows include self-dimension counts (`facet_type == filter dim`), which the explorer ignores. (Both kinds are emitted by `build_frontend_derived.py` and asserted by `validate_frontend_derived.py`.) - **Query pattern**: lookup by the active filter to get counts for the remaining dimensions. - **DuckDB**: ```sql diff --git a/scripts/build_frontend_derived.py b/scripts/build_frontend_derived.py new file mode 100755 index 00000000..b4f10391 --- /dev/null +++ b/scripts/build_frontend_derived.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 +"""Build the explorer's frontend-derived parquet files from a wide PQG parquet. + +Deterministic, reproducible builder for the 6 ad-hoc Stage-4 derived files +(see DATA_PROVENANCE.md). Every run also writes a manifest (input + output +checksums, argv, git SHA, DuckDB + extension versions) so a build is +machine-identifiable and re-verifiable. + +INPUT CONTRACT (enforced/handled): + A wide PQG parquet with entity rows incl. MaterialSampleRecord + + IdentifiedConcept. The `geometry` column may be **WKB BLOB** or DuckDB + **GEOMETRY** — both are handled (detected at runtime). Concept references + live in `p__has_{material,context,sample_object}_category` row-id arrays. + +OUTPUTS (into --outdir, prefixed --tag): + - {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description, place_name(VARCHAR) + - {tag}_samples_map_lite.parquet pid, label, source, latitude, longitude, place_name(VARCHAR[]), result_time, h3_res8(UBIGINT), h3_res8_hex + - {tag}_h3_summary_res{4,6,8}.parquet h3_cell(UBIGINT), sample_count(INT), center_lat, center_lng, dominant_source, source_count(INT), resolution(INT) + - {tag}_facet_summaries.parquet facet_type, facet_value, scheme, count + - {tag}_facet_cross_filter.parquet filter_source/material/context/object_type, facet_type, facet_value, count + - {tag}_wide_h3.parquet wide + h3_res4/6/8 (large; built only on --only wide_h3) + - {tag}_manifest.json provenance + per-output rowcount/schema/sha256 + +MATERIAL SELECTION (issue #265/#271): the broad SKOS root +`.../material/1.0/material` ("Material") can appear at ANY position in the +concept array; the old `[1]` pick surfaced it as a bogus facet value. We pick +the FIRST NON-ROOT concept (by array order); samples tagged ONLY at the root +get NULL material (excluded from the facet). This is NOT leaf/most-specific +selection — see DATA_PROVENANCE.md. context/object_type use the first array +element ([1]); their root-dropping is deferred (tracked in the pipeline epic). + +Determinism: row order and all DISCRETE values are deterministic (ORDER BY on +every COPY; dominant_source ties broken by source name ASC; non-unique keys are +a hard error). Floating centroids (center_lat/lng) are rounded to 6 dp and are +display-only — not part of the reproducibility guarantee; pass --threads 1 for +bit-stable centroids across machines. + +Usage: + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag isamples_202606 + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag T --only sample_facets_v2,facet_summaries +""" +import argparse, hashlib, json, os, subprocess, sys, time +import duckdb + +MATERIAL_ROOT = "https://w3id.org/isample/vocabulary/material/1.0/material" +FACET_DIMS = ["source", "material", "context", "object_type"] +# the artifacts this script knows how to build (for --only/--skip validation) +ARTIFACTS = ["sample_facets_v2", "samples_map_lite", "h3_summaries", + "facet_summaries", "facet_cross_filter", "wide_h3"] + + +def log(msg, t0): + print(f"[{time.time()-t0:6.1f}s] {msg}", flush=True) + + +def sha256_file(path, _bufsize=1 << 20): + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(_bufsize), b""): + h.update(chunk) + return h.hexdigest() + + +def geometry_expr(con, wide): + """Return a SQL expression yielding a GEOMETRY from `s.geometry`, whether the + column is stored as WKB BLOB or as DuckDB GEOMETRY. Fixes the silent + BLOB-only contract (202604 wide = BLOB; 202601/Zenodo wides = GEOMETRY).""" + rows = con.sql(f"DESCRIBE SELECT geometry FROM read_parquet('{wide}') LIMIT 0").fetchall() + coltype = rows[0][1].upper() if rows else "BLOB" + if coltype == "GEOMETRY": + return "s.geometry" + if coltype in ("BLOB", "WKB_BLOB", "VARBINARY"): + return "ST_GeomFromWKB(s.geometry)" + raise SystemExit(f"FATAL: unexpected geometry column type {coltype!r} in {wide}") + + +def build_base_tables(con, wide, t0): + geom = geometry_expr(con, wide) + con.execute(f""" + CREATE OR REPLACE TEMP TABLE ic AS + SELECT row_id, pid AS uri FROM read_parquet('{wide}') WHERE otype='IdentifiedConcept'; + + -- material: FIRST NON-ROOT concept per sample. Decorrelated (unnest+join+ + -- arg_min by array ordinality) — NOT a correlated subquery and NOT a MAP + -- cross-join (both of which blow up the planner on 20M rows). + CREATE OR REPLACE TEMP TABLE mat AS + WITH ex AS ( + SELECT s.pid AS pid, u.rid AS rid, u.ord AS ord + FROM read_parquet('{wide}') s, + UNNEST(s.p__has_material_category) WITH ORDINALITY AS u(rid, ord) + WHERE s.otype='MaterialSampleRecord' + ) + SELECT ex.pid, arg_min(ic.uri, ex.ord) AS material + FROM ex JOIN ic ON ic.row_id = ex.rid + WHERE ic.uri <> '{MATERIAL_ROOT}' + GROUP BY ex.pid; + + -- one row per MaterialSampleRecord; all concept resolution via JOINs (decorrelated). + CREATE OR REPLACE TEMP TABLE samp AS + SELECT + s.pid, + s.n AS source, + s.label, + s.description, + s.place_name, -- VARCHAR[] + s.result_time, + ROUND(ST_Y({geom}), 6) AS latitude, + ROUND(ST_X({geom}), 6) AS longitude, + mat.material AS material, + ctx.uri AS context, + obj.uri AS object_type + FROM read_parquet('{wide}') s + LEFT JOIN mat ON mat.pid = s.pid + LEFT JOIN ic AS ctx ON ctx.row_id = s.p__has_context_category[1] + LEFT JOIN ic AS obj ON obj.row_id = s.p__has_sample_object_type[1] + WHERE s.otype='MaterialSampleRecord'; + + CREATE OR REPLACE TEMP TABLE samp_geo AS + SELECT *, + h3_latlng_to_cell(latitude, longitude, 4) AS h3_res4, + h3_latlng_to_cell(latitude, longitude, 6) AS h3_res6, + h3_latlng_to_cell(latitude, longitude, 8) AS h3_res8 + FROM samp WHERE latitude IS NOT NULL AND longitude IS NOT NULL; + """) + n_samp = con.sql("SELECT COUNT(*) FROM samp").fetchone()[0] + n_geo = con.sql("SELECT COUNT(*) FROM samp_geo").fetchone()[0] + n_dup = con.sql("SELECT COUNT(*) FROM (SELECT pid FROM samp_geo GROUP BY pid HAVING COUNT(*)>1)").fetchone()[0] + n_icdup = con.sql("SELECT COUNT(*) FROM (SELECT row_id FROM ic GROUP BY row_id HAVING COUNT(*)>1)").fetchone()[0] + log(f"samp={n_samp:,} samp_geo={n_geo:,} duplicate_pids={n_dup:,} duplicate_concept_row_ids={n_icdup:,}", t0) + if n_dup or n_icdup: + # HARD fail (Codex): non-unique keys make the output grain wrong (inflated + # facet counts, ambiguous joins, non-total ORDER BY pid). Abort before writing. + raise SystemExit( + f"FATAL: non-unique keys — duplicate_pids={n_dup}, duplicate_concept_row_ids={n_icdup}. " + f"Output grain/joins would be wrong; refusing to write.") + + +def build_sample_facets_v2(con, out): + con.execute(f"""COPY ( + SELECT pid, source, material, context, object_type, label, description, + place_name::VARCHAR AS place_name + FROM samp_geo ORDER BY pid + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_samples_map_lite(con, out): + con.execute(f"""COPY ( + SELECT pid, label, source, latitude, longitude, place_name, result_time, + h3_res8::UBIGINT AS h3_res8, + h3_h3_to_string(h3_res8) AS h3_res8_hex + FROM samp_geo ORDER BY pid + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_h3_summary(con, out, res): + col = f"h3_res{res}" + # deterministic dominant_source: max sample count, ties broken by source name ASC. + con.execute(f"""COPY ( + WITH sc AS ( + SELECT {col} AS cell, source, COUNT(*) AS c + FROM samp_geo WHERE {col} IS NOT NULL GROUP BY {col}, source + ), + dom AS ( + SELECT cell, source AS dominant_source, + ROW_NUMBER() OVER (PARTITION BY cell ORDER BY c DESC, source ASC) AS rn + FROM sc + ), + agg AS ( + SELECT {col} AS cell, COUNT(*) AS sample_count, + ROUND(AVG(latitude), 6) AS center_lat, + ROUND(AVG(longitude), 6) AS center_lng, + COUNT(DISTINCT source) AS source_count + FROM samp_geo WHERE {col} IS NOT NULL GROUP BY {col} + ) + SELECT agg.cell::UBIGINT AS h3_cell, + agg.sample_count::INTEGER AS sample_count, + agg.center_lat, agg.center_lng, + dom.dominant_source, + agg.source_count::INTEGER AS source_count, + {res}::INTEGER AS resolution + FROM agg JOIN dom ON dom.cell = agg.cell AND dom.rn = 1 + ORDER BY h3_cell + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_facet_summaries(con, out): + union = " UNION ALL ".join( + f"SELECT '{d}' AS facet_type, {d} AS facet_value FROM samp_geo WHERE {d} IS NOT NULL" + for d in FACET_DIMS) + con.execute(f"""COPY ( + SELECT facet_type, facet_value, NULL::INTEGER AS scheme, COUNT(*) AS count + FROM ({union}) + GROUP BY facet_type, facet_value + ORDER BY facet_type, facet_value + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_facet_cross_filter(con, out): + # baseline (all filter_* NULL) + single-dimension filters. NOTE: the shape + # (incl. baseline rows + self-dimension rows) matches what the deployed + # explorer reads; see SERIALIZATIONS.md for the exact contract. Determinism + # via ORDER BY. + selects = [] + for fd in FACET_DIMS: + selects.append( + f"SELECT NULL::VARCHAR AS filter_source, NULL::VARCHAR AS filter_material, " + f"NULL::VARCHAR AS filter_context, NULL::VARCHAR AS filter_object_type, " + f"'{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " + f"FROM samp_geo WHERE {fd} IS NOT NULL GROUP BY {fd}") + for filt in FACET_DIMS: + for fd in FACET_DIMS: + cols = ", ".join( + (f"{filt} AS filter_{c}" if c == filt else f"NULL::VARCHAR AS filter_{c}") + for c in FACET_DIMS) + selects.append( + f"SELECT {cols}, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " + f"FROM samp_geo WHERE {filt} IS NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}") + con.execute(f"""COPY ( + SELECT filter_source, filter_material, filter_context, filter_object_type, + facet_type, facet_value, count + FROM ({' UNION ALL '.join(selects)}) + ORDER BY filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_wide_h3(con, wide, out): + geom = geometry_expr(con, wide).replace("s.geometry", "geometry") + con.execute(f"""COPY ( + SELECT *, + CASE WHEN geometry IS NOT NULL THEN h3_latlng_to_cell(ST_Y({geom}), ST_X({geom}), 4) END AS h3_res4, + CASE WHEN geometry IS NOT NULL THEN h3_latlng_to_cell(ST_Y({geom}), ST_X({geom}), 6) END AS h3_res6, + CASE WHEN geometry IS NOT NULL THEN h3_latlng_to_cell(ST_Y({geom}), ST_X({geom}), 8) END AS h3_res8 + FROM read_parquet('{wide}') ORDER BY pid + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def file_meta(con, path): + n = con.sql(f"SELECT COUNT(*) FROM read_parquet('{path}')").fetchone()[0] + schema = [(r[0], r[1]) for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{path}')").fetchall()] + return {"rows": n, "schema": schema, "bytes": os.path.getsize(path), "sha256": sha256_file(path)} + + +def git_sha(): + try: + return subprocess.check_output(["git", "rev-parse", "HEAD"], + cwd=os.path.dirname(os.path.abspath(__file__)), + stderr=subprocess.DEVNULL).decode().strip() + except Exception: + return None + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--wide", required=True) + ap.add_argument("--outdir", required=True) + ap.add_argument("--tag", required=True, help="output prefix, e.g. isamples_202606 (no stale default)") + ap.add_argument("--only", default="", help=f"comma list of: {','.join(ARTIFACTS)}") + ap.add_argument("--skip", default="", help="comma list of the same names to skip") + ap.add_argument("--no-manifest", action="store_true", help="skip writing {tag}_manifest.json") + ap.add_argument("--threads", type=int, default=0, + help="DuckDB thread count; set 1 for bit-stable floating centroids (slower)") + args = ap.parse_args() + + only = set(filter(None, args.only.split(","))) + skip = set(filter(None, args.skip.split(","))) + bad = (only | skip) - set(ARTIFACTS) + if bad: # Codex #3: fail loudly on typos instead of silently building nothing + sys.exit(f"FATAL: unknown --only/--skip name(s): {sorted(bad)}. Known: {ARTIFACTS}") + want = lambda name: (not only or name in only) and name not in skip + os.makedirs(args.outdir, exist_ok=True) + + t0 = time.time() + con = duckdb.connect() + if args.threads: + con.execute(f"PRAGMA threads={args.threads}") + con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") + log("building base sample tables…", t0) + build_base_tables(con, args.wide, t0) + + p = lambda name: os.path.join(args.outdir, f"{args.tag}_{name}.parquet") + produced = [] + def emit(name, fn): + if want(name): + fn(p(name)); produced.append(p(name)); log(f"{name} ✓", t0) + + emit("sample_facets_v2", lambda o: build_sample_facets_v2(con, o)) + emit("facet_summaries", lambda o: build_facet_summaries(con, o)) + emit("facet_cross_filter", lambda o: build_facet_cross_filter(con, o)) + emit("samples_map_lite", lambda o: build_samples_map_lite(con, o)) + if want("h3_summaries"): + for res in (4, 6, 8): + build_h3_summary(con, p(f"h3_summary_res{res}"), res); produced.append(p(f"h3_summary_res{res}")) + log("h3_summary_res{4,6,8} ✓", t0) + emit("wide_h3", lambda o: build_wide_h3(con, args.wide, o)) + + if not args.no_manifest: + log("hashing inputs/outputs for manifest…", t0) + exts = {r[0]: r[1] for r in con.sql( + "SELECT extension_name, extension_version FROM duckdb_extensions() WHERE installed").fetchall()} + manifest = { + "tag": args.tag, + "argv": sys.argv, + "git_sha": git_sha(), + "duckdb_version": duckdb.__version__, + "extensions": exts, + "input": {"path": args.wide, + "bytes": (os.path.getsize(args.wide) if os.path.exists(args.wide) else None), + "sha256": (sha256_file(args.wide) if os.path.exists(args.wide) else "remote/unhashed")}, + "outputs": {os.path.basename(f): file_meta(con, f) for f in produced}, + } + mpath = p("manifest").replace(".parquet", ".json") + with open(mpath, "w") as fh: + json.dump(manifest, fh, indent=2) + log(f"manifest → {mpath}", t0) + + log("done", t0) + + +if __name__ == "__main__": + main() diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 67746e46..0015d6da 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -6,3 +6,10 @@ rdflib>=6.3 pandas>=2.0 pyarrow>=14 + +# build_frontend_derived.py + validate_frontend_derived.py + tests/ +# DuckDB pinned for reproducible builds; the h3 (community) and spatial +# extensions are version-bound to this DuckDB release and installed at runtime +# (the build manifest records the exact resolved extension hashes). +duckdb==1.4.4 +pytest>=8.0 diff --git a/scripts/validate_frontend_derived.py b/scripts/validate_frontend_derived.py new file mode 100755 index 00000000..4cd8ad74 --- /dev/null +++ b/scripts/validate_frontend_derived.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +"""Algebraic, AI-free trust gate for the explorer's derived parquet. + +This is NOT a code review and NOT a spot check. It recomputes the derived-file +*algebra* from `sample_facets_v2` / `samples_map_lite` and asserts the other +files agree. A rebuild that is wrong (collapsed materials, stale summaries, +drifted cross-filter, duplicate pids, broken H3) FAILS here. Exits non-zero on +any failure so it can gate CI / a publish. + +Usage: + python scripts/validate_frontend_derived.py --dir /tmp/rebuild-202606 --tag isamples_202606 + # or point at live/remote files: + python scripts/validate_frontend_derived.py \ + --facets URL --map-lite URL --summaries URL --cross-filter URL \ + --h3 URL4 URL6 URL8 +""" +import argparse, hashlib, json, os, sys +import duckdb + + +def sha256_file(path, _b=1 << 20): + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(_b), b""): + h.update(chunk) + return h.hexdigest() + +MATERIAL_ROOT = "https://w3id.org/isample/vocabulary/material/1.0/material" +PID_K = "ark:/28722/k2p55x96j" # #260 sentinel: must not flip under #271 selection +PID_K_EXPECTED = "https://w3id.org/isample/vocabulary/material/1.0/anthropogenicmetal" + +EXPECTED_SCHEMA = { + "facets": [("pid", "VARCHAR"), ("source", "VARCHAR"), ("material", "VARCHAR"), + ("context", "VARCHAR"), ("object_type", "VARCHAR"), ("label", "VARCHAR"), + ("description", "VARCHAR"), ("place_name", "VARCHAR")], +} + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--dir"); ap.add_argument("--tag") + ap.add_argument("--facets"); ap.add_argument("--map-lite") + ap.add_argument("--summaries"); ap.add_argument("--cross-filter") + ap.add_argument("--h3", nargs=3, metavar=("R4", "R6", "R8")) + ap.add_argument("--wide", help="source wide parquet — enables the SEMANTIC gate " + "(re-derive and diff the written files against a fresh build)") + ap.add_argument("--min-rows", type=int, default=1_000_000, + help="floor for the non-empty sanity check (use 1 for fixtures)") + a = ap.parse_args() + + def f(name, attr): + v = getattr(a, attr) + if v: + return v + if a.dir and a.tag: + return os.path.join(a.dir, f"{a.tag}_{name}.parquet") + sys.exit(f"FATAL: provide --{attr} or both --dir and --tag") + + facets = f("sample_facets_v2", "facets") + maplite = f("samples_map_lite", "map_lite") + summaries = f("facet_summaries", "summaries") + crossf = f("facet_cross_filter", "cross_filter") + h3 = a.h3 or ([os.path.join(a.dir, f"{a.tag}_h3_summary_res{r}.parquet") for r in (4, 6, 8)] + if a.dir and a.tag else None) + + con = duckdb.connect() + R = [] # (name, passed, detail) + info = [] + def check(name, passed, detail=""): + R.append((name, bool(passed), detail)) + def scalar(sql): + return con.sql(sql).fetchone()[0] + + F = f"read_parquet('{facets}')" + ML = f"read_parquet('{maplite}')" + S = f"read_parquet('{summaries}')" + CF = f"read_parquet('{crossf}')" + + # --- 1. material root absent (the #265/#271 contract) --- + check("material root absent", scalar(f"SELECT COUNT(*) FROM {F} WHERE material='{MATERIAL_ROOT}'") == 0, + "facets rows with bare root material (want 0)") + + # --- 2. #260 sentinel preserved (skip when the pid isn't in this dataset, e.g. fixtures) --- + row = con.sql(f"SELECT material FROM {F} WHERE pid='{PID_K}'").fetchone() + if row is None: + info.append(f"sentinel {PID_K} not present (N/A for this dataset)") + else: + check(f"sentinel {PID_K} preserved", row[0] == PID_K_EXPECTED, f"got {row}") + + # --- 3. PID uniqueness (browser relies on one row per pid) --- + check("facets pid unique", scalar(f"SELECT COUNT(*) FROM (SELECT pid FROM {F} GROUP BY pid HAVING COUNT(*)>1)") == 0, + "duplicate pids in facets") + check("map_lite pid unique", scalar(f"SELECT COUNT(*) FROM (SELECT pid FROM {ML} GROUP BY pid HAVING COUNT(*)>1)") == 0, + "duplicate pids in map_lite") + + # --- 4. facets.pid SET == map_lite.pid SET --- + diff = scalar(f"""SELECT + (SELECT COUNT(*) FROM (SELECT pid FROM {F} EXCEPT SELECT pid FROM {ML})) + + (SELECT COUNT(*) FROM (SELECT pid FROM {ML} EXCEPT SELECT pid FROM {F}))""") + check("facets.pid == map_lite.pid", diff == 0, f"{diff} pids differ between facets and map_lite") + + # --- 5. ALGEBRA: facet_summaries == GROUP BY facets (per dim) --- + recompute = " UNION ALL ".join( + f"SELECT '{d}' AS facet_type, {d} AS facet_value, COUNT(*) AS c FROM {F} WHERE {d} IS NOT NULL GROUP BY {d}" + for d in ("source", "material", "context", "object_type")) + mismatch = scalar(f""" + WITH recomputed AS ({recompute}), + summ AS (SELECT facet_type, facet_value, count AS c FROM {S}) + SELECT COUNT(*) FROM ( + SELECT * FROM recomputed EXCEPT SELECT * FROM summ + UNION ALL + SELECT * FROM summ EXCEPT SELECT * FROM recomputed)""") + check("facet_summaries == GROUP BY facets", mismatch == 0, f"{mismatch} (facet_type,value,count) rows disagree") + check("facet_summaries.scheme all NULL", scalar(f"SELECT COUNT(*) FROM {S} WHERE scheme IS NOT NULL") == 0, + "non-NULL scheme rows (contract: scheme is NULL)") + + # --- 6. ALGEBRA: facet_cross_filter single-dim rows == conditional GROUP BY facets --- + dims = ("source", "material", "context", "object_type") + parts = [] + for filt in dims: + for fd in dims: + parts.append( + f"SELECT '{filt}' AS fcol, {filt} AS fval, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS c " + f"FROM {F} WHERE {filt} IS NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}") + recompute_cf = " UNION ALL ".join(parts) + # normalize cross_filter single-dim rows into (fcol, fval, facet_type, facet_value, count) + cf_single = f""" + SELECT 'source' AS fcol, filter_source AS fval, facet_type, facet_value, count FROM {CF} WHERE filter_source IS NOT NULL + UNION ALL SELECT 'material', filter_material, facet_type, facet_value, count FROM {CF} WHERE filter_material IS NOT NULL + UNION ALL SELECT 'context', filter_context, facet_type, facet_value, count FROM {CF} WHERE filter_context IS NOT NULL + UNION ALL SELECT 'object_type', filter_object_type, facet_type, facet_value, count FROM {CF} WHERE filter_object_type IS NOT NULL + """ + cf_mismatch = scalar(f""" + WITH rc AS ({recompute_cf}), cf AS ({cf_single}) + SELECT COUNT(*) FROM ( + SELECT * FROM rc EXCEPT SELECT * FROM cf + UNION ALL + SELECT * FROM cf EXCEPT SELECT * FROM rc)""") + check("cross_filter == conditional GROUP BY facets", cf_mismatch == 0, f"{cf_mismatch} single-dim rows disagree") + + # --- 7. cross_filter baseline (all filter_* NULL) == facet_summaries --- + base_mismatch = scalar(f""" + WITH base AS (SELECT facet_type, facet_value, count FROM {CF} + WHERE filter_source IS NULL AND filter_material IS NULL + AND filter_context IS NULL AND filter_object_type IS NULL), + summ AS (SELECT facet_type, facet_value, count FROM {S}) + SELECT COUNT(*) FROM ( + SELECT * FROM base EXCEPT SELECT * FROM summ + UNION ALL SELECT * FROM summ EXCEPT SELECT * FROM base)""") + check("cross_filter baseline == summaries", base_mismatch == 0, f"{base_mismatch} baseline rows disagree") + + # --- 8. H3: per-resolution sample_count sums to located-sample total --- + if h3: + ml_n = scalar(f"SELECT COUNT(*) FROM {ML}") + for res, hp in zip((4, 6, 8), h3): + tot = scalar(f"SELECT SUM(sample_count) FROM read_parquet('{hp}')") + check(f"h3 res{res} counts sum to map_lite", tot == ml_n, f"sum={tot} vs map_lite={ml_n}") + + # --- 9. schema/types --- + sch = [(r[0], r[1]) for r in con.sql(f"DESCRIBE SELECT * FROM {F}").fetchall()] + check("facets schema matches contract", sch == EXPECTED_SCHEMA["facets"], f"got {sch}") + + # --- 10. sanity floor --- + total, mat = con.sql(f"SELECT COUNT(*), COUNT(material) FROM {F}").fetchone() + check("facets non-empty", total >= a.min_rows, f"{total:,} rows (min {a.min_rows:,})") + + # --- 11. SEMANTIC gate vs the source wide (the REAL trust gate) --- + # Internal-consistency checks (1-10) pass even on a wrecked-but-self-consistent + # rebuild (proven). Re-derive from the wide with the SAME builder logic and + # assert the WRITTEN files match it. Catches corrupted material/coords/H3, + # stale files, and wrong-version artifacts. + if a.wide: + sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + import build_frontend_derived as B + con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") + B.build_base_tables(con, a.wide, 0.0) # builds samp_geo; hard-fails on dup keys + + def except_diff(asql, bsql): + return scalar(f"SELECT (SELECT COUNT(*) FROM (({asql}) EXCEPT ({bsql}))) " + f"+ (SELECT COUNT(*) FROM (({bsql}) EXCEPT ({asql})))") + + ref_facets = ("SELECT pid, source, material, context, object_type, label, description, " + "place_name::VARCHAR AS place_name FROM samp_geo") + file_facets = f"SELECT pid, source, material, context, object_type, label, description, place_name FROM {F}" + check("facets == fresh build from --wide", except_diff(ref_facets, file_facets) == 0, + "facets rows differ from a re-derivation off the wide (corruption/stale/wrong-version)") + + ref_ml = ("SELECT pid, label, source, latitude, longitude, result_time, " + "h3_res8::UBIGINT AS h3_res8, h3_h3_to_string(h3_res8) AS h3_res8_hex, " + "place_name::VARCHAR AS pn FROM samp_geo") + file_ml = (f"SELECT pid, label, source, latitude, longitude, result_time, h3_res8, h3_res8_hex, " + f"place_name::VARCHAR AS pn FROM {ML}") + check("map_lite == fresh build from --wide", except_diff(ref_ml, file_ml) == 0, + "map_lite coords/h3/place_name differ from a re-derivation off the wide") + + if h3: + for res, hp in zip((4, 6, 8), h3): + ref_h3 = (f"WITH sc AS (SELECT h3_res{res} AS cell, source, COUNT(*) c FROM samp_geo " + f"WHERE h3_res{res} IS NOT NULL GROUP BY h3_res{res}, source), " + f"dom AS (SELECT cell, source AS ds, ROW_NUMBER() OVER (PARTITION BY cell ORDER BY c DESC, source ASC) rn FROM sc), " + f"agg AS (SELECT h3_res{res} AS cell, COUNT(*) sc2, COUNT(DISTINCT source) srcc, " + f"ROUND(AVG(latitude),6) clat, ROUND(AVG(longitude),6) clng FROM samp_geo " + f"WHERE h3_res{res} IS NOT NULL GROUP BY h3_res{res}) " + f"SELECT agg.cell::UBIGINT h3_cell, agg.sc2::INTEGER sample_count, agg.clat center_lat, " + f"agg.clng center_lng, dom.ds AS dominant_source, agg.srcc::INTEGER source_count, " + f"{res}::INTEGER resolution FROM agg JOIN dom ON dom.cell=agg.cell AND dom.rn=1") + FH = f"read_parquet('{hp}')" + # discrete cols exact (now incl. resolution) + disc_ref = f"SELECT h3_cell, sample_count, dominant_source, source_count, resolution FROM ({ref_h3})" + disc_file = f"SELECT h3_cell, sample_count, dominant_source, source_count, resolution FROM {FH}" + check(f"h3 res{res} discrete == fresh build", except_diff(disc_ref, disc_file) == 0, + "h3 cells/counts/dominant_source/resolution differ from re-derivation off the wide") + # centers: tolerant (float/thread last-ULP jitter ok; gross corruption like 0,0 caught) + cdiff = scalar(f"SELECT COALESCE(MAX(GREATEST(ABS(f.center_lat-r.center_lat), " + f"ABS(f.center_lng-r.center_lng))), 0) FROM {FH} f JOIN ({ref_h3}) r ON f.h3_cell=r.h3_cell") + # tolerance 1e-5 (~1 m): absorbs the ~1e-6 round/thread jitter, catches + # any meaningful shift (an adversary's 8e-5/~9 m shift previously slipped + # through the old 1e-4). Residual undetected error is bounded at ~1 m on + # display-only cluster centroids. + check(f"h3 res{res} centers within 1e-5 (~1m)", cdiff <= 1e-5, f"max center delta {cdiff}") + + # --- 12. manifest integrity (when a {tag}_manifest.json is present) --- + # The build emits a provenance manifest (per-output + input sha256). Verify the + # written files match it, so the manifest is a guarded attestation, not decoration. + # NOTE: the manifest is self-attesting (not signed) — this catches accidental + # corruption, stale files, and tampering that didn't also rewrite the manifest; + # it does NOT defend against an attacker who rewrites file+manifest consistently. + if a.dir and a.tag: + man_path = os.path.join(a.dir, f"{a.tag}_manifest.json") + if os.path.exists(man_path): + try: + man = json.load(open(man_path)) + except Exception as e: + man = None + check("manifest parses", False, f"unreadable: {e}") + if man: + outs = man.get("outputs", {}) + check("manifest has outputs", bool(outs), "empty manifest.outputs") + bad = [] + for fname, m in outs.items(): + fp = os.path.join(a.dir, fname) + if not os.path.exists(fp): + bad.append(f"{fname}:missing") + elif sha256_file(fp) != m.get("sha256"): + bad.append(f"{fname}:sha256") + check("manifest sha256 matches output files", not bad, f"mismatches: {bad}") + if a.wide: + msha = man.get("input", {}).get("sha256") + if msha and msha != "remote/unhashed": + check("manifest input sha256 matches --wide", sha256_file(a.wide) == msha, + "wide sha256 != manifest.input.sha256") + else: + info.append(f"no {a.tag}_manifest.json (manifest verification skipped)") + + # --- informational (not failing): context/object_type root presence --- + for dim, root in [("context", "https://w3id.org/isample/vocabulary/sampledfeature/1.0/anysampledfeature"), + ("object_type", "https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/materialsample")]: + n = scalar(f"SELECT COUNT(*) FROM {F} WHERE {dim}='{root}'") + info.append(f"{dim} root-concept rows: {n:,} (informational; root-dropping deferred)") + + print(f"\n{'CHECK':<44} {'RESULT':<6} DETAIL\n" + "-" * 90) + ok = True + for name, passed, detail in R: + ok = ok and passed + print(f"{name:<44} {'PASS' if passed else 'FAIL':<6} {detail}") + print("-" * 90) + for line in info: + print(" info:", line) + print("\n" + ("ALL CHECKS PASS" if ok else "FAILURES PRESENT")) + sys.exit(0 if ok else 1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_frontend_derived.py b/tests/test_frontend_derived.py new file mode 100644 index 00000000..b97c1af1 --- /dev/null +++ b/tests/test_frontend_derived.py @@ -0,0 +1,299 @@ +"""Fast, AI-free fixture tests for the derived-parquet pipeline. + +Builds tiny synthetic `wide` parquet files (both WKB-BLOB and DuckDB-GEOMETRY +geometry encodings), runs the real builder + validator against them, and asserts +the contract — especially the cases that bit us in production: + - geometry stored as BLOB *or* GEOMETRY (the silent BLOB-only contract bug) + - material = first NON-ROOT concept; root-only -> NULL (#265/#271) + - missing concept row-id / NULL array -> NULL (no crash) + - place_name serialization; pid uniqueness; located-only scoping + - CLI fails loudly on unknown --only + +Run: pytest tests/test_frontend_derived.py -q (needs: duckdb, h3, spatial) +""" +import os, subprocess, sys +import duckdb +import pytest + +HERE = os.path.dirname(os.path.abspath(__file__)) +REPO = os.path.dirname(HERE) +BUILD = os.path.join(REPO, "scripts", "build_frontend_derived.py") +VALIDATE = os.path.join(REPO, "scripts", "validate_frontend_derived.py") + +MAT = "https://w3id.org/isample/vocabulary/material/1.0/" +ROOT = MAT + "material" + +# concept row_id -> uri +CONCEPTS = [ + (1, ROOT), # material root (must never be selected) + (2, MAT + "mineral"), + (3, MAT + "rock"), + (4, MAT + "anthropogenicmetal"), + (10, "https://w3id.org/isample/vocabulary/sampledfeature/1.0/earthinterior"), + (20, "https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/othersolidobject"), +] + +# (pid, material_array, expected_material_tail) — context=[10], object_type=[20], valid geometry +SAMPLES = [ + ("m-root-first", [1, 2, 3], "rock"), # root first -> first NON-root in order = mineral? -> see note + ("m-real-first", [4, 1], "anthropogenicmetal"), # real first preserved + ("m-root-only", [1], None), # root only -> NULL + ("m-null-array", None, None), # no material -> NULL + ("m-missing-id", [999], None), # dangling row-id -> NULL +] +# NOTE on m-root-first [1,2,3] = [material, mineral, rock]: builder takes the +# FIRST non-root by array order -> 'mineral'. We assert exactly that below. +EXPECTED = { + "m-root-first": MAT + "mineral", + "m-real-first": MAT + "anthropogenicmetal", + "m-root-only": None, + "m-null-array": None, + "m-missing-id": None, +} + + +def _arr(xs): + return "NULL::BIGINT[]" if xs is None else "[" + ",".join(str(x) for x in xs) + "]::BIGINT[]" + + +def build_fixture_wide(path, geom_mode): + """Write a tiny wide parquet. geom_mode in {'blob','geometry'}.""" + con = duckdb.connect() + con.execute("INSTALL spatial; LOAD spatial;") + geom = (lambda lng, lat: f"ST_AsWKB(ST_Point({lng},{lat}))") if geom_mode == "blob" \ + else (lambda lng, lat: f"ST_Point({lng},{lat})") + + ic_rows = " UNION ALL ".join( + f"SELECT 'IdentifiedConcept' AS otype, '{uri}' AS pid, {rid}::BIGINT AS row_id, NULL::VARCHAR AS n, " + f"NULL::VARCHAR AS label, NULL::VARCHAR AS description, NULL::VARCHAR[] AS place_name, " + f"NULL::TIMESTAMP AS result_time, NULL AS geometry, " + f"NULL::BIGINT[] AS p__has_material_category, NULL::BIGINT[] AS p__has_context_category, " + f"NULL::BIGINT[] AS p__has_sample_object_type" + for rid, uri in CONCEPTS) + + msr = [] + for i, (pid, marr, _) in enumerate(SAMPLES): + lng, lat = 10.0 + i, 40.0 + i + msr.append( + f"SELECT 'MaterialSampleRecord' AS otype, '{pid}' AS pid, NULL::BIGINT AS row_id, 'TEST' AS n, " + f"'label {pid}' AS label, 'desc {pid}' AS description, ['plc-{pid}','x''q']::VARCHAR[] AS place_name, " + f"NULL::TIMESTAMP AS result_time, {geom(lng, lat)} AS geometry, " + f"{_arr(marr)} AS p__has_material_category, [10]::BIGINT[] AS p__has_context_category, " + f"[20]::BIGINT[] AS p__has_sample_object_type") + # one NULL-geometry sample -> must be EXCLUDED from located outputs + msr.append( + "SELECT 'MaterialSampleRecord' AS otype, 'm-nogeo' AS pid, NULL::BIGINT AS row_id, 'TEST' AS n, " + "'l' AS label, 'd' AS description, NULL::VARCHAR[] AS place_name, NULL::TIMESTAMP AS result_time, " + "NULL AS geometry, [4]::BIGINT[] AS p__has_material_category, [10]::BIGINT[] AS p__has_context_category, " + "[20]::BIGINT[] AS p__has_sample_object_type") + + con.execute(f"COPY ({ic_rows} UNION ALL {' UNION ALL '.join(msr)}) " + f"TO '{path}' (FORMAT PARQUET)") + con.close() + + +def run_builder(wide, outdir, tag, extra=None): + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", outdir, "--tag", tag, + "--skip", "wide_h3", "--no-manifest"] + (extra or []) + return subprocess.run(cmd, capture_output=True, text=True) + + +@pytest.mark.parametrize("geom_mode", ["blob", "geometry"]) +def test_material_selection_and_geometry(tmp_path, geom_mode): + wide = str(tmp_path / f"wide_{geom_mode}.parquet") + build_fixture_wide(wide, geom_mode) + + # confirm the fixture actually stored the geometry type we intend to test + con = duckdb.connect(); con.execute("INSTALL spatial; LOAD spatial;") + gtype = con.sql(f"DESCRIBE SELECT geometry FROM read_parquet('{wide}')").fetchall()[0][1].upper() + + r = run_builder(wide, str(tmp_path), "t") + assert r.returncode == 0, f"builder failed ({gtype}):\n{r.stdout}\n{r.stderr}" + + facets = str(tmp_path / "t_sample_facets_v2.parquet") + rows = dict(con.sql(f"SELECT pid, material FROM read_parquet('{facets}')").fetchall()) + + # material selection contract + for pid, expected in EXPECTED.items(): + assert rows.get(pid) == expected, f"[{gtype}] {pid}: got {rows.get(pid)!r}, want {expected!r}" + # NULL-geometry sample excluded from located file + assert "m-nogeo" not in rows, f"[{gtype}] NULL-geometry sample leaked into facets" + # geometry decoded to correct coords in map_lite (m-root-first @ lng=10,lat=40) + ml = str(tmp_path / "t_samples_map_lite.parquet") + lat, lng = con.sql(f"SELECT latitude, longitude FROM read_parquet('{ml}') WHERE pid='m-root-first'").fetchone() + assert abs(lat - 40.0) < 1e-6 and abs(lng - 10.0) < 1e-6, f"[{gtype}] bad coords: {lat},{lng}" + + +def test_no_root_and_pid_uniqueness(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert run_builder(wide, str(tmp_path), "t").returncode == 0 + con = duckdb.connect() + facets = f"read_parquet('{tmp_path / 't_sample_facets_v2.parquet'}')" + assert con.sql(f"SELECT COUNT(*) FROM {facets} WHERE material='{ROOT}'").fetchone()[0] == 0 + dups = con.sql(f"SELECT COUNT(*) FROM (SELECT pid FROM {facets} GROUP BY pid HAVING COUNT(*)>1)").fetchone()[0] + assert dups == 0 + + +def test_place_name_serialized_and_quotes(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert run_builder(wide, str(tmp_path), "t").returncode == 0 + con = duckdb.connect() + pn = con.sql(f"SELECT place_name FROM read_parquet('{tmp_path / 't_sample_facets_v2.parquet'}') " + f"WHERE pid='m-real-first'").fetchone()[0] + assert isinstance(pn, str) and "plc-m-real-first" in pn # VARCHAR, not array, embedded quote survived + + +def test_cli_rejects_unknown_only(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + r = run_builder(wide, str(tmp_path), "t", extra=["--only", "bogus_name"]) + assert r.returncode != 0 and "unknown" in (r.stdout + r.stderr).lower() + + +def test_algebraic_validator_passes_on_fixture(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + # full set incl. h3 so the validator's h3 checks run + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", + "--skip", "wide_h3", "--no-manifest"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + # --min-rows 1 for the tiny fixture; sentinel auto-skips when its pid is absent; + # --wide exercises the SEMANTIC gate (must still pass on a clean rebuild). + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert v.returncode == 0, f"validator failed on fixture:\n{v.stdout}\n{v.stderr}" + + +def _build(tmp_path, wide, tag="t", extra=None): + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", tag, + "--skip", "wide_h3", "--no-manifest"] + (extra or []) + return subprocess.run(cmd, capture_output=True, text=True) + + +def test_semantic_gate_catches_corruption_that_internal_checks_miss(tmp_path): + """The whole point (Codex's attack): corrupt map_lite coords so internal + consistency still holds, but the --wide semantic gate must FAIL.""" + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + ml = str(tmp_path / "t_samples_map_lite.parquet") + + # corrupt: zero out every latitude (passes pid-set/uniqueness/h3-sum checks) + con = duckdb.connect() + tmp_ml = ml + ".tmp" + con.execute(f"""COPY (SELECT pid, label, source, 0.0::DOUBLE AS latitude, longitude, + place_name, result_time, h3_res8, h3_res8_hex FROM read_parquet('{ml}')) + TO '{tmp_ml}' (FORMAT PARQUET)""") + con.close(); os.replace(tmp_ml, ml) + + # internal-only validator: still PASSES (the hole Codex exploited) + internal = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert internal.returncode == 0, "expected internal-only checks to miss coord corruption" + + # semantic gate (--wide): must now FAIL + semantic = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert semantic.returncode != 0, f"semantic gate failed to catch coord corruption:\n{semantic.stdout}" + assert "map_lite == fresh build" in semantic.stdout + + +def test_duplicate_pid_hard_fails(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + con = duckdb.connect(); con.execute("INSTALL spatial; LOAD spatial;") + # append a duplicate of an existing located pid + dup = str(tmp_path / "wide_dup.parquet") + con.execute(f"""COPY ( + SELECT * FROM read_parquet('{wide}') + UNION ALL + SELECT * FROM read_parquet('{wide}') WHERE pid='m-real-first' + ) TO '{dup}' (FORMAT PARQUET)""") + con.close() + r = _build(tmp_path, dup) + assert r.returncode != 0 and "non-unique" in (r.stdout + r.stderr).lower(), \ + f"builder should hard-fail on duplicate pids:\n{r.stdout}\n{r.stderr}" + + +def test_manifest_emitted(tmp_path): + import json + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", "--skip", "wide_h3"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + man = tmp_path / "t_manifest.json" + assert man.exists() + m = json.loads(man.read_text()) + assert m["input"]["sha256"] and m["duckdb_version"] and m["outputs"] + assert any("sample_facets_v2" in k for k in m["outputs"]) + assert all("sha256" in v and "rows" in v for v in m["outputs"].values()) + + +def test_wide_h3_cells_match_map_lite(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 # builds map_lite (skips wide_h3) + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", + "--only", "wide_h3", "--no-manifest"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + con = duckdb.connect() + wh3 = f"read_parquet('{tmp_path / 't_wide_h3.parquet'}')" + ml = f"read_parquet('{tmp_path / 't_samples_map_lite.parquet'}')" + cols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM {wh3}").fetchall()] + assert {"h3_res4", "h3_res6", "h3_res8"} <= set(cols) + # CORRECTNESS: wide_h3 cells must agree with map_lite's for the same located pids + bad = con.sql(f"SELECT COUNT(*) FROM {wh3} w JOIN {ml} m ON w.pid=m.pid " + f"WHERE w.h3_res8 IS DISTINCT FROM m.h3_res8").fetchone()[0] + assert bad == 0, f"{bad} wide_h3 rows have h3_res8 disagreeing with map_lite" + + +def test_semantic_gate_catches_h3_center_and_resolution_corruption(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + h3f = str(tmp_path / "t_h3_summary_res4.parquet") + con = duckdb.connect() + tmp_h3 = h3f + ".tmp" + con.execute(f"""COPY (SELECT h3_cell, sample_count, 0.0::DOUBLE AS center_lat, center_lng, + dominant_source, source_count, 999::INTEGER AS resolution FROM read_parquet('{h3f}')) + TO '{tmp_h3}' (FORMAT PARQUET)"""); con.close(); os.replace(tmp_h3, h3f) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert v.returncode != 0 and "h3 res4" in v.stdout, f"gate missed h3 center/resolution corruption:\n{v.stdout}" + + +def test_h3_center_micro_shift_caught(tmp_path): + """Adversary shifted every H3 centroid ~9m (8e-5 deg) and passed the old 1e-4 + tolerance. The tightened 1e-5 gate must now catch it.""" + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + h3f = str(tmp_path / "t_h3_summary_res4.parquet") + con = duckdb.connect(); tmp_h3 = h3f + ".tmp" + con.execute(f"""COPY (SELECT h3_cell, sample_count, ROUND(center_lat+8e-5,6) AS center_lat, + ROUND(center_lng+8e-5,6) AS center_lng, dominant_source, source_count, resolution + FROM read_parquet('{h3f}')) TO '{tmp_h3}' (FORMAT PARQUET)"""); con.close(); os.replace(tmp_h3, h3f) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert v.returncode != 0 and "centers within" in v.stdout, f"gate missed ~9m centroid shift:\n{v.stdout}" + + +def test_manifest_tamper_caught(tmp_path): + """Adversary corrupted manifest.json sha256s and the validator ignored it. + Manifest integrity must now be a gated check.""" + import json + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", "--skip", "wide_h3"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + man = tmp_path / "t_manifest.json" + m = json.loads(man.read_text()) + for k in m["outputs"]: + m["outputs"][k]["sha256"] = "deadbeef" * 8 + man.write_text(json.dumps(m)) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert v.returncode != 0 and "manifest sha256" in v.stdout, f"gate missed manifest tamper:\n{v.stdout}" + + +def test_scheme_corruption_caught(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + sf = str(tmp_path / "t_facet_summaries.parquet") + con = duckdb.connect(); tmp_s = sf + ".tmp" + con.execute(f"""COPY (SELECT facet_type, facet_value, 7::INTEGER AS scheme, count + FROM read_parquet('{sf}')) TO '{tmp_s}' (FORMAT PARQUET)"""); con.close(); os.replace(tmp_s, sf) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert v.returncode != 0 and "scheme" in v.stdout, f"gate missed scheme corruption:\n{v.stdout}"