diff --git a/src/tinybird_sdk/generator/datasource.py b/src/tinybird_sdk/generator/datasource.py index 4947428..b3202aa 100644 --- a/src/tinybird_sdk/generator/datasource.py +++ b/src/tinybird_sdk/generator/datasource.py @@ -86,6 +86,12 @@ def _generate_engine_config(engine: EngineConfig | None) -> str: return get_engine_clause(engine) +def _generate_backfill(backfill: str | None) -> str | None: + if backfill == "skip": + return "BACKFILL skip" + return None + + def _generate_kafka_config(kafka: Any) -> str: lines = [ f"KAFKA_CONNECTION_NAME {kafka.connection._name}", @@ -164,6 +170,10 @@ def generate_datasource(datasource: DatasourceDefinition) -> GeneratedDatasource parts.append("") parts.append(_generate_engine_config(datasource.options.engine)) + backfill = _generate_backfill(datasource.options.backfill) + if backfill: + parts.append(backfill) + indexes = _generate_indexes(datasource.options.indexes) if indexes: parts.extend(["", indexes]) diff --git a/src/tinybird_sdk/migrate/emit_ts.py b/src/tinybird_sdk/migrate/emit_ts.py index 977f406..f9b304c 100644 --- a/src/tinybird_sdk/migrate/emit_ts.py +++ b/src/tinybird_sdk/migrate/emit_ts.py @@ -202,6 +202,9 @@ def _emit_datasource(ds: DatasourceModel) -> str: if ds.engine: lines.append(f" 'engine': {_emit_engine_options(ds.engine)},") + if ds.backfill: + lines.append(f" 'backfill': {_escape_string(ds.backfill)},") + if ds.indexes: lines.append(" 'indexes': [") for index in ds.indexes: diff --git a/src/tinybird_sdk/migrate/parse_datasource.py b/src/tinybird_sdk/migrate/parse_datasource.py index 62f2cfc..905e713 100644 --- a/src/tinybird_sdk/migrate/parse_datasource.py +++ b/src/tinybird_sdk/migrate/parse_datasource.py @@ -249,6 +249,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: shared_with: list[str] = [] description: str | None = None forward_query: str | None = None + backfill: str | None = None engine_type: str | None = None sorting_key: list[str] = [] @@ -394,6 +395,16 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: import_schedule = parse_quoted_value(value) elif key == "IMPORT_FROM_TIMESTAMP": import_from_timestamp = parse_quoted_value(value) + elif key == "BACKFILL": + normalized = value.strip().lower() + if normalized != "skip": + raise MigrationParseError( + resource.file_path, + "datasource", + resource.name, + f'Invalid BACKFILL value: "{value}"', + ) + backfill = "skip" elif key == "TOKEN": tokens.append(_parse_token(resource.file_path, resource.name, value)) else: @@ -495,6 +506,7 @@ def parse_datasource_file(resource: ResourceFile) -> DatasourceModel: ) if engine_type else None, + backfill=backfill, # type: ignore[arg-type] indexes=indexes, kafka=kafka, s3=imported, diff --git a/src/tinybird_sdk/migrate/types.py b/src/tinybird_sdk/migrate/types.py index f527311..1e64191 100644 --- a/src/tinybird_sdk/migrate/types.py +++ b/src/tinybird_sdk/migrate/types.py @@ -95,6 +95,7 @@ class DatasourceModel: columns: list[DatasourceColumnModel] engine: DatasourceEngineModel | None = None description: str | None = None + backfill: Literal["skip"] | None = None indexes: list[DatasourceIndexModel] = field(default_factory=list) kafka: DatasourceKafkaModel | None = None s3: DatasourceS3Model | None = None diff --git a/src/tinybird_sdk/schema/datasource.py b/src/tinybird_sdk/schema/datasource.py index ccb3883..a372471 100644 --- a/src/tinybird_sdk/schema/datasource.py +++ b/src/tinybird_sdk/schema/datasource.py @@ -2,7 +2,7 @@ import re from dataclasses import dataclass, field -from typing import Any +from typing import Any, Literal from .connection import GCSConnectionDefinition, KafkaConnectionDefinition, S3ConnectionDefinition from .engines import EngineConfig @@ -19,6 +19,7 @@ class ColumnDefinition: SchemaDefinition = dict[str, TypeValidator | ColumnDefinition] +BackfillOption = Literal["skip"] @dataclass(frozen=True, slots=True) @@ -74,6 +75,7 @@ class DatasourceOptions: schema: SchemaDefinition description: str | None = None engine: EngineConfig | None = None + backfill: BackfillOption | None = None tokens: tuple[TokenConfig, ...] = field(default_factory=tuple) shared_with: tuple[str, ...] = field(default_factory=tuple) json_paths: bool = True @@ -120,6 +122,7 @@ def define_datasource(name: str, options: dict[str, Any] | DatasourceOptions) -> description=options.get("description"), schema=options["schema"], engine=options.get("engine"), + backfill=options.get("backfill"), tokens=tokens, shared_with=shared_with, json_paths=options.get("json_paths", True), @@ -134,6 +137,9 @@ def define_datasource(name: str, options: dict[str, Any] | DatasourceOptions) -> if ingestion_count > 1: raise ValueError("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`.") + if normalized.backfill not in {None, "skip"}: + raise ValueError('Invalid datasource backfill value: only "skip" is supported.') + for index in normalized.indexes: if not index.name or any(char.isspace() for char in index.name): raise ValueError( diff --git a/tests/test_phase1_schema_generator_parity.py b/tests/test_phase1_schema_generator_parity.py index 15ba7a1..4e86c60 100644 --- a/tests/test_phase1_schema_generator_parity.py +++ b/tests/test_phase1_schema_generator_parity.py @@ -80,6 +80,28 @@ def test_generate_datasource_includes_indexes_and_store_raw_value() -> None: assert "KAFKA_STORE_RAW_VALUE True" in generated.content +def test_generate_datasource_includes_backfill_skip() -> None: + datasource = define_datasource( + "daily_page_visits", + { + "schema": { + "date": t.date(), + "page_url": t.string(), + "visits": t.uint64(), + }, + "backfill": "skip", + }, + ) + + generated = generate_datasource(datasource) + assert "BACKFILL skip" in generated.content + + +def test_define_datasource_rejects_unsupported_backfill_value() -> None: + with pytest.raises(ValueError, match="Invalid datasource backfill value"): + define_datasource("events", {"schema": {"id": t.int32()}, "backfill": "run"}) + + def test_define_datasource_validates_index_name_and_granularity() -> None: with pytest.raises(ValueError, match="Invalid datasource index name"): define_datasource( diff --git a/tests/test_phase3_migrate_parser_parity.py b/tests/test_phase3_migrate_parser_parity.py index b49556a..fa7c0a9 100644 --- a/tests/test_phase3_migrate_parser_parity.py +++ b/tests/test_phase3_migrate_parser_parity.py @@ -162,6 +162,43 @@ def test_parse_datasource_supports_import_directives() -> None: assert parsed.s3.from_timestamp == "2025-01-01 00:00:00" +def test_parse_datasource_supports_backfill_skip() -> None: + parsed = parse_datasource_file( + _resource( + "datasource", + "daily_page_visits", + "\n".join( + [ + "SCHEMA >", + " date Date", + " page_url String", + " visits UInt64", + "BACKFILL skip", + ] + ), + ) + ) + + assert parsed.backfill == "skip" + + +def test_parse_datasource_rejects_unsupported_backfill_value() -> None: + with pytest.raises(MigrationParseError, match="Invalid BACKFILL value"): + parse_datasource_file( + _resource( + "datasource", + "events", + "\n".join( + [ + "SCHEMA >", + " id Int64", + "BACKFILL run", + ] + ), + ) + ) + + def test_parse_pipe_supports_param_options_and_placeholder_normalization() -> None: parsed = parse_pipe_file( _resource( diff --git a/tests/test_phase4_migrate_runner_emitter_parity.py b/tests/test_phase4_migrate_runner_emitter_parity.py index 6ec1af6..e7148d8 100644 --- a/tests/test_phase4_migrate_runner_emitter_parity.py +++ b/tests/test_phase4_migrate_runner_emitter_parity.py @@ -39,6 +39,7 @@ def test_emit_migration_includes_phase4_connection_and_datasource_fields() -> No ver="version", is_deleted="is_deleted", ), + backfill="skip", indexes=[ DatasourceIndexModel( name="idx_id", @@ -58,6 +59,7 @@ def test_emit_migration_includes_phase4_connection_and_datasource_fields() -> No assert "'schema_registry_url': \"https://registry.example.com\"" in emitted assert "'is_deleted': \"is_deleted\"" in emitted + assert "'backfill': \"skip\"" in emitted assert "'indexes': [" in emitted assert "'store_raw_value': True" in emitted @@ -228,3 +230,31 @@ def test_run_migrate_emits_default_expr_for_sql_function_defaults(tmp_path: Path assert result.output_content is not None assert '\'id\': t.uuid().default_expr("generateUUIDv4()"),' in result.output_content assert "'payload': t.string().default('{}')," in result.output_content + + +def test_run_migrate_emits_backfill_skip(tmp_path: Path) -> None: + (tmp_path / "daily_page_visits.datasource").write_text( + "\n".join( + [ + "SCHEMA >", + " date Date", + " page_url String", + " visits UInt64", + "BACKFILL skip", + ] + ), + encoding="utf-8", + ) + + result = run_migrate( + { + "cwd": str(tmp_path), + "patterns": ["*.datasource"], + "dry_run": True, + } + ) + + assert result.success is True + assert result.errors == [] + assert result.output_content is not None + assert "'backfill': \"skip\"" in result.output_content