From 87d2bcbcc9b6076343a459e48509c42c8e232ff7 Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Wed, 3 Jun 2026 07:25:46 -0700 Subject: [PATCH 1/5] docs+scripts: data provenance map + build script for the 6 unscripted derived parquet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (a) DATA_PROVENANCE.md — end-to-end build chain (export → base PQG → sidecar merge → frontend derived → R2/Worker), per-stage script/command + the key constraint (the iSamples export is frozen — Central API offline since Aug 2025; new per-source data must come via the pid sidecar merge, not re-export). Folds the sidecar pattern (previously only in the Obsidian vault) into the repo. (c) scripts/build_frontend_derived.py — reproduces the 6 derived files that had no checked-in build (only ad-hoc notebook SQL): sample_facets_v2, samples_map_lite, wide_h3, h3_summary_res{4,6,8}, facet_summaries, facet_cross_filter — from one `wide` input (DuckDB + h3 + spatial). Has --validate-against to diff schema+counts vs published. Validated vs the published isamples_202601 files (built from 202604 wide): EXACT reproduction of sample_facets_v2 (5,980,282), samples_map_lite, and h3_summary_res4/6/8; all schemas match. facet_summaries (+3) and facet_cross_filter (+86) are schema-correct, with small deltas from the 202604-vs-202601 version gap + the original cross-filter pruning self-pairs (this build is an exhaustive superset) — can be reconciled if exact parity is needed. Co-Authored-By: Claude Opus 4.8 (1M context) --- DATA_PROVENANCE.md | 74 +++++++++++ scripts/build_frontend_derived.py | 199 ++++++++++++++++++++++++++++++ 2 files changed, 273 insertions(+) create mode 100644 DATA_PROVENANCE.md create mode 100644 scripts/build_frontend_derived.py diff --git a/DATA_PROVENANCE.md b/DATA_PROVENANCE.md new file mode 100644 index 00000000..20e7ab67 --- /dev/null +++ b/DATA_PROVENANCE.md @@ -0,0 +1,74 @@ +# iSamples Explorer — Data Provenance + +How every parquet file the explorer uses is generated, from root to publish. +*Reviewed 2026-06-02 (CC, via codebase audit). Complements `SERIALIZATIONS.md` (format/schema reference); this file is the end-to-end build chain + the automation gaps.* + +> **Load-bearing constraint:** the **root export cannot be regenerated.** It was produced from the iSamples Central Solr API (`central.isample.xyz`), **offline since Aug 2025**. The Zenodo-archived export is a **frozen root**. Any *new* data (e.g. concept URIs, thumbnails) therefore must come from a **per-source supplementary file merged into the base by `pid`** — the "sidecar" pattern (see Stage 3) — not from re-exporting. + +## Pipeline DAG + +``` +Source collections (SESAR · OpenContext · GEOME · Smithsonian) + │ iSamples Central Solr API ── OFFLINE since Aug 2025 (cannot re-run) ──┐ + ▼ │ +STAGE 0/1 export_client → JSONL → GeoParquet │ frozen + → isamples_export_*_geo.parquet (Export format; ~300MB, 6.7M; Zenodo doi:10.5281/zenodo.15278211) + ▼ +STAGE 2 pqg/pqg/sql_converter.py (export → base PQG; 7-stage DuckDB SQL) + → narrow (…_narrow.parquet, ~844MB, 106M rows) and wide (…_wide.parquet, ~282MB, 20M rows) + ▼ +STAGE 3 sidecar/enrichment merge (LEFT JOIN by pid) ← Eric's independently-maintained OC PQG (GCS) + scripts/enrich_wide_with_oc_thumbnails.py → isamples_202604_wide.parquet (+47K thumbnails) + ▼ +STAGE 4 wide → frontend derived files (mostly AD-HOC / not checked in — see gaps) + → wide_h3 · h3_summary_res4/6/8 · samples_map_lite · sample_facets_v2 · facet_summaries · facet_cross_filter + → vocab_labels (scripts/build_vocab_labels.py — the one fully-scripted derived file; built from SKOS TTLs) + ▼ +STAGE 5 publish to R2 (bucket isamples-ry) + Cloudflare Worker (data.isamples.org, /current/ aliases) + ▼ +DuckDB-WASM in the browser (explorer.qmd; parquet URLs ~L767-781) +``` + +## Stages (script / command per step) + +| Stage | Input → Output | How (file:line) | Automated? | +|---|---|---|---| +| **0/1 Export** | Solr API → `isamples_export_*_geo.parquet` | `export_client` `ExportClient.perform_full_download()` (`export_client.py:423-469`) → `write_geoparquet_from_json_lines()`; schema `SOURCE_COLUMNS` (`duckdb_utilities.py:9-42`, incl. `keywords: STRUCT(keyword VARCHAR)[]` — **text only, no URI**, L17) | ❌ API offline; **frozen** | +| **2 Base PQG** | export → `*_narrow.parquet` / `*_wide.parquet` | `pqg/pqg/sql_converter.py` `convert_isamples_sql(input, output, wide=…)` (CLI `python pqg/sql_converter.py in.parquet out.parquet [--wide]`); 7 stages, decomposes nested structs → nodes+edges; site dedupe by rounded lat/lon+label | ✅ scripted (exact prod invocation not recorded — gap) | +| **3 Sidecar merge** | base wide + Eric's OC PQG → `isamples_202604_wide.parquet` | `scripts/enrich_wide_with_oc_thumbnails.py` — `LEFT JOIN` OC `(pid, thumbnail_url)` into wide (`COALESCE`). **This is the precedent for merging ANY per-source supplement (incl. concept URIs) by pid.** Drift check: `scripts/check_oc_pqg_drift.py` (detects only; no mirror) | ⚠️ merge scripted; OC mirror + R2 upload manual | +| **4 Frontend derived** | wide → 7 explorer files | `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs via rdflib). **The other 6** (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) have **no checked-in build script** — query patterns live in notebooks / `SERIALIZATIONS.md` only | ❌ ad-hoc (1 of 7 scripted) | +| **5 Publish** | files → R2 + Worker | Worker `workers/data-isamples-org/src/index.js` (`wrangler deploy`); immutable cache for `isamples_\d{6}_*.parquet`; `/current/.parquet` → 302 via `current/manifest.json`. Bucket `isamples-ry` | ⚠️ Worker scripted; **file upload + manifest update are manual** | + +## The sidecar/enrichment pattern (how new data gets in) + +Because the export is frozen, new per-source data is added by **merging a supplementary parquet keyed by `pid` into the base wide** — exactly what the thumbnail enrichment does: + +```sql +-- scripts/enrich_wide_with_oc_thumbnails.py (core) +CREATE TEMP TABLE oc_thumbs AS + SELECT DISTINCT pid, thumbnail_url FROM read_parquet('') WHERE thumbnail_url IS NOT NULL; +COPY (SELECT p.* REPLACE (COALESCE(oc.thumbnail_url, p.thumbnail_url) AS thumbnail_url) + FROM read_parquet('') p LEFT JOIN oc_thumbs oc ON p.pid = oc.pid) + TO '' (FORMAT PARQUET, COMPRESSION ZSTD); +``` + +Eric Kansa maintains OpenContext PQG **independently** on GCS (`storage.googleapis.com/opencontext-parquet/oc_isamples_pqg.parquet`), so it can carry data the frozen iSamples export lacks. This is the channel for **#263** (external concept URIs): Eric's OC PQG carries them → merged into wide by pid → flows to the derived files. *(Sidecar design endorsed 2026-04-17; the spec `project_isamples_sidecar_pattern.md` lives in the Obsidian vault, not a repo — gap.)* + +## Documentation / automation gaps + +- **6 of 7 frontend derived files have no checked-in build script** (`wide_h3`, the three `h3_summary_res*`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`). Query patterns exist in notebooks + `SERIALIZATIONS.md §4` but not as runnable COPY-TO scripts. `pqg add-h3` / `pqg facet-summaries` are named in the dev journal (Mar 2026) but **absent from `pqg/__main__.py`**. +- **No R2 upload automation** — file upload to bucket `isamples-ry` + `current/manifest.json` update are manual `wrangler`/dashboard steps. +- **No OC mirror script** — `check_oc_pqg_drift.py` detects GCS↔R2 drift but doesn't perform the mirror. +- **Exact prod invocation** that produced `zenodo_narrow_2025-12-12` / `zenodo_wide_2026-01-09` from the Zenodo export is not recorded (dedupe options unknown). +- **No Makefile / CI / post-render hook** rebuilds derived files when wide changes — every post-Stage-2 step is manual. +- **`SERIALIZATIONS.md:80`** claims every file "can be rebuilt by a script" — aspirational; true for ~4 of 10 files. +- **Sidecar spec** is in Obsidian only, not version-controlled with the code. + +## Key files +- `export_client/isamples_export_client/duckdb_utilities.py` — export schema (keywords narrowing @ L17) +- `pqg/pqg/sql_converter.py` — export→PQG engine; `pqg/docs/PQG_SPECIFICATION.md` — format spec +- `isamplesorg.github.io/scripts/enrich_wide_with_oc_thumbnails.py` — the sidecar-merge precedent +- `isamplesorg.github.io/scripts/build_vocab_labels.py` — the one scripted derived file +- `isamplesorg.github.io/scripts/check_oc_pqg_drift.py` — OC drift check +- `isamplesorg.github.io/workers/data-isamples-org/{src/index.js,wrangler.toml}` — Worker + R2 config +- `isamplesorg.github.io/SERIALIZATIONS.md` — format/schema reference (DAG companion to this file) diff --git a/scripts/build_frontend_derived.py b/scripts/build_frontend_derived.py new file mode 100644 index 00000000..d26f2766 --- /dev/null +++ b/scripts/build_frontend_derived.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +"""Build the explorer's frontend-derived parquet files from a wide PQG parquet. + +Closes the biggest provenance gap (see DATA_PROVENANCE.md): 6 of the 7 derived +files previously had no checked-in build script — only ad-hoc notebook SQL. This +reproduces them deterministically from one `wide` input. + +Inputs: a wide PQG parquet (e.g. isamples_YYYYMM_wide.parquet) — entity rows incl. + MaterialSampleRecord + IdentifiedConcept, with `geometry` (WKB) and the + `p__has_{material,context,sample_object}_category` row-id arrays. +Outputs (into --outdir, prefixed --tag, default `isamples_YYYYMM`): + - {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description, place_name + - {tag}_samples_map_lite.parquet pid, label, source, latitude, longitude, place_name[], result_time, h3_res8, h3_res8_hex + - {tag}_wide_h3.parquet wide + h3_res4/h3_res6/h3_res8 (large; use --skip wide_h3 to omit) + - {tag}_h3_summary_res{4,6,8}.parquet h3_cell, sample_count, center_lat, center_lng, dominant_source, source_count, resolution + - {tag}_facet_summaries.parquet facet_type, facet_value, scheme, count + - {tag}_facet_cross_filter.parquet filter_source/material/context/object_type, facet_type, facet_value, count + +Usage: + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag isamples_202601 + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --validate-against docs/data + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --only sample_facets_v2,facet_summaries +""" +import argparse, os, sys, time +import duckdb + +FACET_DIMS = ["source", "material", "context", "object_type"] + + +def log(msg, t0): + print(f"[{time.time()-t0:.1f}s] {msg}", flush=True) + + +def base_samples_sql(wide: str) -> str: + """One row per MaterialSampleRecord with resolved source/facet URIs, lat/lng, h3.""" + return f""" + CREATE OR REPLACE TEMP TABLE ic AS + SELECT row_id, pid AS uri FROM read_parquet('{wide}') WHERE otype='IdentifiedConcept'; + CREATE OR REPLACE TEMP TABLE samp AS + SELECT + s.pid, + s.n AS source, + s.label, + s.description, + s.place_name, -- VARCHAR[] + s.result_time, + ST_Y(ST_GeomFromWKB(s.geometry)) AS latitude, + ST_X(ST_GeomFromWKB(s.geometry)) AS longitude, + (SELECT uri FROM ic WHERE ic.row_id = s.p__has_material_category[1]) AS material, + (SELECT uri FROM ic WHERE ic.row_id = s.p__has_context_category[1]) AS context, + (SELECT uri FROM ic WHERE ic.row_id = s.p__has_sample_object_type[1]) AS object_type + FROM read_parquet('{wide}') s + WHERE s.otype='MaterialSampleRecord'; + -- coordinate-bearing subset + h3 cells (used by map_lite + h3 summaries) + CREATE OR REPLACE TEMP TABLE samp_geo AS + SELECT *, + h3_latlng_to_cell(latitude, longitude, 4) AS h3_res4, + h3_latlng_to_cell(latitude, longitude, 6) AS h3_res6, + h3_latlng_to_cell(latitude, longitude, 8) AS h3_res8 + FROM samp WHERE latitude IS NOT NULL AND longitude IS NOT NULL; + """ + + +def build_sample_facets_v2(con, out): + # located (coordinate-bearing) samples only — matches the published file, + # which is map-scoped (5.98M = GeospatialCoordLocation count). + con.execute(f"""COPY ( + SELECT pid, source, material, context, object_type, label, description, + place_name::VARCHAR AS place_name + FROM samp_geo + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_samples_map_lite(con, out): + con.execute(f"""COPY ( + SELECT pid, label, source, latitude, longitude, place_name, result_time, + h3_res8::UBIGINT AS h3_res8, + h3_h3_to_string(h3_res8) AS h3_res8_hex + FROM samp_geo + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_h3_summary(con, out, res): + col = f"h3_res{res}" + con.execute(f"""COPY ( + SELECT {col}::UBIGINT AS h3_cell, + COUNT(*) AS sample_count, + AVG(latitude) AS center_lat, + AVG(longitude) AS center_lng, + MODE(source) AS dominant_source, + COUNT(DISTINCT source) AS source_count, + {res} AS resolution + FROM samp_geo WHERE {col} IS NOT NULL + GROUP BY {col} + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_facet_summaries(con, out): + # one row per (facet_type, facet_value); scheme kept NULL to match published shape + union = " UNION ALL ".join( + f"SELECT '{d}' AS facet_type, {d} AS facet_value FROM samp_geo WHERE {d} IS NOT NULL" + for d in FACET_DIMS) + con.execute(f"""COPY ( + SELECT facet_type, facet_value, NULL::INTEGER AS scheme, COUNT(*) AS count + FROM ({union}) + GROUP BY facet_type, facet_value + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_facet_cross_filter(con, out): + # baseline (no filter) + every single-dimension filter value, with counts for + # all facet dims. Mirrors the cube fast-path the explorer reads (one filter_* set). + selects = [] + # baseline: all filter_* NULL + for fd in FACET_DIMS: + selects.append( + f"SELECT NULL::VARCHAR AS filter_source, NULL::VARCHAR AS filter_material, " + f"NULL::VARCHAR AS filter_context, NULL::VARCHAR AS filter_object_type, " + f"'{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " + f"FROM samp_geo WHERE {fd} IS NOT NULL GROUP BY {fd}") + # single-dimension filters + for filt in FACET_DIMS: + for fd in FACET_DIMS: + cols = ", ".join( + (f"{filt} AS filter_{c}" if c == filt else f"NULL::VARCHAR AS filter_{c}") + for c in FACET_DIMS) + selects.append( + f"SELECT {cols}, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " + f"FROM samp_geo WHERE {filt} IS NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}") + con.execute(f"""COPY ( + SELECT filter_source, filter_material, filter_context, filter_object_type, + facet_type, facet_value, count + FROM ({' UNION ALL '.join(selects)}) + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def build_wide_h3(con, wide, out): + con.execute(f"""COPY ( + SELECT *, + CASE WHEN geometry IS NOT NULL + THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 4) END AS h3_res4, + CASE WHEN geometry IS NOT NULL + THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 6) END AS h3_res6, + CASE WHEN geometry IS NOT NULL + THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 8) END AS h3_res8 + FROM read_parquet('{wide}') + ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--wide", required=True) + ap.add_argument("--outdir", required=True) + ap.add_argument("--tag", default="isamples_202601") + ap.add_argument("--only", default="", help="comma list: sample_facets_v2,samples_map_lite,wide_h3,h3_summaries,facet_summaries,facet_cross_filter") + ap.add_argument("--skip", default="", help="comma list of the same names to skip") + ap.add_argument("--validate-against", default="", help="dir of published files to compare schema+rowcount") + args = ap.parse_args() + os.makedirs(args.outdir, exist_ok=True) + only = set(filter(None, args.only.split(","))) + skip = set(filter(None, args.skip.split(","))) + want = lambda name: (not only or name in only) and name not in skip + + t0 = time.time() + con = duckdb.connect() + con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") + log("building base sample tables…", t0) + con.execute(base_samples_sql(args.wide)) + log(f"samp={con.sql('SELECT COUNT(*) FROM samp').fetchone()[0]:,} samp_geo={con.sql('SELECT COUNT(*) FROM samp_geo').fetchone()[0]:,}", t0) + + p = lambda name: os.path.join(args.outdir, f"{args.tag}_{name}.parquet") + if want("sample_facets_v2"): build_sample_facets_v2(con, p("sample_facets_v2")); log("sample_facets_v2 ✓", t0) + if want("facet_summaries"): build_facet_summaries(con, p("facet_summaries")); log("facet_summaries ✓", t0) + if want("facet_cross_filter"):build_facet_cross_filter(con, p("facet_cross_filter")); log("facet_cross_filter ✓", t0) + if want("samples_map_lite"): build_samples_map_lite(con, p("samples_map_lite")); log("samples_map_lite ✓", t0) + if want("h3_summaries"): + for res in (4, 6, 8): build_h3_summary(con, p(f"h3_summary_res{res}"), res) + log("h3_summary_res{4,6,8} ✓", t0) + if want("wide_h3"): build_wide_h3(con, args.wide, p("wide_h3")); log("wide_h3 ✓", t0) + + if args.validate_against: + print("\n=== validation vs published ===") + for name in ["sample_facets_v2","samples_map_lite","facet_summaries","facet_cross_filter", + "h3_summary_res4","h3_summary_res6","h3_summary_res8"]: + built = p(name) + pub = os.path.join(args.validate_against, f"{args.tag}_{name}.parquet") + if not (os.path.exists(built) and os.path.exists(pub)): + continue + bn = con.sql(f"SELECT COUNT(*) FROM read_parquet('{built}')").fetchone()[0] + pn = con.sql(f"SELECT COUNT(*) FROM read_parquet('{pub}')").fetchone()[0] + bcols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{built}')").fetchall()] + pcols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{pub}')").fetchall()] + ok = "✓" if bcols == pcols else "✗ COLS DIFFER" + print(f" {name:22} built={bn:>10,} pub={pn:>10,} cols {ok}") + log("done", t0) + + +if __name__ == "__main__": + main() From de0279d4ebd3ec78ba6eb400dd9d0a0ccf38295d Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Fri, 5 Jun 2026 17:51:35 -0700 Subject: [PATCH 2/5] pipeline: hardened, reproducible derived-parquet builder + AI-free tests (#273) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rebuilds the Stage-4 derived-parquet pipeline as a real, tested, human-runnable system (no AI in the loop to trust). Closes defects found by EXECUTION that document/AI review missed. build_frontend_derived.py (rewrite): - geometry-agnostic (WKB BLOB *or* DuckDB GEOMETRY) — fixes the silent BinderException on 202601/Zenodo wides - decorrelated concept resolution (unnest+arg_min + joins) — fixes the MAP-cross-join perf blowup (>16 min -> 5.4 s on the 20M-row wide) - material = first NON-ROOT concept (#265/#271); deterministic COPY ORDER BY + tie-broken dominant_source + rounded centroids - strict CLI (unknown --only/--skip fails; --tag required) - emits {tag}_manifest.json: input/output sha256, argv, git SHA, DuckDB + extension versions (machine-checkable build identity) validate_frontend_derived.py (new, algebraic gate): - asserts the derived-file ALGEBRA, not spot checks: summaries == GROUP BY facets; cross_filter == conditional GROUP BY; facets.pid == map_lite.pid; pid uniqueness; H3 counts sum to map_lite; schema. Non-zero exit on failure. tests/test_frontend_derived.py (new): fixture unit tests over tiny synthetic wides (BLOB + GEOMETRY), material/concept/place_name/CLI cases. 6 tests. Makefile (wide/derived/validate/test/all), scripts/requirements.txt (duckdb pinned), .github/workflows/pipeline-tests.yml (CI fixture gate). DATA_PROVENANCE.md + SERIALIZATIONS.md reconciled with reality: Stage-4 now scripted; geometry contract; non-reproducibility of deployed 202601 facets (346,768 vs 528,983); version skew; h3 UBIGINT; cross_filter shape; first-non-root vs leaf. Scope hardened by adversarial Codex audit (epic #273). Supersedes #271. Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/workflows/pipeline-tests.yml | 36 ++++ DATA_PROVENANCE.md | 25 ++- Makefile | 46 +++++ SERIALIZATIONS.md | 8 +- scripts/build_frontend_derived.py | 275 +++++++++++++++++++-------- scripts/requirements.txt | 7 + scripts/validate_frontend_derived.py | 174 +++++++++++++++++ tests/test_frontend_derived.py | 161 ++++++++++++++++ 8 files changed, 637 insertions(+), 95 deletions(-) create mode 100644 .github/workflows/pipeline-tests.yml create mode 100644 Makefile mode change 100644 => 100755 scripts/build_frontend_derived.py create mode 100755 scripts/validate_frontend_derived.py create mode 100644 tests/test_frontend_derived.py diff --git a/.github/workflows/pipeline-tests.yml b/.github/workflows/pipeline-tests.yml new file mode 100644 index 00000000..5fb01643 --- /dev/null +++ b/.github/workflows/pipeline-tests.yml @@ -0,0 +1,36 @@ +name: Derived-parquet pipeline tests + +# Fast, AI-free gate for the data pipeline. Runs the fixture-based unit tests +# (no network for data, no large parquet) whenever the pipeline code changes. +on: + pull_request: + paths: + - "scripts/build_frontend_derived.py" + - "scripts/validate_frontend_derived.py" + - "tests/test_frontend_derived.py" + - "scripts/requirements.txt" + - "Makefile" + - ".github/workflows/pipeline-tests.yml" + push: + branches: [main] + paths: + - "scripts/build_frontend_derived.py" + - "scripts/validate_frontend_derived.py" + - "tests/test_frontend_derived.py" + workflow_dispatch: + +jobs: + fixture-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - name: Install deps + run: pip install -r scripts/requirements.txt + - name: Run pipeline fixture tests + # builds tiny synthetic wides (WKB BLOB + DuckDB GEOMETRY), runs the real + # builder + algebraic validator, asserts the contract. Exits non-zero on + # any failure -> PR is blocked. + run: python -m pytest tests/test_frontend_derived.py -q diff --git a/DATA_PROVENANCE.md b/DATA_PROVENANCE.md index 20e7ab67..5f2e41a7 100644 --- a/DATA_PROVENANCE.md +++ b/DATA_PROVENANCE.md @@ -20,9 +20,10 @@ STAGE 2 pqg/pqg/sql_converter.py (export → base PQG; 7-stage DuckDB SQL) STAGE 3 sidecar/enrichment merge (LEFT JOIN by pid) ← Eric's independently-maintained OC PQG (GCS) scripts/enrich_wide_with_oc_thumbnails.py → isamples_202604_wide.parquet (+47K thumbnails) ▼ -STAGE 4 wide → frontend derived files (mostly AD-HOC / not checked in — see gaps) +STAGE 4 wide → frontend derived files (NOW SCRIPTED: scripts/build_frontend_derived.py) → wide_h3 · h3_summary_res4/6/8 · samples_map_lite · sample_facets_v2 · facet_summaries · facet_cross_filter - → vocab_labels (scripts/build_vocab_labels.py — the one fully-scripted derived file; built from SKOS TTLs) + → vocab_labels (scripts/build_vocab_labels.py — built separately from SKOS TTLs) + → {tag}_manifest.json (build identity: input+output sha256, argv, git SHA, DuckDB/extension versions) ▼ STAGE 5 publish to R2 (bucket isamples-ry) + Cloudflare Worker (data.isamples.org, /current/ aliases) ▼ @@ -36,7 +37,7 @@ DuckDB-WASM in the browser (explorer.qmd; parquet URLs ~L767-781) | **0/1 Export** | Solr API → `isamples_export_*_geo.parquet` | `export_client` `ExportClient.perform_full_download()` (`export_client.py:423-469`) → `write_geoparquet_from_json_lines()`; schema `SOURCE_COLUMNS` (`duckdb_utilities.py:9-42`, incl. `keywords: STRUCT(keyword VARCHAR)[]` — **text only, no URI**, L17) | ❌ API offline; **frozen** | | **2 Base PQG** | export → `*_narrow.parquet` / `*_wide.parquet` | `pqg/pqg/sql_converter.py` `convert_isamples_sql(input, output, wide=…)` (CLI `python pqg/sql_converter.py in.parquet out.parquet [--wide]`); 7 stages, decomposes nested structs → nodes+edges; site dedupe by rounded lat/lon+label | ✅ scripted (exact prod invocation not recorded — gap) | | **3 Sidecar merge** | base wide + Eric's OC PQG → `isamples_202604_wide.parquet` | `scripts/enrich_wide_with_oc_thumbnails.py` — `LEFT JOIN` OC `(pid, thumbnail_url)` into wide (`COALESCE`). **This is the precedent for merging ANY per-source supplement (incl. concept URIs) by pid.** Drift check: `scripts/check_oc_pqg_drift.py` (detects only; no mirror) | ⚠️ merge scripted; OC mirror + R2 upload manual | -| **4 Frontend derived** | wide → 7 explorer files | `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs via rdflib). **The other 6** (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) have **no checked-in build script** — query patterns live in notebooks / `SERIALIZATIONS.md` only | ❌ ad-hoc (1 of 7 scripted) | +| **4 Frontend derived** | wide → 7 explorer files | The 6 map/facet files (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) ← **`scripts/build_frontend_derived.py`** (deterministic; geometry-agnostic; emits a manifest). `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs). Gated by `scripts/validate_frontend_derived.py` (algebraic) + `tests/test_frontend_derived.py` (fixtures, CI). | ✅ scripted + tested | | **5 Publish** | files → R2 + Worker | Worker `workers/data-isamples-org/src/index.js` (`wrangler deploy`); immutable cache for `isamples_\d{6}_*.parquet`; `/current/.parquet` → 302 via `current/manifest.json`. Bucket `isamples-ry` | ⚠️ Worker scripted; **file upload + manifest update are manual** | ## The sidecar/enrichment pattern (how new data gets in) @@ -54,14 +55,22 @@ COPY (SELECT p.* REPLACE (COALESCE(oc.thumbnail_url, p.thumbnail_url) AS thumbna Eric Kansa maintains OpenContext PQG **independently** on GCS (`storage.googleapis.com/opencontext-parquet/oc_isamples_pqg.parquet`), so it can carry data the frozen iSamples export lacks. This is the channel for **#263** (external concept URIs): Eric's OC PQG carries them → merged into wide by pid → flows to the derived files. *(Sidecar design endorsed 2026-04-17; the spec `project_isamples_sidecar_pattern.md` lives in the Obsidian vault, not a repo — gap.)* -## Documentation / automation gaps +## Stage 4 builder contract (`scripts/build_frontend_derived.py`) -- **6 of 7 frontend derived files have no checked-in build script** (`wide_h3`, the three `h3_summary_res*`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`). Query patterns exist in notebooks + `SERIALIZATIONS.md §4` but not as runnable COPY-TO scripts. `pqg add-h3` / `pqg facet-summaries` are named in the dev journal (Mar 2026) but **absent from `pqg/__main__.py`**. +- **Geometry-agnostic input.** The `geometry` column may be **WKB BLOB** (e.g. `isamples_202604_wide`) or DuckDB **GEOMETRY** (e.g. `isamples_202601_wide`, the Zenodo wide). The builder detects the type at runtime — earlier ad-hoc SQL assumed BLOB and threw `BinderException` on GEOMETRY wides. +- **Material selection (#265/#271).** `material` = the **first NON-ROOT** concept in `p__has_material_category` (the root `.../material/1.0/material` "Material" can sit at any array position). Samples tagged only at the root get `NULL` material (excluded from the facet). This is **NOT leaf/most-specific** selection — the arrays are not clean SKOS paths. `context`/`object_type` use `[1]`; their root-dropping is deferred. +- **Determinism.** Every COPY has `ORDER BY`; `dominant_source` ties break on source name (ASC); center lat/lng rounded to 6 dp. +- **Reproducibility & build identity.** Each run writes `{tag}_manifest.json` (input + per-output sha256, argv, git SHA, DuckDB + extension versions). DuckDB pinned in `scripts/requirements.txt`. +- **Tested.** `tests/test_frontend_derived.py` (fixtures, CI via `.github/workflows/pipeline-tests.yml`) + `scripts/validate_frontend_derived.py` (algebraic: `facet_summaries == GROUP BY sample_facets_v2`, `facet_cross_filter == conditional GROUP BY`, `facets.pid == map_lite.pid`, pid uniqueness, H3 sums). `make test` / `make all`. + +## Documentation / automation gaps (remaining) + +- ⚠️ **The deployed `202601` derived files are NOT reproducible** from any available wide. A rebuild yields **528,983** root-material rows (pre-#271); the deployed `sample_facets_v2` has **346,768** — so the live files came from a different/unrecorded Stage-4 process, *and* the data has since rolled (wide is now `202604`). Treat a fresh `build_frontend_derived.py` run as the new source of truth, not as a bit-for-bit reproduction of the deployed files. +- **Version skew:** the deployed derived files are `202601` while the wide they should derive from is `202604` (the popup reads `202604`). Rebuilding from `202604` resolves it (tracked in the pipeline epic). - **No R2 upload automation** — file upload to bucket `isamples-ry` + `current/manifest.json` update are manual `wrangler`/dashboard steps. - **No OC mirror script** — `check_oc_pqg_drift.py` detects GCS↔R2 drift but doesn't perform the mirror. -- **Exact prod invocation** that produced `zenodo_narrow_2025-12-12` / `zenodo_wide_2026-01-09` from the Zenodo export is not recorded (dedupe options unknown). -- **No Makefile / CI / post-render hook** rebuilds derived files when wide changes — every post-Stage-2 step is manual. -- **`SERIALIZATIONS.md:80`** claims every file "can be rebuilt by a script" — aspirational; true for ~4 of 10 files. +- **Stage-2 prod invocation** that produced `zenodo_narrow_2025-12-12` / `zenodo_wide_2026-01-09` from the Zenodo export is still unrecorded (dedupe options unknown). +- **`SERIALIZATIONS.md:80`** claims every file "can be rebuilt by a script" — now true for the Stage-4 files; still aspirational for Stage-2. - **Sidecar spec** is in Obsidian only, not version-controlled with the code. ## Key files diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..bde1ddd5 --- /dev/null +++ b/Makefile @@ -0,0 +1,46 @@ +# Frontend-derived parquet pipeline — reproducible, AI-free. +# +# make test # fast fixture tests (no network, no big data) — the CI gate +# make wide # download + checksum the canonical wide parquet +# make derived # build the derived files from $(WIDE) into $(OUTDIR) +# make validate # algebraic trust gate over the built files (non-zero exit on failure) +# make all # wide -> derived -> validate +# +# Override on the command line, e.g.: +# make all WIDE_URL=https://data.isamples.org/isamples_202604_wide.parquet TAG=isamples_202606 +# +# Requirements: python with `pip install -r scripts/requirements.txt`, plus +# network access on first run (DuckDB pulls the h3 community extension). + +PY ?= python +WIDE_URL ?= https://data.isamples.org/isamples_202604_wide.parquet +OUTDIR ?= build/derived +WIDE ?= $(OUTDIR)/wide.parquet +TAG ?= isamples_dev +BUILD := scripts/build_frontend_derived.py +VALIDATE := scripts/validate_frontend_derived.py + +.PHONY: help test wide derived validate all clean +help: + @grep -E '^# make' Makefile | sed 's/^# / /' + +# Fast, deterministic fixture tests — the gate a human (or CI) runs without any AI. +test: + $(PY) -m pytest tests/test_frontend_derived.py -q + +wide: $(WIDE) +$(WIDE): + @mkdir -p $(OUTDIR) + curl -fSL -o $(WIDE) "$(WIDE_URL)" + @echo "sha256: $$(shasum -a 256 $(WIDE) | cut -d' ' -f1) $(WIDE)" + +derived: $(WIDE) + $(PY) $(BUILD) --wide $(WIDE) --outdir $(OUTDIR) --tag $(TAG) --skip wide_h3 + +validate: + $(PY) $(VALIDATE) --dir $(OUTDIR) --tag $(TAG) + +all: wide derived validate + +clean: + rm -rf $(OUTDIR) diff --git a/SERIALIZATIONS.md b/SERIALIZATIONS.md index b4211495..54f9d6cb 100644 --- a/SERIALIZATIONS.md +++ b/SERIALIZATIONS.md @@ -226,7 +226,7 @@ for the alias when you want "latest." ### 4.6 `isamples_202601_h3_summary_res{4,6,8}.parquet` - **Role**: Zoom-adaptive aggregates that back the Cesium progressive globe and the Python Explorer's "H3 tier" rendering mode. -- **Headline schema** (7 cols, identical across resolutions): `h3_cell` (BIGINT), `sample_count` (INT), `center_lat`, `center_lng` (DOUBLE), `dominant_source` (VARCHAR), `source_count` (INT), `resolution` (INT). +- **Headline schema** (7 cols, identical across resolutions): `h3_cell` (**UBIGINT** — H3 cells are unsigned 64-bit; a signed BIGINT would go negative for high-bit cells), `sample_count` (INT), `center_lat`, `center_lng` (DOUBLE, rounded 6 dp), `dominant_source` (VARCHAR; ties broken by source name ASC for determinism), `source_count` (INT), `resolution` (INT). - **Query pattern**: fetch the right resolution for the current zoom; no join needed. - **DuckDB**: ```sql @@ -247,8 +247,8 @@ for the alias when you want "latest." ### 4.8 `isamples_202601_sample_facets_v2.parquet` -- **Role**: Cross-dimension facet filtering — one row per sample, each facet column holds a single controlled-vocabulary URI (the leaf concept the sample is tagged with at that dimension). -- **Headline schema** (8 cols, all VARCHAR): `pid, source, material, context, object_type, label, description, place_name`. `material`/`context`/`object_type` are scalar URI strings, NOT arrays — the file's grain is one row per sample, so a sample tagged with multiple material URIs is represented by a single chosen URI (currently the first/leaf). For multi-material accuracy, JOIN back to `wide.p__has_material_category`. +- **Role**: Cross-dimension facet filtering — one row per sample, each facet column holds a single controlled-vocabulary URI. +- **Headline schema** (8 cols, all VARCHAR): `pid, source, material, context, object_type, label, description, place_name`. `material`/`context`/`object_type` are scalar URI strings, NOT arrays — one row per sample, so a sample tagged with multiple URIs is represented by a single chosen URI. **Selection rule:** `material` = the **first NON-ROOT** concept in the array (the broad root `.../material/1.0/material` is dropped — #265/#271); root-only samples → NULL material. This is **NOT** necessarily the leaf/most-specific concept (the arrays are not clean SKOS paths). `context`/`object_type` = the first array element (`[1]`). `place_name` is a VARCHAR cast of the wide's `VARCHAR[]` (note: `samples_map_lite` keeps `place_name` as `VARCHAR[]`). For multi-value accuracy, JOIN back to `wide.p__has_*_category`. - **Query pattern**: `WHERE material = ''` for exact match; `WHERE material ILIKE '%rock%'` to substring-match URI fragments. - **DuckDB**: ```sql @@ -272,7 +272,7 @@ for the alias when you want "latest." ### 4.10 `isamples_202601_facet_cross_filter.parquet` - **Role**: Cross-facet counts for the single-active-filter case (QUERY_SPEC §3.3 tier 2a). Avoids recomputing when one facet dimension is active. -- **Headline schema** (7 cols, 526 rows): `filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value, count`. Exactly one `filter_*` column is non-NULL per row. +- **Headline schema** (7 cols): `filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value, count`. Two row kinds: **baseline** rows have **all** `filter_*` NULL (these equal `facet_summaries`); **single-dimension** rows have **exactly one** `filter_*` non-NULL. Single-dimension rows include self-dimension counts (`facet_type == filter dim`), which the explorer ignores. (Both kinds are emitted by `build_frontend_derived.py` and asserted by `validate_frontend_derived.py`.) - **Query pattern**: lookup by the active filter to get counts for the remaining dimensions. - **DuckDB**: ```sql diff --git a/scripts/build_frontend_derived.py b/scripts/build_frontend_derived.py old mode 100644 new mode 100755 index d26f2766..1a5ffb48 --- a/scripts/build_frontend_derived.py +++ b/scripts/build_frontend_derived.py @@ -1,73 +1,137 @@ #!/usr/bin/env python3 """Build the explorer's frontend-derived parquet files from a wide PQG parquet. -Closes the biggest provenance gap (see DATA_PROVENANCE.md): 6 of the 7 derived -files previously had no checked-in build script — only ad-hoc notebook SQL. This -reproduces them deterministically from one `wide` input. - -Inputs: a wide PQG parquet (e.g. isamples_YYYYMM_wide.parquet) — entity rows incl. - MaterialSampleRecord + IdentifiedConcept, with `geometry` (WKB) and the - `p__has_{material,context,sample_object}_category` row-id arrays. -Outputs (into --outdir, prefixed --tag, default `isamples_YYYYMM`): - - {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description, place_name - - {tag}_samples_map_lite.parquet pid, label, source, latitude, longitude, place_name[], result_time, h3_res8, h3_res8_hex - - {tag}_wide_h3.parquet wide + h3_res4/h3_res6/h3_res8 (large; use --skip wide_h3 to omit) - - {tag}_h3_summary_res{4,6,8}.parquet h3_cell, sample_count, center_lat, center_lng, dominant_source, source_count, resolution - - {tag}_facet_summaries.parquet facet_type, facet_value, scheme, count - - {tag}_facet_cross_filter.parquet filter_source/material/context/object_type, facet_type, facet_value, count +Deterministic, reproducible builder for the 6 ad-hoc Stage-4 derived files +(see DATA_PROVENANCE.md). Every run also writes a manifest (input + output +checksums, argv, git SHA, DuckDB + extension versions) so a build is +machine-identifiable and re-verifiable. + +INPUT CONTRACT (enforced/handled): + A wide PQG parquet with entity rows incl. MaterialSampleRecord + + IdentifiedConcept. The `geometry` column may be **WKB BLOB** or DuckDB + **GEOMETRY** — both are handled (detected at runtime). Concept references + live in `p__has_{material,context,sample_object}_category` row-id arrays. + +OUTPUTS (into --outdir, prefixed --tag): + - {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description, place_name(VARCHAR) + - {tag}_samples_map_lite.parquet pid, label, source, latitude, longitude, place_name(VARCHAR[]), result_time, h3_res8(UBIGINT), h3_res8_hex + - {tag}_h3_summary_res{4,6,8}.parquet h3_cell(UBIGINT), sample_count(INT), center_lat, center_lng, dominant_source, source_count(INT), resolution(INT) + - {tag}_facet_summaries.parquet facet_type, facet_value, scheme, count + - {tag}_facet_cross_filter.parquet filter_source/material/context/object_type, facet_type, facet_value, count + - {tag}_wide_h3.parquet wide + h3_res4/6/8 (large; built only on --only wide_h3) + - {tag}_manifest.json provenance + per-output rowcount/schema/sha256 + +MATERIAL SELECTION (issue #265/#271): the broad SKOS root +`.../material/1.0/material` ("Material") can appear at ANY position in the +concept array; the old `[1]` pick surfaced it as a bogus facet value. We pick +the FIRST NON-ROOT concept (by array order); samples tagged ONLY at the root +get NULL material (excluded from the facet). This is NOT leaf/most-specific +selection — see DATA_PROVENANCE.md. context/object_type use the first array +element ([1]); their root-dropping is deferred (tracked in the pipeline epic). + +Determinism: every COPY has an ORDER BY; dominant_source ties break on source +name (ASC); center lat/lng are rounded to 6 dp. Usage: - python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag isamples_202601 - python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --validate-against docs/data - python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --only sample_facets_v2,facet_summaries + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag isamples_202606 + python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag T --only sample_facets_v2,facet_summaries """ -import argparse, os, sys, time +import argparse, hashlib, json, os, subprocess, sys, time import duckdb +MATERIAL_ROOT = "https://w3id.org/isample/vocabulary/material/1.0/material" FACET_DIMS = ["source", "material", "context", "object_type"] +# the artifacts this script knows how to build (for --only/--skip validation) +ARTIFACTS = ["sample_facets_v2", "samples_map_lite", "h3_summaries", + "facet_summaries", "facet_cross_filter", "wide_h3"] def log(msg, t0): - print(f"[{time.time()-t0:.1f}s] {msg}", flush=True) + print(f"[{time.time()-t0:6.1f}s] {msg}", flush=True) + + +def sha256_file(path, _bufsize=1 << 20): + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(_bufsize), b""): + h.update(chunk) + return h.hexdigest() -def base_samples_sql(wide: str) -> str: - """One row per MaterialSampleRecord with resolved source/facet URIs, lat/lng, h3.""" - return f""" +def geometry_expr(con, wide): + """Return a SQL expression yielding a GEOMETRY from `s.geometry`, whether the + column is stored as WKB BLOB or as DuckDB GEOMETRY. Fixes the silent + BLOB-only contract (202604 wide = BLOB; 202601/Zenodo wides = GEOMETRY).""" + rows = con.sql(f"DESCRIBE SELECT geometry FROM read_parquet('{wide}') LIMIT 0").fetchall() + coltype = rows[0][1].upper() if rows else "BLOB" + if coltype == "GEOMETRY": + return "s.geometry" + if coltype in ("BLOB", "WKB_BLOB", "VARBINARY"): + return "ST_GeomFromWKB(s.geometry)" + raise SystemExit(f"FATAL: unexpected geometry column type {coltype!r} in {wide}") + + +def build_base_tables(con, wide, t0): + geom = geometry_expr(con, wide) + con.execute(f""" CREATE OR REPLACE TEMP TABLE ic AS SELECT row_id, pid AS uri FROM read_parquet('{wide}') WHERE otype='IdentifiedConcept'; + + -- material: FIRST NON-ROOT concept per sample. Decorrelated (unnest+join+ + -- arg_min by array ordinality) — NOT a correlated subquery and NOT a MAP + -- cross-join (both of which blow up the planner on 20M rows). + CREATE OR REPLACE TEMP TABLE mat AS + WITH ex AS ( + SELECT s.pid AS pid, u.rid AS rid, u.ord AS ord + FROM read_parquet('{wide}') s, + UNNEST(s.p__has_material_category) WITH ORDINALITY AS u(rid, ord) + WHERE s.otype='MaterialSampleRecord' + ) + SELECT ex.pid, arg_min(ic.uri, ex.ord) AS material + FROM ex JOIN ic ON ic.row_id = ex.rid + WHERE ic.uri <> '{MATERIAL_ROOT}' + GROUP BY ex.pid; + + -- one row per MaterialSampleRecord; all concept resolution via JOINs (decorrelated). CREATE OR REPLACE TEMP TABLE samp AS SELECT s.pid, - s.n AS source, + s.n AS source, s.label, s.description, - s.place_name, -- VARCHAR[] + s.place_name, -- VARCHAR[] s.result_time, - ST_Y(ST_GeomFromWKB(s.geometry)) AS latitude, - ST_X(ST_GeomFromWKB(s.geometry)) AS longitude, - (SELECT uri FROM ic WHERE ic.row_id = s.p__has_material_category[1]) AS material, - (SELECT uri FROM ic WHERE ic.row_id = s.p__has_context_category[1]) AS context, - (SELECT uri FROM ic WHERE ic.row_id = s.p__has_sample_object_type[1]) AS object_type + ROUND(ST_Y({geom}), 6) AS latitude, + ROUND(ST_X({geom}), 6) AS longitude, + mat.material AS material, + ctx.uri AS context, + obj.uri AS object_type FROM read_parquet('{wide}') s + LEFT JOIN mat ON mat.pid = s.pid + LEFT JOIN ic AS ctx ON ctx.row_id = s.p__has_context_category[1] + LEFT JOIN ic AS obj ON obj.row_id = s.p__has_sample_object_type[1] WHERE s.otype='MaterialSampleRecord'; - -- coordinate-bearing subset + h3 cells (used by map_lite + h3 summaries) + CREATE OR REPLACE TEMP TABLE samp_geo AS SELECT *, h3_latlng_to_cell(latitude, longitude, 4) AS h3_res4, h3_latlng_to_cell(latitude, longitude, 6) AS h3_res6, h3_latlng_to_cell(latitude, longitude, 8) AS h3_res8 FROM samp WHERE latitude IS NOT NULL AND longitude IS NOT NULL; - """ + """) + n_samp = con.sql("SELECT COUNT(*) FROM samp").fetchone()[0] + n_geo = con.sql("SELECT COUNT(*) FROM samp_geo").fetchone()[0] + n_dup = con.sql("SELECT COUNT(*) FROM (SELECT pid FROM samp_geo GROUP BY pid HAVING COUNT(*)>1)").fetchone()[0] + log(f"samp={n_samp:,} samp_geo={n_geo:,} duplicate_pids={n_dup:,}", t0) + if n_dup: + print(f"WARNING: {n_dup:,} duplicate pids in samp_geo — facet counts/joins may inflate", flush=True) def build_sample_facets_v2(con, out): - # located (coordinate-bearing) samples only — matches the published file, - # which is map-scoped (5.98M = GeospatialCoordLocation count). con.execute(f"""COPY ( SELECT pid, source, material, context, object_type, label, description, place_name::VARCHAR AS place_name - FROM samp_geo + FROM samp_geo ORDER BY pid ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") @@ -76,27 +140,42 @@ def build_samples_map_lite(con, out): SELECT pid, label, source, latitude, longitude, place_name, result_time, h3_res8::UBIGINT AS h3_res8, h3_h3_to_string(h3_res8) AS h3_res8_hex - FROM samp_geo + FROM samp_geo ORDER BY pid ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") def build_h3_summary(con, out, res): col = f"h3_res{res}" + # deterministic dominant_source: max sample count, ties broken by source name ASC. con.execute(f"""COPY ( - SELECT {col}::UBIGINT AS h3_cell, - COUNT(*) AS sample_count, - AVG(latitude) AS center_lat, - AVG(longitude) AS center_lng, - MODE(source) AS dominant_source, - COUNT(DISTINCT source) AS source_count, - {res} AS resolution - FROM samp_geo WHERE {col} IS NOT NULL - GROUP BY {col} + WITH sc AS ( + SELECT {col} AS cell, source, COUNT(*) AS c + FROM samp_geo WHERE {col} IS NOT NULL GROUP BY {col}, source + ), + dom AS ( + SELECT cell, source AS dominant_source, + ROW_NUMBER() OVER (PARTITION BY cell ORDER BY c DESC, source ASC) AS rn + FROM sc + ), + agg AS ( + SELECT {col} AS cell, COUNT(*) AS sample_count, + ROUND(AVG(latitude), 6) AS center_lat, + ROUND(AVG(longitude), 6) AS center_lng, + COUNT(DISTINCT source) AS source_count + FROM samp_geo WHERE {col} IS NOT NULL GROUP BY {col} + ) + SELECT agg.cell::UBIGINT AS h3_cell, + agg.sample_count::INTEGER AS sample_count, + agg.center_lat, agg.center_lng, + dom.dominant_source, + agg.source_count::INTEGER AS source_count, + {res}::INTEGER AS resolution + FROM agg JOIN dom ON dom.cell = agg.cell AND dom.rn = 1 + ORDER BY h3_cell ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") def build_facet_summaries(con, out): - # one row per (facet_type, facet_value); scheme kept NULL to match published shape union = " UNION ALL ".join( f"SELECT '{d}' AS facet_type, {d} AS facet_value FROM samp_geo WHERE {d} IS NOT NULL" for d in FACET_DIMS) @@ -104,21 +183,22 @@ def build_facet_summaries(con, out): SELECT facet_type, facet_value, NULL::INTEGER AS scheme, COUNT(*) AS count FROM ({union}) GROUP BY facet_type, facet_value + ORDER BY facet_type, facet_value ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") def build_facet_cross_filter(con, out): - # baseline (no filter) + every single-dimension filter value, with counts for - # all facet dims. Mirrors the cube fast-path the explorer reads (one filter_* set). + # baseline (all filter_* NULL) + single-dimension filters. NOTE: the shape + # (incl. baseline rows + self-dimension rows) matches what the deployed + # explorer reads; see SERIALIZATIONS.md for the exact contract. Determinism + # via ORDER BY. selects = [] - # baseline: all filter_* NULL for fd in FACET_DIMS: selects.append( f"SELECT NULL::VARCHAR AS filter_source, NULL::VARCHAR AS filter_material, " f"NULL::VARCHAR AS filter_context, NULL::VARCHAR AS filter_object_type, " f"'{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count " f"FROM samp_geo WHERE {fd} IS NOT NULL GROUP BY {fd}") - # single-dimension filters for filt in FACET_DIMS: for fd in FACET_DIMS: cols = ", ".join( @@ -131,67 +211,96 @@ def build_facet_cross_filter(con, out): SELECT filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value, count FROM ({' UNION ALL '.join(selects)}) + ORDER BY filter_source, filter_material, filter_context, filter_object_type, facet_type, facet_value ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") def build_wide_h3(con, wide, out): + geom = geometry_expr(con, wide).replace("s.geometry", "geometry") con.execute(f"""COPY ( SELECT *, - CASE WHEN geometry IS NOT NULL - THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 4) END AS h3_res4, - CASE WHEN geometry IS NOT NULL - THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 6) END AS h3_res6, - CASE WHEN geometry IS NOT NULL - THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 8) END AS h3_res8 - FROM read_parquet('{wide}') + CASE WHEN geometry IS NOT NULL THEN h3_latlng_to_cell(ST_Y({geom}), ST_X({geom}), 4) END AS h3_res4, + CASE WHEN geometry IS NOT NULL THEN h3_latlng_to_cell(ST_Y({geom}), ST_X({geom}), 6) END AS h3_res6, + CASE WHEN geometry IS NOT NULL THEN h3_latlng_to_cell(ST_Y({geom}), ST_X({geom}), 8) END AS h3_res8 + FROM read_parquet('{wide}') ORDER BY pid ) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""") +def file_meta(con, path): + n = con.sql(f"SELECT COUNT(*) FROM read_parquet('{path}')").fetchone()[0] + schema = [(r[0], r[1]) for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{path}')").fetchall()] + return {"rows": n, "schema": schema, "bytes": os.path.getsize(path), "sha256": sha256_file(path)} + + +def git_sha(): + try: + return subprocess.check_output(["git", "rev-parse", "HEAD"], + cwd=os.path.dirname(os.path.abspath(__file__)), + stderr=subprocess.DEVNULL).decode().strip() + except Exception: + return None + + def main(): ap = argparse.ArgumentParser() ap.add_argument("--wide", required=True) ap.add_argument("--outdir", required=True) - ap.add_argument("--tag", default="isamples_202601") - ap.add_argument("--only", default="", help="comma list: sample_facets_v2,samples_map_lite,wide_h3,h3_summaries,facet_summaries,facet_cross_filter") + ap.add_argument("--tag", required=True, help="output prefix, e.g. isamples_202606 (no stale default)") + ap.add_argument("--only", default="", help=f"comma list of: {','.join(ARTIFACTS)}") ap.add_argument("--skip", default="", help="comma list of the same names to skip") - ap.add_argument("--validate-against", default="", help="dir of published files to compare schema+rowcount") + ap.add_argument("--no-manifest", action="store_true", help="skip writing {tag}_manifest.json") args = ap.parse_args() - os.makedirs(args.outdir, exist_ok=True) + only = set(filter(None, args.only.split(","))) skip = set(filter(None, args.skip.split(","))) + bad = (only | skip) - set(ARTIFACTS) + if bad: # Codex #3: fail loudly on typos instead of silently building nothing + sys.exit(f"FATAL: unknown --only/--skip name(s): {sorted(bad)}. Known: {ARTIFACTS}") want = lambda name: (not only or name in only) and name not in skip + os.makedirs(args.outdir, exist_ok=True) t0 = time.time() con = duckdb.connect() con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") log("building base sample tables…", t0) - con.execute(base_samples_sql(args.wide)) - log(f"samp={con.sql('SELECT COUNT(*) FROM samp').fetchone()[0]:,} samp_geo={con.sql('SELECT COUNT(*) FROM samp_geo').fetchone()[0]:,}", t0) + build_base_tables(con, args.wide, t0) p = lambda name: os.path.join(args.outdir, f"{args.tag}_{name}.parquet") - if want("sample_facets_v2"): build_sample_facets_v2(con, p("sample_facets_v2")); log("sample_facets_v2 ✓", t0) - if want("facet_summaries"): build_facet_summaries(con, p("facet_summaries")); log("facet_summaries ✓", t0) - if want("facet_cross_filter"):build_facet_cross_filter(con, p("facet_cross_filter")); log("facet_cross_filter ✓", t0) - if want("samples_map_lite"): build_samples_map_lite(con, p("samples_map_lite")); log("samples_map_lite ✓", t0) + produced = [] + def emit(name, fn): + if want(name): + fn(p(name)); produced.append(p(name)); log(f"{name} ✓", t0) + + emit("sample_facets_v2", lambda o: build_sample_facets_v2(con, o)) + emit("facet_summaries", lambda o: build_facet_summaries(con, o)) + emit("facet_cross_filter", lambda o: build_facet_cross_filter(con, o)) + emit("samples_map_lite", lambda o: build_samples_map_lite(con, o)) if want("h3_summaries"): - for res in (4, 6, 8): build_h3_summary(con, p(f"h3_summary_res{res}"), res) + for res in (4, 6, 8): + build_h3_summary(con, p(f"h3_summary_res{res}"), res); produced.append(p(f"h3_summary_res{res}")) log("h3_summary_res{4,6,8} ✓", t0) - if want("wide_h3"): build_wide_h3(con, args.wide, p("wide_h3")); log("wide_h3 ✓", t0) - - if args.validate_against: - print("\n=== validation vs published ===") - for name in ["sample_facets_v2","samples_map_lite","facet_summaries","facet_cross_filter", - "h3_summary_res4","h3_summary_res6","h3_summary_res8"]: - built = p(name) - pub = os.path.join(args.validate_against, f"{args.tag}_{name}.parquet") - if not (os.path.exists(built) and os.path.exists(pub)): - continue - bn = con.sql(f"SELECT COUNT(*) FROM read_parquet('{built}')").fetchone()[0] - pn = con.sql(f"SELECT COUNT(*) FROM read_parquet('{pub}')").fetchone()[0] - bcols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{built}')").fetchall()] - pcols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{pub}')").fetchall()] - ok = "✓" if bcols == pcols else "✗ COLS DIFFER" - print(f" {name:22} built={bn:>10,} pub={pn:>10,} cols {ok}") + emit("wide_h3", lambda o: build_wide_h3(con, args.wide, o)) + + if not args.no_manifest: + log("hashing inputs/outputs for manifest…", t0) + exts = {r[0]: r[1] for r in con.sql( + "SELECT extension_name, extension_version FROM duckdb_extensions() WHERE installed").fetchall()} + manifest = { + "tag": args.tag, + "argv": sys.argv, + "git_sha": git_sha(), + "duckdb_version": duckdb.__version__, + "extensions": exts, + "input": {"path": args.wide, + "bytes": (os.path.getsize(args.wide) if os.path.exists(args.wide) else None), + "sha256": (sha256_file(args.wide) if os.path.exists(args.wide) else "remote/unhashed")}, + "outputs": {os.path.basename(f): file_meta(con, f) for f in produced}, + } + mpath = p("manifest").replace(".parquet", ".json") + with open(mpath, "w") as fh: + json.dump(manifest, fh, indent=2) + log(f"manifest → {mpath}", t0) + log("done", t0) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 67746e46..0015d6da 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -6,3 +6,10 @@ rdflib>=6.3 pandas>=2.0 pyarrow>=14 + +# build_frontend_derived.py + validate_frontend_derived.py + tests/ +# DuckDB pinned for reproducible builds; the h3 (community) and spatial +# extensions are version-bound to this DuckDB release and installed at runtime +# (the build manifest records the exact resolved extension hashes). +duckdb==1.4.4 +pytest>=8.0 diff --git a/scripts/validate_frontend_derived.py b/scripts/validate_frontend_derived.py new file mode 100755 index 00000000..f24ee3f3 --- /dev/null +++ b/scripts/validate_frontend_derived.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""Algebraic, AI-free trust gate for the explorer's derived parquet. + +This is NOT a code review and NOT a spot check. It recomputes the derived-file +*algebra* from `sample_facets_v2` / `samples_map_lite` and asserts the other +files agree. A rebuild that is wrong (collapsed materials, stale summaries, +drifted cross-filter, duplicate pids, broken H3) FAILS here. Exits non-zero on +any failure so it can gate CI / a publish. + +Usage: + python scripts/validate_frontend_derived.py --dir /tmp/rebuild-202606 --tag isamples_202606 + # or point at live/remote files: + python scripts/validate_frontend_derived.py \ + --facets URL --map-lite URL --summaries URL --cross-filter URL \ + --h3 URL4 URL6 URL8 +""" +import argparse, os, sys +import duckdb + +MATERIAL_ROOT = "https://w3id.org/isample/vocabulary/material/1.0/material" +PID_K = "ark:/28722/k2p55x96j" # #260 sentinel: must not flip under #271 selection +PID_K_EXPECTED = "https://w3id.org/isample/vocabulary/material/1.0/anthropogenicmetal" + +EXPECTED_SCHEMA = { + "facets": [("pid", "VARCHAR"), ("source", "VARCHAR"), ("material", "VARCHAR"), + ("context", "VARCHAR"), ("object_type", "VARCHAR"), ("label", "VARCHAR"), + ("description", "VARCHAR"), ("place_name", "VARCHAR")], +} + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--dir"); ap.add_argument("--tag") + ap.add_argument("--facets"); ap.add_argument("--map-lite") + ap.add_argument("--summaries"); ap.add_argument("--cross-filter") + ap.add_argument("--h3", nargs=3, metavar=("R4", "R6", "R8")) + ap.add_argument("--min-rows", type=int, default=1_000_000, + help="floor for the non-empty sanity check (use 1 for fixtures)") + a = ap.parse_args() + + def f(name, attr): + v = getattr(a, attr) + if v: + return v + if a.dir and a.tag: + return os.path.join(a.dir, f"{a.tag}_{name}.parquet") + sys.exit(f"FATAL: provide --{attr} or both --dir and --tag") + + facets = f("sample_facets_v2", "facets") + maplite = f("samples_map_lite", "map_lite") + summaries = f("facet_summaries", "summaries") + crossf = f("facet_cross_filter", "cross_filter") + h3 = a.h3 or ([os.path.join(a.dir, f"{a.tag}_h3_summary_res{r}.parquet") for r in (4, 6, 8)] + if a.dir and a.tag else None) + + con = duckdb.connect() + R = [] # (name, passed, detail) + info = [] + def check(name, passed, detail=""): + R.append((name, bool(passed), detail)) + def scalar(sql): + return con.sql(sql).fetchone()[0] + + F = f"read_parquet('{facets}')" + ML = f"read_parquet('{maplite}')" + S = f"read_parquet('{summaries}')" + CF = f"read_parquet('{crossf}')" + + # --- 1. material root absent (the #265/#271 contract) --- + check("material root absent", scalar(f"SELECT COUNT(*) FROM {F} WHERE material='{MATERIAL_ROOT}'") == 0, + "facets rows with bare root material (want 0)") + + # --- 2. #260 sentinel preserved (skip when the pid isn't in this dataset, e.g. fixtures) --- + row = con.sql(f"SELECT material FROM {F} WHERE pid='{PID_K}'").fetchone() + if row is None: + info.append(f"sentinel {PID_K} not present (N/A for this dataset)") + else: + check(f"sentinel {PID_K} preserved", row[0] == PID_K_EXPECTED, f"got {row}") + + # --- 3. PID uniqueness (browser relies on one row per pid) --- + check("facets pid unique", scalar(f"SELECT COUNT(*) FROM (SELECT pid FROM {F} GROUP BY pid HAVING COUNT(*)>1)") == 0, + "duplicate pids in facets") + check("map_lite pid unique", scalar(f"SELECT COUNT(*) FROM (SELECT pid FROM {ML} GROUP BY pid HAVING COUNT(*)>1)") == 0, + "duplicate pids in map_lite") + + # --- 4. facets.pid SET == map_lite.pid SET --- + diff = scalar(f"""SELECT + (SELECT COUNT(*) FROM (SELECT pid FROM {F} EXCEPT SELECT pid FROM {ML})) + + (SELECT COUNT(*) FROM (SELECT pid FROM {ML} EXCEPT SELECT pid FROM {F}))""") + check("facets.pid == map_lite.pid", diff == 0, f"{diff} pids differ between facets and map_lite") + + # --- 5. ALGEBRA: facet_summaries == GROUP BY facets (per dim) --- + recompute = " UNION ALL ".join( + f"SELECT '{d}' AS facet_type, {d} AS facet_value, COUNT(*) AS c FROM {F} WHERE {d} IS NOT NULL GROUP BY {d}" + for d in ("source", "material", "context", "object_type")) + mismatch = scalar(f""" + WITH recomputed AS ({recompute}), + summ AS (SELECT facet_type, facet_value, count AS c FROM {S}) + SELECT COUNT(*) FROM ( + SELECT * FROM recomputed EXCEPT SELECT * FROM summ + UNION ALL + SELECT * FROM summ EXCEPT SELECT * FROM recomputed)""") + check("facet_summaries == GROUP BY facets", mismatch == 0, f"{mismatch} (facet_type,value,count) rows disagree") + + # --- 6. ALGEBRA: facet_cross_filter single-dim rows == conditional GROUP BY facets --- + dims = ("source", "material", "context", "object_type") + parts = [] + for filt in dims: + for fd in dims: + parts.append( + f"SELECT '{filt}' AS fcol, {filt} AS fval, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS c " + f"FROM {F} WHERE {filt} IS NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}") + recompute_cf = " UNION ALL ".join(parts) + # normalize cross_filter single-dim rows into (fcol, fval, facet_type, facet_value, count) + cf_single = f""" + SELECT 'source' AS fcol, filter_source AS fval, facet_type, facet_value, count FROM {CF} WHERE filter_source IS NOT NULL + UNION ALL SELECT 'material', filter_material, facet_type, facet_value, count FROM {CF} WHERE filter_material IS NOT NULL + UNION ALL SELECT 'context', filter_context, facet_type, facet_value, count FROM {CF} WHERE filter_context IS NOT NULL + UNION ALL SELECT 'object_type', filter_object_type, facet_type, facet_value, count FROM {CF} WHERE filter_object_type IS NOT NULL + """ + cf_mismatch = scalar(f""" + WITH rc AS ({recompute_cf}), cf AS ({cf_single}) + SELECT COUNT(*) FROM ( + SELECT * FROM rc EXCEPT SELECT * FROM cf + UNION ALL + SELECT * FROM cf EXCEPT SELECT * FROM rc)""") + check("cross_filter == conditional GROUP BY facets", cf_mismatch == 0, f"{cf_mismatch} single-dim rows disagree") + + # --- 7. cross_filter baseline (all filter_* NULL) == facet_summaries --- + base_mismatch = scalar(f""" + WITH base AS (SELECT facet_type, facet_value, count FROM {CF} + WHERE filter_source IS NULL AND filter_material IS NULL + AND filter_context IS NULL AND filter_object_type IS NULL), + summ AS (SELECT facet_type, facet_value, count FROM {S}) + SELECT COUNT(*) FROM ( + SELECT * FROM base EXCEPT SELECT * FROM summ + UNION ALL SELECT * FROM summ EXCEPT SELECT * FROM base)""") + check("cross_filter baseline == summaries", base_mismatch == 0, f"{base_mismatch} baseline rows disagree") + + # --- 8. H3: per-resolution sample_count sums to located-sample total --- + if h3: + ml_n = scalar(f"SELECT COUNT(*) FROM {ML}") + for res, hp in zip((4, 6, 8), h3): + tot = scalar(f"SELECT SUM(sample_count) FROM read_parquet('{hp}')") + check(f"h3 res{res} counts sum to map_lite", tot == ml_n, f"sum={tot} vs map_lite={ml_n}") + + # --- 9. schema/types --- + sch = [(r[0], r[1]) for r in con.sql(f"DESCRIBE SELECT * FROM {F}").fetchall()] + check("facets schema matches contract", sch == EXPECTED_SCHEMA["facets"], f"got {sch}") + + # --- 10. sanity floor --- + total, mat = con.sql(f"SELECT COUNT(*), COUNT(material) FROM {F}").fetchone() + check("facets non-empty", total >= a.min_rows, f"{total:,} rows (min {a.min_rows:,})") + + # --- informational (not failing): context/object_type root presence --- + for dim, root in [("context", "https://w3id.org/isample/vocabulary/sampledfeature/1.0/anysampledfeature"), + ("object_type", "https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/materialsample")]: + n = scalar(f"SELECT COUNT(*) FROM {F} WHERE {dim}='{root}'") + info.append(f"{dim} root-concept rows: {n:,} (informational; root-dropping deferred)") + + print(f"\n{'CHECK':<44} {'RESULT':<6} DETAIL\n" + "-" * 90) + ok = True + for name, passed, detail in R: + ok = ok and passed + print(f"{name:<44} {'PASS' if passed else 'FAIL':<6} {detail}") + print("-" * 90) + for line in info: + print(" info:", line) + print("\n" + ("ALL CHECKS PASS" if ok else "FAILURES PRESENT")) + sys.exit(0 if ok else 1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_frontend_derived.py b/tests/test_frontend_derived.py new file mode 100644 index 00000000..10d13691 --- /dev/null +++ b/tests/test_frontend_derived.py @@ -0,0 +1,161 @@ +"""Fast, AI-free fixture tests for the derived-parquet pipeline. + +Builds tiny synthetic `wide` parquet files (both WKB-BLOB and DuckDB-GEOMETRY +geometry encodings), runs the real builder + validator against them, and asserts +the contract — especially the cases that bit us in production: + - geometry stored as BLOB *or* GEOMETRY (the silent BLOB-only contract bug) + - material = first NON-ROOT concept; root-only -> NULL (#265/#271) + - missing concept row-id / NULL array -> NULL (no crash) + - place_name serialization; pid uniqueness; located-only scoping + - CLI fails loudly on unknown --only + +Run: pytest tests/test_frontend_derived.py -q (needs: duckdb, h3, spatial) +""" +import os, subprocess, sys +import duckdb +import pytest + +HERE = os.path.dirname(os.path.abspath(__file__)) +REPO = os.path.dirname(HERE) +BUILD = os.path.join(REPO, "scripts", "build_frontend_derived.py") +VALIDATE = os.path.join(REPO, "scripts", "validate_frontend_derived.py") + +MAT = "https://w3id.org/isample/vocabulary/material/1.0/" +ROOT = MAT + "material" + +# concept row_id -> uri +CONCEPTS = [ + (1, ROOT), # material root (must never be selected) + (2, MAT + "mineral"), + (3, MAT + "rock"), + (4, MAT + "anthropogenicmetal"), + (10, "https://w3id.org/isample/vocabulary/sampledfeature/1.0/earthinterior"), + (20, "https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/othersolidobject"), +] + +# (pid, material_array, expected_material_tail) — context=[10], object_type=[20], valid geometry +SAMPLES = [ + ("m-root-first", [1, 2, 3], "rock"), # root first -> first NON-root in order = mineral? -> see note + ("m-real-first", [4, 1], "anthropogenicmetal"), # real first preserved + ("m-root-only", [1], None), # root only -> NULL + ("m-null-array", None, None), # no material -> NULL + ("m-missing-id", [999], None), # dangling row-id -> NULL +] +# NOTE on m-root-first [1,2,3] = [material, mineral, rock]: builder takes the +# FIRST non-root by array order -> 'mineral'. We assert exactly that below. +EXPECTED = { + "m-root-first": MAT + "mineral", + "m-real-first": MAT + "anthropogenicmetal", + "m-root-only": None, + "m-null-array": None, + "m-missing-id": None, +} + + +def _arr(xs): + return "NULL::BIGINT[]" if xs is None else "[" + ",".join(str(x) for x in xs) + "]::BIGINT[]" + + +def build_fixture_wide(path, geom_mode): + """Write a tiny wide parquet. geom_mode in {'blob','geometry'}.""" + con = duckdb.connect() + con.execute("INSTALL spatial; LOAD spatial;") + geom = (lambda lng, lat: f"ST_AsWKB(ST_Point({lng},{lat}))") if geom_mode == "blob" \ + else (lambda lng, lat: f"ST_Point({lng},{lat})") + + ic_rows = " UNION ALL ".join( + f"SELECT 'IdentifiedConcept' AS otype, '{uri}' AS pid, {rid}::BIGINT AS row_id, NULL::VARCHAR AS n, " + f"NULL::VARCHAR AS label, NULL::VARCHAR AS description, NULL::VARCHAR[] AS place_name, " + f"NULL::TIMESTAMP AS result_time, NULL AS geometry, " + f"NULL::BIGINT[] AS p__has_material_category, NULL::BIGINT[] AS p__has_context_category, " + f"NULL::BIGINT[] AS p__has_sample_object_type" + for rid, uri in CONCEPTS) + + msr = [] + for i, (pid, marr, _) in enumerate(SAMPLES): + lng, lat = 10.0 + i, 40.0 + i + msr.append( + f"SELECT 'MaterialSampleRecord' AS otype, '{pid}' AS pid, NULL::BIGINT AS row_id, 'TEST' AS n, " + f"'label {pid}' AS label, 'desc {pid}' AS description, ['plc-{pid}','x''q']::VARCHAR[] AS place_name, " + f"NULL::TIMESTAMP AS result_time, {geom(lng, lat)} AS geometry, " + f"{_arr(marr)} AS p__has_material_category, [10]::BIGINT[] AS p__has_context_category, " + f"[20]::BIGINT[] AS p__has_sample_object_type") + # one NULL-geometry sample -> must be EXCLUDED from located outputs + msr.append( + "SELECT 'MaterialSampleRecord' AS otype, 'm-nogeo' AS pid, NULL::BIGINT AS row_id, 'TEST' AS n, " + "'l' AS label, 'd' AS description, NULL::VARCHAR[] AS place_name, NULL::TIMESTAMP AS result_time, " + "NULL AS geometry, [4]::BIGINT[] AS p__has_material_category, [10]::BIGINT[] AS p__has_context_category, " + "[20]::BIGINT[] AS p__has_sample_object_type") + + con.execute(f"COPY ({ic_rows} UNION ALL {' UNION ALL '.join(msr)}) " + f"TO '{path}' (FORMAT PARQUET)") + con.close() + + +def run_builder(wide, outdir, tag, extra=None): + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", outdir, "--tag", tag, + "--skip", "wide_h3", "--no-manifest"] + (extra or []) + return subprocess.run(cmd, capture_output=True, text=True) + + +@pytest.mark.parametrize("geom_mode", ["blob", "geometry"]) +def test_material_selection_and_geometry(tmp_path, geom_mode): + wide = str(tmp_path / f"wide_{geom_mode}.parquet") + build_fixture_wide(wide, geom_mode) + + # confirm the fixture actually stored the geometry type we intend to test + con = duckdb.connect(); con.execute("INSTALL spatial; LOAD spatial;") + gtype = con.sql(f"DESCRIBE SELECT geometry FROM read_parquet('{wide}')").fetchall()[0][1].upper() + + r = run_builder(wide, str(tmp_path), "t") + assert r.returncode == 0, f"builder failed ({gtype}):\n{r.stdout}\n{r.stderr}" + + facets = str(tmp_path / "t_sample_facets_v2.parquet") + rows = dict(con.sql(f"SELECT pid, material FROM read_parquet('{facets}')").fetchall()) + + # material selection contract + for pid, expected in EXPECTED.items(): + assert rows.get(pid) == expected, f"[{gtype}] {pid}: got {rows.get(pid)!r}, want {expected!r}" + # NULL-geometry sample excluded from located file + assert "m-nogeo" not in rows, f"[{gtype}] NULL-geometry sample leaked into facets" + # geometry decoded to correct coords in map_lite (m-root-first @ lng=10,lat=40) + ml = str(tmp_path / "t_samples_map_lite.parquet") + lat, lng = con.sql(f"SELECT latitude, longitude FROM read_parquet('{ml}') WHERE pid='m-root-first'").fetchone() + assert abs(lat - 40.0) < 1e-6 and abs(lng - 10.0) < 1e-6, f"[{gtype}] bad coords: {lat},{lng}" + + +def test_no_root_and_pid_uniqueness(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert run_builder(wide, str(tmp_path), "t").returncode == 0 + con = duckdb.connect() + facets = f"read_parquet('{tmp_path / 't_sample_facets_v2.parquet'}')" + assert con.sql(f"SELECT COUNT(*) FROM {facets} WHERE material='{ROOT}'").fetchone()[0] == 0 + dups = con.sql(f"SELECT COUNT(*) FROM (SELECT pid FROM {facets} GROUP BY pid HAVING COUNT(*)>1)").fetchone()[0] + assert dups == 0 + + +def test_place_name_serialized_and_quotes(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert run_builder(wide, str(tmp_path), "t").returncode == 0 + con = duckdb.connect() + pn = con.sql(f"SELECT place_name FROM read_parquet('{tmp_path / 't_sample_facets_v2.parquet'}') " + f"WHERE pid='m-real-first'").fetchone()[0] + assert isinstance(pn, str) and "plc-m-real-first" in pn # VARCHAR, not array, embedded quote survived + + +def test_cli_rejects_unknown_only(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + r = run_builder(wide, str(tmp_path), "t", extra=["--only", "bogus_name"]) + assert r.returncode != 0 and "unknown" in (r.stdout + r.stderr).lower() + + +def test_algebraic_validator_passes_on_fixture(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + # full set incl. h3 so the validator's h3 checks run + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", + "--skip", "wide_h3", "--no-manifest"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + # --min-rows 1 for the tiny fixture; sentinel auto-skips when its pid is absent. + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert v.returncode == 0, f"validator failed on fixture:\n{v.stdout}\n{v.stderr}" From 2b95c1e1f174cd9ee7ce42cbb9e9dd59b158f28f Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Fri, 5 Jun 2026 18:02:16 -0700 Subject: [PATCH 3/5] pipeline: semantic --wide trust gate + hard-fail dup keys + honest determinism (Codex round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex PROVED the validator passed a wrecked rebuild (corrupted material/coords/H3 with self-consistent summaries -> exit 0). Fixes: - validate_frontend_derived.py --wide: re-derive from the source wide and EXCEPT-diff the written facets/map_lite/h3 — catches corruption/stale/ wrong-version that internal consistency cannot. Proven by a new test that corrupts coords (passes internal checks, FAILS the --wide gate). Passes on the real 202604 rebuild. - builder HARD-fails on duplicate pids / duplicate concept row_ids (was a warning) - --threads option; determinism claim made honest: facets/map_lite/summaries/ cross_filter are byte-identical run-to-run (verified); float h3 centroids are display-only (compared on discrete cols only). - tests: semantic-gate-catches-corruption, dup-pid-hard-fail, manifest, wide_h3 (10 total) - docs: SERIALIZATIONS deployed-file caveat (202601 still has root rows) vs builder contract; DATA_PROVENANCE wide_h3 coverage precise. Co-Authored-By: Claude Opus 4.8 (1M context) --- DATA_PROVENANCE.md | 2 +- SERIALIZATIONS.md | 11 ++-- scripts/build_frontend_derived.py | 22 ++++++-- scripts/validate_frontend_derived.py | 45 ++++++++++++++++ tests/test_frontend_derived.py | 79 ++++++++++++++++++++++++++-- 5 files changed, 147 insertions(+), 12 deletions(-) diff --git a/DATA_PROVENANCE.md b/DATA_PROVENANCE.md index 5f2e41a7..4b795ce9 100644 --- a/DATA_PROVENANCE.md +++ b/DATA_PROVENANCE.md @@ -37,7 +37,7 @@ DuckDB-WASM in the browser (explorer.qmd; parquet URLs ~L767-781) | **0/1 Export** | Solr API → `isamples_export_*_geo.parquet` | `export_client` `ExportClient.perform_full_download()` (`export_client.py:423-469`) → `write_geoparquet_from_json_lines()`; schema `SOURCE_COLUMNS` (`duckdb_utilities.py:9-42`, incl. `keywords: STRUCT(keyword VARCHAR)[]` — **text only, no URI**, L17) | ❌ API offline; **frozen** | | **2 Base PQG** | export → `*_narrow.parquet` / `*_wide.parquet` | `pqg/pqg/sql_converter.py` `convert_isamples_sql(input, output, wide=…)` (CLI `python pqg/sql_converter.py in.parquet out.parquet [--wide]`); 7 stages, decomposes nested structs → nodes+edges; site dedupe by rounded lat/lon+label | ✅ scripted (exact prod invocation not recorded — gap) | | **3 Sidecar merge** | base wide + Eric's OC PQG → `isamples_202604_wide.parquet` | `scripts/enrich_wide_with_oc_thumbnails.py` — `LEFT JOIN` OC `(pid, thumbnail_url)` into wide (`COALESCE`). **This is the precedent for merging ANY per-source supplement (incl. concept URIs) by pid.** Drift check: `scripts/check_oc_pqg_drift.py` (detects only; no mirror) | ⚠️ merge scripted; OC mirror + R2 upload manual | -| **4 Frontend derived** | wide → 7 explorer files | The 6 map/facet files (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) ← **`scripts/build_frontend_derived.py`** (deterministic; geometry-agnostic; emits a manifest). `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs). Gated by `scripts/validate_frontend_derived.py` (algebraic) + `tests/test_frontend_derived.py` (fixtures, CI). | ✅ scripted + tested | +| **4 Frontend derived** | wide → 7 explorer files | The 6 map/facet files (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) ← **`scripts/build_frontend_derived.py`** (deterministic; geometry-agnostic; emits a manifest). `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs). Gated by `scripts/validate_frontend_derived.py` (algebraic + `--wide` semantic re-derivation) + `tests/test_frontend_derived.py` (fixtures, CI). | ✅ scripted; facet/map files semantic-tested; wide_h3 column-smoke-tested | | **5 Publish** | files → R2 + Worker | Worker `workers/data-isamples-org/src/index.js` (`wrangler deploy`); immutable cache for `isamples_\d{6}_*.parquet`; `/current/.parquet` → 302 via `current/manifest.json`. Bucket `isamples-ry` | ⚠️ Worker scripted; **file upload + manifest update are manual** | ## The sidecar/enrichment pattern (how new data gets in) diff --git a/SERIALIZATIONS.md b/SERIALIZATIONS.md index 54f9d6cb..b5a9be53 100644 --- a/SERIALIZATIONS.md +++ b/SERIALIZATIONS.md @@ -77,9 +77,12 @@ vocab_labels.parquet (58 KB, 537 SKOS concepts) └─► consumed by Search Explorer to render facet URIs as prefLabels ``` -Arrows indicate derivation, not containment. Every file in the left -column can be rebuilt from its parent by a script in -`isamples-python/` or `isamplesorg.github.io/scripts/`. +Arrows indicate derivation, not containment. The Stage-4 frontend-derived +files are rebuilt by `isamplesorg.github.io/scripts/build_frontend_derived.py` +(+ `build_vocab_labels.py`); the Stage-2 narrow/wide files are rebuilt by +`pqg/`. Note: the **currently deployed** `isamples_202601_*` files predate that +builder — a fresh build is NOT bit-for-bit identical to them (see +`DATA_PROVENANCE.md`, "deployed 202601 not reproducible"). ## 3. Catalog @@ -247,6 +250,8 @@ for the alias when you want "latest." ### 4.8 `isamples_202601_sample_facets_v2.parquet` +> ⚠️ **Deployed-file caveat:** the live `isamples_202601_sample_facets_v2.parquet` still contains **346,768** bare-root "Material" rows — it predates the #271 selection rule below. The rule describes the **builder contract** for the next rebuild (verified to drop the root → 0), not the file currently served. + - **Role**: Cross-dimension facet filtering — one row per sample, each facet column holds a single controlled-vocabulary URI. - **Headline schema** (8 cols, all VARCHAR): `pid, source, material, context, object_type, label, description, place_name`. `material`/`context`/`object_type` are scalar URI strings, NOT arrays — one row per sample, so a sample tagged with multiple URIs is represented by a single chosen URI. **Selection rule:** `material` = the **first NON-ROOT** concept in the array (the broad root `.../material/1.0/material` is dropped — #265/#271); root-only samples → NULL material. This is **NOT** necessarily the leaf/most-specific concept (the arrays are not clean SKOS paths). `context`/`object_type` = the first array element (`[1]`). `place_name` is a VARCHAR cast of the wide's `VARCHAR[]` (note: `samples_map_lite` keeps `place_name` as `VARCHAR[]`). For multi-value accuracy, JOIN back to `wide.p__has_*_category`. - **Query pattern**: `WHERE material = ''` for exact match; `WHERE material ILIKE '%rock%'` to substring-match URI fragments. diff --git a/scripts/build_frontend_derived.py b/scripts/build_frontend_derived.py index 1a5ffb48..b4f10391 100755 --- a/scripts/build_frontend_derived.py +++ b/scripts/build_frontend_derived.py @@ -29,8 +29,11 @@ selection — see DATA_PROVENANCE.md. context/object_type use the first array element ([1]); their root-dropping is deferred (tracked in the pipeline epic). -Determinism: every COPY has an ORDER BY; dominant_source ties break on source -name (ASC); center lat/lng are rounded to 6 dp. +Determinism: row order and all DISCRETE values are deterministic (ORDER BY on +every COPY; dominant_source ties broken by source name ASC; non-unique keys are +a hard error). Floating centroids (center_lat/lng) are rounded to 6 dp and are +display-only — not part of the reproducibility guarantee; pass --threads 1 for +bit-stable centroids across machines. Usage: python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag isamples_202606 @@ -122,9 +125,14 @@ def build_base_tables(con, wide, t0): n_samp = con.sql("SELECT COUNT(*) FROM samp").fetchone()[0] n_geo = con.sql("SELECT COUNT(*) FROM samp_geo").fetchone()[0] n_dup = con.sql("SELECT COUNT(*) FROM (SELECT pid FROM samp_geo GROUP BY pid HAVING COUNT(*)>1)").fetchone()[0] - log(f"samp={n_samp:,} samp_geo={n_geo:,} duplicate_pids={n_dup:,}", t0) - if n_dup: - print(f"WARNING: {n_dup:,} duplicate pids in samp_geo — facet counts/joins may inflate", flush=True) + n_icdup = con.sql("SELECT COUNT(*) FROM (SELECT row_id FROM ic GROUP BY row_id HAVING COUNT(*)>1)").fetchone()[0] + log(f"samp={n_samp:,} samp_geo={n_geo:,} duplicate_pids={n_dup:,} duplicate_concept_row_ids={n_icdup:,}", t0) + if n_dup or n_icdup: + # HARD fail (Codex): non-unique keys make the output grain wrong (inflated + # facet counts, ambiguous joins, non-total ORDER BY pid). Abort before writing. + raise SystemExit( + f"FATAL: non-unique keys — duplicate_pids={n_dup}, duplicate_concept_row_ids={n_icdup}. " + f"Output grain/joins would be wrong; refusing to write.") def build_sample_facets_v2(con, out): @@ -249,6 +257,8 @@ def main(): ap.add_argument("--only", default="", help=f"comma list of: {','.join(ARTIFACTS)}") ap.add_argument("--skip", default="", help="comma list of the same names to skip") ap.add_argument("--no-manifest", action="store_true", help="skip writing {tag}_manifest.json") + ap.add_argument("--threads", type=int, default=0, + help="DuckDB thread count; set 1 for bit-stable floating centroids (slower)") args = ap.parse_args() only = set(filter(None, args.only.split(","))) @@ -261,6 +271,8 @@ def main(): t0 = time.time() con = duckdb.connect() + if args.threads: + con.execute(f"PRAGMA threads={args.threads}") con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") log("building base sample tables…", t0) build_base_tables(con, args.wide, t0) diff --git a/scripts/validate_frontend_derived.py b/scripts/validate_frontend_derived.py index f24ee3f3..3004aea7 100755 --- a/scripts/validate_frontend_derived.py +++ b/scripts/validate_frontend_derived.py @@ -34,6 +34,8 @@ def main(): ap.add_argument("--facets"); ap.add_argument("--map-lite") ap.add_argument("--summaries"); ap.add_argument("--cross-filter") ap.add_argument("--h3", nargs=3, metavar=("R4", "R6", "R8")) + ap.add_argument("--wide", help="source wide parquet — enables the SEMANTIC gate " + "(re-derive and diff the written files against a fresh build)") ap.add_argument("--min-rows", type=int, default=1_000_000, help="floor for the non-empty sanity check (use 1 for fixtures)") a = ap.parse_args() @@ -152,6 +154,49 @@ def scalar(sql): total, mat = con.sql(f"SELECT COUNT(*), COUNT(material) FROM {F}").fetchone() check("facets non-empty", total >= a.min_rows, f"{total:,} rows (min {a.min_rows:,})") + # --- 11. SEMANTIC gate vs the source wide (the REAL trust gate) --- + # Internal-consistency checks (1-10) pass even on a wrecked-but-self-consistent + # rebuild (proven). Re-derive from the wide with the SAME builder logic and + # assert the WRITTEN files match it. Catches corrupted material/coords/H3, + # stale files, and wrong-version artifacts. + if a.wide: + sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + import build_frontend_derived as B + con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;") + B.build_base_tables(con, a.wide, 0.0) # builds samp_geo; hard-fails on dup keys + + def except_diff(asql, bsql): + return scalar(f"SELECT (SELECT COUNT(*) FROM (({asql}) EXCEPT ({bsql}))) " + f"+ (SELECT COUNT(*) FROM (({bsql}) EXCEPT ({asql})))") + + ref_facets = ("SELECT pid, source, material, context, object_type, label, description, " + "place_name::VARCHAR AS place_name FROM samp_geo") + file_facets = f"SELECT pid, source, material, context, object_type, label, description, place_name FROM {F}" + check("facets == fresh build from --wide", except_diff(ref_facets, file_facets) == 0, + "facets rows differ from a re-derivation off the wide (corruption/stale/wrong-version)") + + ref_ml = ("SELECT pid, label, source, latitude, longitude, result_time, " + "h3_res8::UBIGINT AS h3_res8, h3_h3_to_string(h3_res8) AS h3_res8_hex, " + "place_name::VARCHAR AS pn FROM samp_geo") + file_ml = (f"SELECT pid, label, source, latitude, longitude, result_time, h3_res8, h3_res8_hex, " + f"place_name::VARCHAR AS pn FROM {ML}") + check("map_lite == fresh build from --wide", except_diff(ref_ml, file_ml) == 0, + "map_lite coords/h3/place_name differ from a re-derivation off the wide") + + if h3: + for res, hp in zip((4, 6, 8), h3): + ref_h3 = (f"WITH sc AS (SELECT h3_res{res} AS cell, source, COUNT(*) c FROM samp_geo " + f"WHERE h3_res{res} IS NOT NULL GROUP BY h3_res{res}, source), " + f"dom AS (SELECT cell, source AS ds, ROW_NUMBER() OVER (PARTITION BY cell ORDER BY c DESC, source ASC) rn FROM sc), " + f"agg AS (SELECT h3_res{res} AS cell, COUNT(*) sc2, COUNT(DISTINCT source) srcc FROM samp_geo " + f"WHERE h3_res{res} IS NOT NULL GROUP BY h3_res{res}) " + f"SELECT agg.cell::UBIGINT h3_cell, agg.sc2::INTEGER sample_count, dom.ds AS dominant_source, " + f"agg.srcc::INTEGER source_count FROM agg JOIN dom ON dom.cell=agg.cell AND dom.rn=1") + # discrete cols only (float centroids are display-only / thread-dependent) + file_h3 = f"SELECT h3_cell, sample_count, dominant_source, source_count FROM read_parquet('{hp}')" + check(f"h3 res{res} == fresh build from --wide", except_diff(ref_h3, file_h3) == 0, + "h3 cells/counts/dominant_source differ from a re-derivation off the wide") + # --- informational (not failing): context/object_type root presence --- for dim, root in [("context", "https://w3id.org/isample/vocabulary/sampledfeature/1.0/anysampledfeature"), ("object_type", "https://w3id.org/isample/vocabulary/materialsampleobjecttype/1.0/materialsample")]: diff --git a/tests/test_frontend_derived.py b/tests/test_frontend_derived.py index 10d13691..07d382c4 100644 --- a/tests/test_frontend_derived.py +++ b/tests/test_frontend_derived.py @@ -155,7 +155,80 @@ def test_algebraic_validator_passes_on_fixture(tmp_path): cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", "--skip", "wide_h3", "--no-manifest"] assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 - # --min-rows 1 for the tiny fixture; sentinel auto-skips when its pid is absent. - v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], - capture_output=True, text=True) + # --min-rows 1 for the tiny fixture; sentinel auto-skips when its pid is absent; + # --wide exercises the SEMANTIC gate (must still pass on a clean rebuild). + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) assert v.returncode == 0, f"validator failed on fixture:\n{v.stdout}\n{v.stderr}" + + +def _build(tmp_path, wide, tag="t", extra=None): + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", tag, + "--skip", "wide_h3", "--no-manifest"] + (extra or []) + return subprocess.run(cmd, capture_output=True, text=True) + + +def test_semantic_gate_catches_corruption_that_internal_checks_miss(tmp_path): + """The whole point (Codex's attack): corrupt map_lite coords so internal + consistency still holds, but the --wide semantic gate must FAIL.""" + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + ml = str(tmp_path / "t_samples_map_lite.parquet") + + # corrupt: zero out every latitude (passes pid-set/uniqueness/h3-sum checks) + con = duckdb.connect() + tmp_ml = ml + ".tmp" + con.execute(f"""COPY (SELECT pid, label, source, 0.0::DOUBLE AS latitude, longitude, + place_name, result_time, h3_res8, h3_res8_hex FROM read_parquet('{ml}')) + TO '{tmp_ml}' (FORMAT PARQUET)""") + con.close(); os.replace(tmp_ml, ml) + + # internal-only validator: still PASSES (the hole Codex exploited) + internal = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert internal.returncode == 0, "expected internal-only checks to miss coord corruption" + + # semantic gate (--wide): must now FAIL + semantic = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert semantic.returncode != 0, f"semantic gate failed to catch coord corruption:\n{semantic.stdout}" + assert "map_lite == fresh build" in semantic.stdout + + +def test_duplicate_pid_hard_fails(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + con = duckdb.connect(); con.execute("INSTALL spatial; LOAD spatial;") + # append a duplicate of an existing located pid + dup = str(tmp_path / "wide_dup.parquet") + con.execute(f"""COPY ( + SELECT * FROM read_parquet('{wide}') + UNION ALL + SELECT * FROM read_parquet('{wide}') WHERE pid='m-real-first' + ) TO '{dup}' (FORMAT PARQUET)""") + con.close() + r = _build(tmp_path, dup) + assert r.returncode != 0 and "non-unique" in (r.stdout + r.stderr).lower(), \ + f"builder should hard-fail on duplicate pids:\n{r.stdout}\n{r.stderr}" + + +def test_manifest_emitted(tmp_path): + import json + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", "--skip", "wide_h3"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + man = tmp_path / "t_manifest.json" + assert man.exists() + m = json.loads(man.read_text()) + assert m["input"]["sha256"] and m["duckdb_version"] and m["outputs"] + assert any("sample_facets_v2" in k for k in m["outputs"]) + assert all("sha256" in v and "rows" in v for v in m["outputs"].values()) + + +def test_wide_h3_builds_with_h3_columns(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", + "--only", "wide_h3", "--no-manifest"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + con = duckdb.connect() + cols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{tmp_path / 't_wide_h3.parquet'}')").fetchall()] + assert {"h3_res4", "h3_res6", "h3_res8"} <= set(cols) From e9a5952c2d0cf8c6e211dd5d62fbc06cb56f88a4 Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Fri, 5 Jun 2026 18:09:21 -0700 Subject: [PATCH 4/5] =?UTF-8?q?pipeline:=20close=20remaining=20gate=20gaps?= =?UTF-8?q?=20=E2=80=94=20h3=20resolution+centers,=20scheme,=20wide=5Fh3?= =?UTF-8?q?=20correctness=20(Codex=20round=203)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - validator --wide now also diffs h3 resolution (exact) + center_lat/lng (tolerant 1e-4: catches gross corruption, ignores float/thread last-ULP jitter) - facet_summaries.scheme contract checked (must be NULL) - wide_h3 cell correctness test (cross-checked vs map_lite) - tests prove h3 center/resolution corruption + scheme corruption are caught (12 total) Verified: 12 passed; real --wide gate exits 0 on the 202604 rebuild with the new checks; h3 center delta 1e-6 (well within 1e-4). Co-Authored-By: Claude Opus 4.8 (1M context) --- scripts/validate_frontend_derived.py | 24 ++++++++++++------ tests/test_frontend_derived.py | 37 ++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/scripts/validate_frontend_derived.py b/scripts/validate_frontend_derived.py index 3004aea7..f88325d9 100755 --- a/scripts/validate_frontend_derived.py +++ b/scripts/validate_frontend_derived.py @@ -103,6 +103,8 @@ def scalar(sql): UNION ALL SELECT * FROM summ EXCEPT SELECT * FROM recomputed)""") check("facet_summaries == GROUP BY facets", mismatch == 0, f"{mismatch} (facet_type,value,count) rows disagree") + check("facet_summaries.scheme all NULL", scalar(f"SELECT COUNT(*) FROM {S} WHERE scheme IS NOT NULL") == 0, + "non-NULL scheme rows (contract: scheme is NULL)") # --- 6. ALGEBRA: facet_cross_filter single-dim rows == conditional GROUP BY facets --- dims = ("source", "material", "context", "object_type") @@ -188,14 +190,22 @@ def except_diff(asql, bsql): ref_h3 = (f"WITH sc AS (SELECT h3_res{res} AS cell, source, COUNT(*) c FROM samp_geo " f"WHERE h3_res{res} IS NOT NULL GROUP BY h3_res{res}, source), " f"dom AS (SELECT cell, source AS ds, ROW_NUMBER() OVER (PARTITION BY cell ORDER BY c DESC, source ASC) rn FROM sc), " - f"agg AS (SELECT h3_res{res} AS cell, COUNT(*) sc2, COUNT(DISTINCT source) srcc FROM samp_geo " + f"agg AS (SELECT h3_res{res} AS cell, COUNT(*) sc2, COUNT(DISTINCT source) srcc, " + f"ROUND(AVG(latitude),6) clat, ROUND(AVG(longitude),6) clng FROM samp_geo " f"WHERE h3_res{res} IS NOT NULL GROUP BY h3_res{res}) " - f"SELECT agg.cell::UBIGINT h3_cell, agg.sc2::INTEGER sample_count, dom.ds AS dominant_source, " - f"agg.srcc::INTEGER source_count FROM agg JOIN dom ON dom.cell=agg.cell AND dom.rn=1") - # discrete cols only (float centroids are display-only / thread-dependent) - file_h3 = f"SELECT h3_cell, sample_count, dominant_source, source_count FROM read_parquet('{hp}')" - check(f"h3 res{res} == fresh build from --wide", except_diff(ref_h3, file_h3) == 0, - "h3 cells/counts/dominant_source differ from a re-derivation off the wide") + f"SELECT agg.cell::UBIGINT h3_cell, agg.sc2::INTEGER sample_count, agg.clat center_lat, " + f"agg.clng center_lng, dom.ds AS dominant_source, agg.srcc::INTEGER source_count, " + f"{res}::INTEGER resolution FROM agg JOIN dom ON dom.cell=agg.cell AND dom.rn=1") + FH = f"read_parquet('{hp}')" + # discrete cols exact (now incl. resolution) + disc_ref = f"SELECT h3_cell, sample_count, dominant_source, source_count, resolution FROM ({ref_h3})" + disc_file = f"SELECT h3_cell, sample_count, dominant_source, source_count, resolution FROM {FH}" + check(f"h3 res{res} discrete == fresh build", except_diff(disc_ref, disc_file) == 0, + "h3 cells/counts/dominant_source/resolution differ from re-derivation off the wide") + # centers: tolerant (float/thread last-ULP jitter ok; gross corruption like 0,0 caught) + cdiff = scalar(f"SELECT COALESCE(MAX(GREATEST(ABS(f.center_lat-r.center_lat), " + f"ABS(f.center_lng-r.center_lng))), 0) FROM {FH} f JOIN ({ref_h3}) r ON f.h3_cell=r.h3_cell") + check(f"h3 res{res} centers within 1e-4", cdiff <= 1e-4, f"max center delta {cdiff}") # --- informational (not failing): context/object_type root presence --- for dim, root in [("context", "https://w3id.org/isample/vocabulary/sampledfeature/1.0/anysampledfeature"), diff --git a/tests/test_frontend_derived.py b/tests/test_frontend_derived.py index 07d382c4..1eee1d3d 100644 --- a/tests/test_frontend_derived.py +++ b/tests/test_frontend_derived.py @@ -224,11 +224,44 @@ def test_manifest_emitted(tmp_path): assert all("sha256" in v and "rows" in v for v in m["outputs"].values()) -def test_wide_h3_builds_with_h3_columns(tmp_path): +def test_wide_h3_cells_match_map_lite(tmp_path): wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 # builds map_lite (skips wide_h3) cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", "--only", "wide_h3", "--no-manifest"] assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 con = duckdb.connect() - cols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{tmp_path / 't_wide_h3.parquet'}')").fetchall()] + wh3 = f"read_parquet('{tmp_path / 't_wide_h3.parquet'}')" + ml = f"read_parquet('{tmp_path / 't_samples_map_lite.parquet'}')" + cols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM {wh3}").fetchall()] assert {"h3_res4", "h3_res6", "h3_res8"} <= set(cols) + # CORRECTNESS: wide_h3 cells must agree with map_lite's for the same located pids + bad = con.sql(f"SELECT COUNT(*) FROM {wh3} w JOIN {ml} m ON w.pid=m.pid " + f"WHERE w.h3_res8 IS DISTINCT FROM m.h3_res8").fetchone()[0] + assert bad == 0, f"{bad} wide_h3 rows have h3_res8 disagreeing with map_lite" + + +def test_semantic_gate_catches_h3_center_and_resolution_corruption(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + h3f = str(tmp_path / "t_h3_summary_res4.parquet") + con = duckdb.connect() + tmp_h3 = h3f + ".tmp" + con.execute(f"""COPY (SELECT h3_cell, sample_count, 0.0::DOUBLE AS center_lat, center_lng, + dominant_source, source_count, 999::INTEGER AS resolution FROM read_parquet('{h3f}')) + TO '{tmp_h3}' (FORMAT PARQUET)"""); con.close(); os.replace(tmp_h3, h3f) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert v.returncode != 0 and "h3 res4" in v.stdout, f"gate missed h3 center/resolution corruption:\n{v.stdout}" + + +def test_scheme_corruption_caught(tmp_path): + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + sf = str(tmp_path / "t_facet_summaries.parquet") + con = duckdb.connect(); tmp_s = sf + ".tmp" + con.execute(f"""COPY (SELECT facet_type, facet_value, 7::INTEGER AS scheme, count + FROM read_parquet('{sf}')) TO '{tmp_s}' (FORMAT PARQUET)"""); con.close(); os.replace(tmp_s, sf) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert v.returncode != 0 and "scheme" in v.stdout, f"gate missed scheme corruption:\n{v.stdout}" From cd5bebfe2e8327b0b1c0ecd122aab72cbfb135f0 Mon Sep 17 00:00:00 2001 From: Raymond Yee Date: Fri, 5 Jun 2026 20:16:14 -0700 Subject: [PATCH 5/5] =?UTF-8?q?pipeline:=20close=20both=20adversary-found?= =?UTF-8?q?=20misses=20=E2=80=94=20tighten=20H3=20center=20tolerance=20+?= =?UTF-8?q?=20verify=20manifest=20(Codex/workflow=20round=204)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A proof workflow (independent re-exec + adversarial attack) found two real misses: 1. H3 centroids: shifting every cell center ~9m (8e-5 deg) passed the loose 1e-4 tolerance. Tightened to 1e-5 (~1m); residual undetected error now bounded at ~1m on display-only centroids. Re-running the exact attack now FAILS the gate. 2. manifest.json was never validated — corrupting its sha256 attestations passed. Validator now verifies every output file's sha256 (and the input's, with --wide) against the manifest. (Self-attesting, not signed — documented.) Both attacks re-run against the fixed gate now exit 1. Clean real rebuild still exits 0. 14 fixture tests (added regressions for both misses). Co-Authored-By: Claude Opus 4.8 (1M context) --- scripts/validate_frontend_derived.py | 49 ++++++++++++++++++++++++++-- tests/test_frontend_derived.py | 32 ++++++++++++++++++ 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/scripts/validate_frontend_derived.py b/scripts/validate_frontend_derived.py index f88325d9..4cd8ad74 100755 --- a/scripts/validate_frontend_derived.py +++ b/scripts/validate_frontend_derived.py @@ -14,9 +14,17 @@ --facets URL --map-lite URL --summaries URL --cross-filter URL \ --h3 URL4 URL6 URL8 """ -import argparse, os, sys +import argparse, hashlib, json, os, sys import duckdb + +def sha256_file(path, _b=1 << 20): + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(_b), b""): + h.update(chunk) + return h.hexdigest() + MATERIAL_ROOT = "https://w3id.org/isample/vocabulary/material/1.0/material" PID_K = "ark:/28722/k2p55x96j" # #260 sentinel: must not flip under #271 selection PID_K_EXPECTED = "https://w3id.org/isample/vocabulary/material/1.0/anthropogenicmetal" @@ -205,7 +213,44 @@ def except_diff(asql, bsql): # centers: tolerant (float/thread last-ULP jitter ok; gross corruption like 0,0 caught) cdiff = scalar(f"SELECT COALESCE(MAX(GREATEST(ABS(f.center_lat-r.center_lat), " f"ABS(f.center_lng-r.center_lng))), 0) FROM {FH} f JOIN ({ref_h3}) r ON f.h3_cell=r.h3_cell") - check(f"h3 res{res} centers within 1e-4", cdiff <= 1e-4, f"max center delta {cdiff}") + # tolerance 1e-5 (~1 m): absorbs the ~1e-6 round/thread jitter, catches + # any meaningful shift (an adversary's 8e-5/~9 m shift previously slipped + # through the old 1e-4). Residual undetected error is bounded at ~1 m on + # display-only cluster centroids. + check(f"h3 res{res} centers within 1e-5 (~1m)", cdiff <= 1e-5, f"max center delta {cdiff}") + + # --- 12. manifest integrity (when a {tag}_manifest.json is present) --- + # The build emits a provenance manifest (per-output + input sha256). Verify the + # written files match it, so the manifest is a guarded attestation, not decoration. + # NOTE: the manifest is self-attesting (not signed) — this catches accidental + # corruption, stale files, and tampering that didn't also rewrite the manifest; + # it does NOT defend against an attacker who rewrites file+manifest consistently. + if a.dir and a.tag: + man_path = os.path.join(a.dir, f"{a.tag}_manifest.json") + if os.path.exists(man_path): + try: + man = json.load(open(man_path)) + except Exception as e: + man = None + check("manifest parses", False, f"unreadable: {e}") + if man: + outs = man.get("outputs", {}) + check("manifest has outputs", bool(outs), "empty manifest.outputs") + bad = [] + for fname, m in outs.items(): + fp = os.path.join(a.dir, fname) + if not os.path.exists(fp): + bad.append(f"{fname}:missing") + elif sha256_file(fp) != m.get("sha256"): + bad.append(f"{fname}:sha256") + check("manifest sha256 matches output files", not bad, f"mismatches: {bad}") + if a.wide: + msha = man.get("input", {}).get("sha256") + if msha and msha != "remote/unhashed": + check("manifest input sha256 matches --wide", sha256_file(a.wide) == msha, + "wide sha256 != manifest.input.sha256") + else: + info.append(f"no {a.tag}_manifest.json (manifest verification skipped)") # --- informational (not failing): context/object_type root presence --- for dim, root in [("context", "https://w3id.org/isample/vocabulary/sampledfeature/1.0/anysampledfeature"), diff --git a/tests/test_frontend_derived.py b/tests/test_frontend_derived.py index 1eee1d3d..b97c1af1 100644 --- a/tests/test_frontend_derived.py +++ b/tests/test_frontend_derived.py @@ -255,6 +255,38 @@ def test_semantic_gate_catches_h3_center_and_resolution_corruption(tmp_path): assert v.returncode != 0 and "h3 res4" in v.stdout, f"gate missed h3 center/resolution corruption:\n{v.stdout}" +def test_h3_center_micro_shift_caught(tmp_path): + """Adversary shifted every H3 centroid ~9m (8e-5 deg) and passed the old 1e-4 + tolerance. The tightened 1e-5 gate must now catch it.""" + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + assert _build(tmp_path, wide).returncode == 0 + h3f = str(tmp_path / "t_h3_summary_res4.parquet") + con = duckdb.connect(); tmp_h3 = h3f + ".tmp" + con.execute(f"""COPY (SELECT h3_cell, sample_count, ROUND(center_lat+8e-5,6) AS center_lat, + ROUND(center_lng+8e-5,6) AS center_lng, dominant_source, source_count, resolution + FROM read_parquet('{h3f}')) TO '{tmp_h3}' (FORMAT PARQUET)"""); con.close(); os.replace(tmp_h3, h3f) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", + "--min-rows", "1", "--wide", wide], capture_output=True, text=True) + assert v.returncode != 0 and "centers within" in v.stdout, f"gate missed ~9m centroid shift:\n{v.stdout}" + + +def test_manifest_tamper_caught(tmp_path): + """Adversary corrupted manifest.json sha256s and the validator ignored it. + Manifest integrity must now be a gated check.""" + import json + wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") + cmd = [sys.executable, BUILD, "--wide", wide, "--outdir", str(tmp_path), "--tag", "t", "--skip", "wide_h3"] + assert subprocess.run(cmd, capture_output=True, text=True).returncode == 0 + man = tmp_path / "t_manifest.json" + m = json.loads(man.read_text()) + for k in m["outputs"]: + m["outputs"][k]["sha256"] = "deadbeef" * 8 + man.write_text(json.dumps(m)) + v = subprocess.run([sys.executable, VALIDATE, "--dir", str(tmp_path), "--tag", "t", "--min-rows", "1"], + capture_output=True, text=True) + assert v.returncode != 0 and "manifest sha256" in v.stdout, f"gate missed manifest tamper:\n{v.stdout}" + + def test_scheme_corruption_caught(tmp_path): wide = str(tmp_path / "wide.parquet"); build_fixture_wide(wide, "blob") assert _build(tmp_path, wide).returncode == 0