@@ -1548,3 +1548,72 @@ log.tools.csv_read_error = Error reading CSV file: %1$s

# CsvSourceReader — error closing CSV reader
log.tools.csv_close_reader_error = Error closing CSV reader

# --- spark connector ERROR messages ---

error.spark.write_table_required = Writing TsFile table model requires option "table"
error.spark.write_tag_columns_required = Writing TsFile table model requires option "tagColumns"
error.spark.null_tag_policy_unsupported = Only nullTagPolicy=error is supported
error.spark.unsupported_option = Unsupported TsFile connector option: %1$s
error.spark.model_unsupported = Only TsFile table model is supported, but option "model" was %1$s
error.spark.multiple_load_paths_unsupported = Multiple Spark load paths are not supported yet; pass a directory or glob path instead
error.spark.path_required = TsFile connector requires a path
error.spark.positive_option_required = %1$s must be positive
error.spark.timestamp_as_invalid = timestampAs must be either long or timestamp
error.spark.timestamp_precision_invalid = timestampPrecision must be one of ms, us, or ns
error.spark.column_list_empty = Column list option must not be empty
error.spark.no_tsfile_files = No TsFile files found under path: %1$s
error.spark.multiple_tables_requires_table = TsFile %1$s contains multiple tables; specify option "table"
error.spark.table_not_found_in_file = TsFile file %1$s does not contain table %2$s
error.spark.read_table_metadata_failed = Failed to read TsFile table metadata from %1$s
error.spark.incompatible_table_name = Incompatible TsFile table schema in %1$s: expected table %2$s but found %3$s
error.spark.incompatible_column_count = Incompatible TsFile table schema in %1$s: column count differs
error.spark.incompatible_table_schema_column = Incompatible TsFile table schema in %1$s at column %2$s: expected %3$s/%4$s/%5$s but found %6$s/%7$s/%8$s
error.spark.incompatible_tag_column_count = Incompatible TsFile table schema in %1$s: TAG column count differs
error.spark.incompatible_tag_column = Incompatible TsFile table schema in %1$s at TAG column %2$s: expected %3$s/%4$s but found %5$s/%6$s
error.spark.incompatible_field_column_type = Incompatible TsFile table schema in %1$s: FIELD column %2$s has type %4$s but expected %3$s
error.spark.path_not_exist = TsFile input path does not exist: %1$s
error.spark.discover_path_failed = Failed to discover TsFile input path: %1$s
error.spark.input_not_tsfile = Input path is not a .tsfile file: %1$s
error.spark.local_paths_only = Only local TsFile paths are supported: %1$s
error.spark.unknown_projected_column = Unknown projected TsFile table column: %1$s
error.spark.time_tag_projection_requires_field = Time/TAG-only projection requires at least one FIELD column in the TsFile table
error.spark.unsupported_time_filter_literal = Unsupported time filter literal: %1$s
error.spark.duplicate_tsfile_column = Duplicate TsFile table column after lower-case normalization: %1$s
error.spark.duplicate_spark_schema_column = Duplicate Spark schema column after lower-case normalization: %1$s
error.spark.tag_column_must_string_tsfile = TAG column %1$s.%2$s must be TSDataType.STRING, but was %3$s
error.spark.unsupported_column_category = Column category %1$s is not supported for %2$s.%3$s
error.spark.unknown_tsfile_column = Unknown TsFile table column: %1$s
error.spark.time_column_not_null = timeColumn must not be null
error.spark.tag_column_not_null = TAG column must not be null: %1$s
error.spark.unsupported_tsfile_type_write = Unsupported TsFile data type for write: %1$s
error.spark.write_tablet_failed = Failed to write TsFile tablet
error.spark.time_column_missing = Time column does not exist: %1$s
error.spark.time_column_type_invalid = Time column must be LongType or TimestampType
error.spark.duplicate_tag_columns = Duplicate tagColumns after lower-case normalization
error.spark.time_column_in_tag_columns = timeColumn must not be listed in tagColumns
error.spark.duplicate_field_columns = Duplicate fieldColumns after lower-case normalization
error.spark.time_column_in_field_columns = timeColumn must not be listed in fieldColumns
error.spark.column_both_tag_field = Column cannot be both TAG and FIELD: %1$s
error.spark.field_required_write = At least one FIELD column is required for TsFile table writes
error.spark.tag_column_missing = TAG column does not exist: %1$s
error.spark.tag_column_must_string_spark = TAG column must be StringType: %1$s
error.spark.field_column_missing = FIELD column does not exist: %1$s
error.spark.duplicate_dataframe_column = Duplicate DataFrame column after lower-case normalization: %1$s
error.spark.local_output_paths_only = Only local output paths are supported: %1$s
error.spark.output_exists_append = TsFile output file already exists during append: %1$s
error.spark.commit_failed = Failed to commit TsFile Spark write
error.spark.delete_empty_cleanup_failed = Failed to delete empty path during TsFile write cleanup: %1$s
error.spark.delete_cleanup_failed = Failed to delete path during TsFile write cleanup: %1$s
error.spark.query_table_failed = Failed to query TsFile table data from %1$s
error.spark.unsupported_tsfile_type = Unsupported TsFile data type: %1$s
error.spark.unsupported_tsfile_type_connector = Unsupported TsFile data type for Spark connector: %1$s
error.spark.unsupported_spark_field_type = Unsupported Spark SQL type for TsFile FIELD: %1$s
error.spark.unsupported_timestamp_precision = Unsupported timestamp precision: %1$s
error.spark.encoding_invalid = Unsupported encoding: %1$s
error.spark.encoding_not_supported = Encoding %1$s is not supported for type %2$s
error.spark.compression_invalid = Unsupported compression: %1$s
error.spark.duplicate_external_schema_column = Duplicate external Spark schema column after lower-case normalization: %1$s
error.spark.external_schema_column_missing = External Spark schema column does not exist in TsFile table metadata: %1$s
error.spark.external_schema_type_mismatch = External Spark schema column %1$s has type %2$s, but TsFile table metadata has type %3$s
error.spark.catalog_alter_unsupported = TsFile table catalog does not support ALTER TABLE
@@ -1548,3 +1548,72 @@ log.tools.csv_read_error = 读取 CSV 文件出错: %1$s

# CsvSourceReader — error closing CSV reader
log.tools.csv_close_reader_error = 关闭 CSV reader 出错

# --- spark connector ERROR messages ---

error.spark.write_table_required = 写入 TsFile table model 需要指定 option "table"
error.spark.write_tag_columns_required = 写入 TsFile table model 需要指定 option "tagColumns"
error.spark.null_tag_policy_unsupported = 仅支持 nullTagPolicy=error
error.spark.unsupported_option = 不支持的 TsFile connector option: %1$s
error.spark.model_unsupported = 仅支持 TsFile table model,但 option "model" 为 %1$s
error.spark.multiple_load_paths_unsupported = 暂不支持多个 Spark load path;请改为传入目录或 glob path
error.spark.path_required = TsFile connector 需要 path
error.spark.positive_option_required = %1$s 必须为正数
error.spark.timestamp_as_invalid = timestampAs 必须为 long 或 timestamp
error.spark.timestamp_precision_invalid = timestampPrecision 必须为 ms、us 或 ns
error.spark.column_list_empty = 列表 option 不能为空
error.spark.no_tsfile_files = 路径下未找到 TsFile 文件: %1$s
error.spark.multiple_tables_requires_table = TsFile %1$s 包含多个 table;请指定 option "table"
error.spark.table_not_found_in_file = TsFile 文件 %1$s 不包含 table %2$s
error.spark.read_table_metadata_failed = 读取 %1$s 的 TsFile table metadata 失败
error.spark.incompatible_table_name = %1$s 中的 TsFile table schema 不兼容: 期望 table %2$s,但实际为 %3$s
error.spark.incompatible_column_count = %1$s 中的 TsFile table schema 不兼容: column 数量不同
error.spark.incompatible_table_schema_column = %1$s 中的 TsFile table schema 在 column %2$s 处不兼容: 期望 %3$s/%4$s/%5$s,但实际为 %6$s/%7$s/%8$s
error.spark.incompatible_tag_column_count = %1$s 中的 TsFile table schema 不兼容: TAG column 数量不同
error.spark.incompatible_tag_column = %1$s 中的 TsFile table schema 在 TAG column %2$s 处不兼容: 期望 %3$s/%4$s,但实际为 %5$s/%6$s
error.spark.incompatible_field_column_type = %1$s 中的 TsFile table schema 不兼容: FIELD column %2$s 的 type 为 %4$s,但期望为 %3$s
error.spark.path_not_exist = TsFile 输入路径不存在: %1$s
error.spark.discover_path_failed = 发现 TsFile 输入路径失败: %1$s
error.spark.input_not_tsfile = 输入路径不是 .tsfile 文件: %1$s
error.spark.local_paths_only = 仅支持本地 TsFile 路径: %1$s
error.spark.unknown_projected_column = 未知的 projected TsFile table column: %1$s
error.spark.time_tag_projection_requires_field = Time/TAG-only projection 要求 TsFile table 中至少有一个 FIELD column
error.spark.unsupported_time_filter_literal = 不支持的 time filter literal: %1$s
error.spark.duplicate_tsfile_column = lower-case 归一化后 TsFile table column 重复: %1$s
error.spark.duplicate_spark_schema_column = lower-case 归一化后 Spark schema column 重复: %1$s
error.spark.tag_column_must_string_tsfile = TAG column %1$s.%2$s 必须为 TSDataType.STRING,但实际为 %3$s
error.spark.unsupported_column_category = %2$s.%3$s 不支持 Column category %1$s
error.spark.unknown_tsfile_column = 未知的 TsFile table column: %1$s
error.spark.time_column_not_null = timeColumn 不能为 null
error.spark.tag_column_not_null = TAG column 不能为 null: %1$s
error.spark.unsupported_tsfile_type_write = 写入不支持的 TsFile data type: %1$s
error.spark.write_tablet_failed = 写入 TsFile tablet 失败
error.spark.time_column_missing = Time column 不存在: %1$s
error.spark.time_column_type_invalid = Time column 必须为 LongType 或 TimestampType
error.spark.duplicate_tag_columns = lower-case 归一化后 tagColumns 重复
error.spark.time_column_in_tag_columns = timeColumn 不能出现在 tagColumns 中
error.spark.duplicate_field_columns = lower-case 归一化后 fieldColumns 重复
error.spark.time_column_in_field_columns = timeColumn 不能出现在 fieldColumns 中
error.spark.column_both_tag_field = 同一列不能同时为 TAG 和 FIELD: %1$s
error.spark.field_required_write = TsFile table 写入至少需要一个 FIELD column
error.spark.tag_column_missing = TAG column 不存在: %1$s
error.spark.tag_column_must_string_spark = TAG column 必须为 StringType: %1$s
error.spark.field_column_missing = FIELD column 不存在: %1$s
error.spark.duplicate_dataframe_column = lower-case 归一化后 DataFrame column 重复: %1$s
error.spark.local_output_paths_only = 仅支持本地输出路径: %1$s
error.spark.output_exists_append = append 时 TsFile 输出文件已存在: %1$s
error.spark.commit_failed = 提交 TsFile Spark 写入失败
error.spark.delete_empty_cleanup_failed = TsFile 写入清理期间删除空路径失败: %1$s
error.spark.delete_cleanup_failed = TsFile 写入清理期间删除路径失败: %1$s
error.spark.query_table_failed = 查询 %1$s 中的 TsFile table data 失败
error.spark.unsupported_tsfile_type = 不支持的 TsFile data type: %1$s
error.spark.unsupported_tsfile_type_connector = Spark connector 不支持的 TsFile data type: %1$s
error.spark.unsupported_spark_field_type = TsFile FIELD 不支持的 Spark SQL type: %1$s
error.spark.unsupported_timestamp_precision = 不支持的 timestamp precision: %1$s
error.spark.encoding_invalid = 不支持的 encoding: %1$s
error.spark.encoding_not_supported = Encoding %1$s 不支持 type %2$s
error.spark.compression_invalid = 不支持的 compression: %1$s
error.spark.duplicate_external_schema_column = lower-case 归一化后 external Spark schema column 重复: %1$s
error.spark.external_schema_column_missing = TsFile table metadata 中不存在 external Spark schema column: %1$s
error.spark.external_schema_type_mismatch = External Spark schema column %1$s 的 type 为 %2$s,但 TsFile table metadata 中的 type 为 %3$s
error.spark.catalog_alter_unsupported = TsFile table catalog 不支持 ALTER TABLE
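
Both bundles use positional format specifiers (`%1$s`, `%2$s`, ...), so a translation can reorder arguments without changing the call site. The sketch below renders three of the templates above with plain `String.format`. The class name, the `render` helper, and the sample argument values are illustrative assumptions, not the connector's actual message loader.

```java
import java.util.Locale;

/** Hypothetical demo of positional format specifiers; not the connector's message loader. */
final class SparkMessageFormatDemo {
  // Templates copied verbatim from the two bundles above.
  static final String EN_CATEGORY = "Column category %1$s is not supported for %2$s.%3$s";
  static final String ZH_CATEGORY = "%2$s.%3$s 不支持 Column category %1$s";
  static final String EN_FIELD_TYPE =
      "Incompatible TsFile table schema in %1$s: FIELD column %2$s has type %4$s but expected %3$s";

  /** Fills a template; the argument order is fixed by the caller, not by the template. */
  static String render(String template, Object... args) {
    return String.format(Locale.ROOT, template, args);
  }

  public static void main(String[] args) {
    // Assumed argument order: category, table, column.
    System.out.println(render(EN_CATEGORY, "ATTRIBUTE", "weather", "note"));
    System.out.println(render(ZH_CATEGORY, "ATTRIBUTE", "weather", "note"));
    // Assumed argument order: file, column, expected type, actual type.
    System.out.println(render(EN_FIELD_TYPE, "a.tsfile", "temperature", "INT32", "DOUBLE"));
  }
}
```

The Chinese template places `%2$s.%3$s` before `%1$s`, and the English `incompatible_field_column_type` template prints `%4$s` before `%3$s`; both render correctly from the same argument list.
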
1 change: 1 addition & 0 deletions java/pom.xml
@@ -33,6 +33,7 @@
    <modules>
        <module>common</module>
        <module>tsfile</module>
        <module>spark-tsfile</module>
        <module>examples</module>
        <module>tools</module>
    </modules>
169 changes: 169 additions & 0 deletions java/spark-tsfile/README.md
@@ -0,0 +1,169 @@
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->

# TsFile Spark Connector

This module provides a Spark 3.x DataSource V2 connector for TsFile table model
files. The short name is `tsfile`.

## Read

```scala
val df = spark.read
  .format("tsfile")
  .option("model", "table")
  .option("table", "weather")
  .load("/data/tsfile/weather")

df.select("time", "city", "temperature")
  .where("time >= 1700000000000 and city = 'beijing'")
  .show()
```

If every input TsFile contains exactly one table, `table` can be omitted and the
connector will infer it from the TsFile metadata. If an input file contains
multiple tables, `table` must be provided.
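
The inference rule can be sketched as follows. This is a minimal illustration that reuses the connector's error messages; the class, the method signature, and the handling of a file with no tables are assumptions, and the real implementation reads the table list from TsFile metadata.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper; the connector's real resolution logic may differ. */
final class TableResolution {
  /**
   * Picks the table to read from one file.
   * Assumes tablesInFile already holds lower-case names, as TsFile metadata does.
   */
  static String resolveTable(String file, List<String> tablesInFile, String requestedTable) {
    if (requestedTable == null) {
      if (tablesInFile.size() == 1) {
        return tablesInFile.get(0); // exactly one table: infer it
      }
      if (tablesInFile.isEmpty()) {
        // Assumption: the README does not say what happens for an empty file.
        throw new IllegalArgumentException("TsFile " + file + " contains no tables");
      }
      throw new IllegalArgumentException(
          String.format("TsFile %1$s contains multiple tables; specify option \"table\"", file));
    }
    String wanted = requestedTable.toLowerCase(Locale.ROOT);
    if (!tablesInFile.contains(wanted)) {
      throw new IllegalArgumentException(
          String.format("TsFile file %1$s does not contain table %2$s", file, wanted));
    }
    return wanted;
  }

  public static void main(String[] args) {
    System.out.println(resolveTable("a.tsfile", Arrays.asList("weather"), null));
    System.out.println(resolveTable("b.tsfile", Arrays.asList("weather", "traffic"), "Weather"));
  }
}
```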

The input path can be a single `.tsfile`, a directory containing `.tsfile` files,
or a glob path. The initial connector discovers paths through Hadoop APIs but
opens TsFile data with the local Java reader, so only local file paths are
supported.
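
A local-only check of this kind could look like the sketch below. It is an assumption about the shape of the validation, not the connector's code: `LocalPathCheck`, `schemeOf`, and `requireLocal` are hypothetical names, and the scheme detection is deliberately simple.

```java
import java.util.Locale;

/** Hypothetical local-path guard; names and parsing rules are illustrative. */
final class LocalPathCheck {
  /** Returns a URI-style scheme such as "hdfs", or null when the path has none. */
  static String schemeOf(String path) {
    int colon = path.indexOf(':');
    int slash = path.indexOf('/');
    // A colon only marks a scheme when it appears before the first slash.
    // Limitation: a Windows drive letter such as "C:\data" would be read as a scheme.
    if (colon > 0 && (slash < 0 || colon < slash)) {
      return path.substring(0, colon).toLowerCase(Locale.ROOT);
    }
    return null;
  }

  /** Accepts scheme-less paths and file: paths; rejects everything else. */
  static void requireLocal(String path) {
    String scheme = schemeOf(path);
    if (scheme != null && !scheme.equals("file")) {
      throw new IllegalArgumentException(
          String.format("Only local TsFile paths are supported: %1$s", path));
    }
  }

  public static void main(String[] args) {
    requireLocal("/data/tsfile/weather/*.tsfile"); // glob without scheme: accepted
    requireLocal("file:///data/tsfile/weather");   // explicit file scheme: accepted
    try {
      requireLocal("hdfs://namenode/data/tsfile/weather");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```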

## Write

```scala
// Needed for .toDF on a local Seq; assumes a SparkSession named `spark`.
import spark.implicits._

val rows = Seq(
  (1700000000000L, "beijing", 20, 0.32d),
  (1700000001000L, "shanghai", 21, 0.35d)
).toDF("time", "city", "temperature", "humidity")

rows.write
  .format("tsfile")
  .option("model", "table")
  .option("table", "weather")
  .option("timeColumn", "time")
  .option("tagColumns", "city")
  .option("compression", "lz4")
  .mode("append")
  .save("/data/tsfile/weather")
```

Each Spark task writes a separate `part-*.tsfile`. Append mode adds new TsFile
files to the output directory; it does not append rows to an existing TsFile.

TAG columns must be non-null strings. FIELD columns default to all columns that
are not the time column or TAG columns, and null FIELD values are written as
sparse TsFile values.
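
The default FIELD selection amounts to a set difference over the DataFrame columns. The sketch below assumes names are compared after lower-casing; `FieldColumnInference` and `inferFieldColumns` are hypothetical names, not the connector's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical sketch of the "FIELD = everything except time and TAG" default. */
final class FieldColumnInference {
  static List<String> inferFieldColumns(
      List<String> schemaColumns, String timeColumn, List<String> tagColumns) {
    String time = timeColumn.toLowerCase(Locale.ROOT);
    Set<String> tags = new HashSet<>();
    for (String tag : tagColumns) {
      tags.add(tag.toLowerCase(Locale.ROOT));
    }
    List<String> fields = new ArrayList<>();
    for (String column : schemaColumns) {
      String name = column.toLowerCase(Locale.ROOT);
      if (!name.equals(time) && !tags.contains(name)) {
        fields.add(name); // keeps the DataFrame column order
      }
    }
    if (fields.isEmpty()) {
      throw new IllegalArgumentException(
          "At least one FIELD column is required for TsFile table writes");
    }
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(inferFieldColumns(
        Arrays.asList("time", "city", "temperature", "humidity"),
        "time",
        Arrays.asList("city")));
  }
}
```

For the write example above, this yields `temperature` and `humidity` as FIELD columns.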

## SQL Temporary View

```scala
spark.read
  .format("tsfile")
  .option("table", "weather")
  .load("/data/tsfile/weather/*.tsfile")
  .createOrReplaceTempView("weather_tsfile")

spark.sql(
  """
    |select time, city, temperature
    |from weather_tsfile
    |where city = 'beijing'
    |order by time
    |""".stripMargin)
  .show()
```

## SQL CTAS Write

SQL CTAS goes through the connector's Spark DataSource V2 catalog, which must
first be registered in the Spark session configuration.

```scala
spark.conf.set(
  "spark.sql.catalog.tsfile_catalog",
  "org.apache.tsfile.spark.TsFileTableCatalog")

spark.sql(
  """
    |CREATE TABLE tsfile_catalog.weather_copy
    |USING tsfile
    |TBLPROPERTIES (
    |  'path' = '/data/tsfile/weather-copy',
    |  'model' = 'table',
    |  'table' = 'weather',
    |  'tagColumns' = 'city'
    |)
    |AS SELECT time, city, temperature, humidity
    |FROM source_weather
    |""".stripMargin)
```

## Options

| Option | Default | Description |
| --- | --- | --- |
| `model` | `table` | Must be `table`. |
| `table` | none | Table name. Required for writes and for multi-table reads. |
| `timeColumn` | `time` | Spark time column name. |
| `tagColumns` | none | Comma-separated TAG columns. Required for writes. |
| `fieldColumns` | inferred | Comma-separated FIELD columns for writes. |
| `timestampAs` | `long` | Use `long` or `timestamp` for TsFile `TIMESTAMP` fields and the Spark time column. |
| `timestampPrecision` | `ms` | Raw TsFile timestamp precision: `ms`, `us`, or `ns`. |
| `mergeSchema` | `false` | When `true`, multi-file reads union compatible FIELD columns. TAG columns must keep the same order and type. |
| `pushdown` | `true` | Enables supported time and TAG equality predicate pushdown. |
| `compression` | default TsFile setting | Compression codec for written FIELD columns. |
| `encoding` | default TsFile setting | Encoding for written FIELD columns. |
| `nullTagPolicy` | `error` | Only `error` is supported. |
| `maxRowsPerTablet` | `1024` | Maximum rows buffered in each TsFile `Tablet` before flushing. |
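
Spark's `TimestampType` stores microseconds since the epoch, so `timestampAs=timestamp` implies a unit conversion that depends on `timestampPrecision`. The sketch below shows one plausible conversion; the class and method names are hypothetical, and the connector's actual rounding and overflow behavior may differ.

```java
/** Hypothetical conversion from raw TsFile timestamps to Spark microseconds. */
final class TimestampPrecision {
  static long toSparkMicros(long raw, String precision) {
    switch (precision) {
      case "ms":
        return Math.multiplyExact(raw, 1000L); // throws ArithmeticException on overflow
      case "us":
        return raw;
      case "ns":
        return Math.floorDiv(raw, 1000L); // assumption: round toward negative infinity
      default:
        throw new IllegalArgumentException("timestampPrecision must be one of ms, us, or ns");
    }
  }

  public static void main(String[] args) {
    System.out.println(toSparkMicros(1700000000000L, "ms"));
    System.out.println(toSparkMicros(1700000000000000123L, "ns"));
  }
}
```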

Table and column names are normalized to lower case to match TsFile table model
metadata behavior.
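
Normalization means that two names differing only in case collide. A minimal sketch of the duplicate check, assuming a locale-independent lower-casing and reporting the normalized name (the connector may report the original one); `ColumnNameNormalizer` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical sketch of lower-case normalization with duplicate detection. */
final class ColumnNameNormalizer {
  static List<String> normalize(List<String> names) {
    Set<String> seen = new LinkedHashSet<>(); // preserves column order
    for (String name : names) {
      String lower = name.toLowerCase(Locale.ROOT);
      if (!seen.add(lower)) {
        throw new IllegalArgumentException(String.format(
            "Duplicate DataFrame column after lower-case normalization: %1$s", lower));
      }
    }
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    System.out.println(normalize(Arrays.asList("Time", "City", "temperature")));
    try {
      normalize(Arrays.asList("City", "city"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```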

## Initial Scope

This module is the initial Spark 3.x DataSource V2 connector for TsFile table
model files. It intentionally keeps the first production surface narrow:

- Batch read and batch write are supported; streaming read/write are not.
- Writes are append-only and create new `part-*.tsfile` files. Overwrite and
  truncate semantics are not supported in this initial connector.
- Only TsFile table model is supported. Tree model files are outside this
  module's scope.
- Input discovery supports a single `.tsfile`, a directory of `.tsfile` files,
  and glob paths, but actual TsFile reading and writing is local-file only in
  this initial version. Non-`file` Hadoop paths should be handled in a follow-up
  change.
- `mergeSchema=true` supports FIELD column union for multi-file reads. TAG
  columns must keep the same order and type, and same-name FIELD columns must
  keep the same TsFile type.
- User-provided read schemas are validated against TsFile table metadata and
  may be used as read projections. They cannot change the stored column types.
- Predicate pushdown is limited to `time =`, `time >`, `time >=`, `time <`,
  `time <=`, AND-combined time ranges, and string equality on TAG columns.
  Unsupported predicates are returned to Spark as residual filters.
- Unsupported column categories and data types fail fast: `ATTRIBUTE`, `TIME`,
  `VECTOR`, `UNKNOWN`, and `OBJECT` are not part of the first connector scope.
- TAG columns must be non-null strings. FIELD columns may be null and are
  written/read as sparse TsFile values.
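
The pushdown rule in the list above can be sketched as a split of AND-combined comparisons into pushed and residual sets. This is a simplified model, not Spark's `Filter` API or the connector's code: `PushdownSketch`, `Comparison`, and the restriction of time literals to `Long` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of filter classification. */
final class PushdownSketch {
  /** One comparison: column, operator, literal. */
  static final class Comparison {
    final String column;
    final String op;
    final Object literal;

    Comparison(String column, String op, Object literal) {
      this.column = column;
      this.op = op;
      this.literal = literal;
    }
  }

  private static final Set<String> TIME_OPS =
      new HashSet<>(Arrays.asList("=", ">", ">=", "<", "<="));

  static boolean isPushable(Comparison c, String timeColumn, Set<String> tagColumns) {
    if (c.column.equals(timeColumn)) {
      // Assumption: only long literals are accepted for time comparisons.
      return TIME_OPS.contains(c.op) && c.literal instanceof Long;
    }
    if (tagColumns.contains(c.column)) {
      return c.op.equals("=") && c.literal instanceof String;
    }
    return false; // FIELD predicates are never pushed in this sketch
  }

  /** Returns the comparisons that stay with Spark as residual filters. */
  static List<Comparison> residual(
      List<Comparison> andCombined, String timeColumn, Set<String> tagColumns) {
    List<Comparison> rest = new ArrayList<>();
    for (Comparison c : andCombined) {
      if (!isPushable(c, timeColumn, tagColumns)) {
        rest.add(c);
      }
    }
    return rest;
  }

  public static void main(String[] args) {
    Set<String> tags = new HashSet<>(Arrays.asList("city"));
    List<Comparison> filters = Arrays.asList(
        new Comparison("time", ">=", 1700000000000L),
        new Comparison("city", "=", "beijing"),
        new Comparison("temperature", ">", 20));
    for (Comparison c : residual(filters, "time", tags)) {
      System.out.println("residual: " + c.column + " " + c.op + " " + c.literal);
    }
  }
}
```

Returning unsupported comparisons as residuals keeps results correct: Spark re-evaluates them on the rows the connector returns.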

Follow-up issues should track non-local filesystem support, a broader predicate
pushdown matrix, streaming semantics, and expanded type or category support.