Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -732,22 +732,23 @@ public TimeseriesMetadata readTimeseriesMetadata(
boolean ignoreNotExistDevice,
LongConsumer ioSizeConsumer)
throws IOException {
readFileMetadata(ioSizeConsumer);
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeConsumer);
if (metadataIndexPair == null) {
if (ignoreNotExistDevice) {
return null;
}
throw new IOException(
Messages.format("error.read.device_not_in_metadata_file", device, file));
}
ByteBuffer buffer =
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer);
MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
return readTimeseriesMetadata(device, null, measurement, ignoreNotExistDevice, ioSizeConsumer);
}

public TimeseriesMetadata readTimeseriesMetadata(
IDeviceID device,
long[] deviceMetadataIndexNodeOffset,
String measurement,
boolean ignoreNotExistDevice,
LongConsumer ioSizeConsumer)
throws IOException {
Pair<IMetadataIndexEntry, Long> metadataIndexPair;
MetadataIndexNode metadataIndexNode;
ByteBuffer buffer;
if (deviceMetadataIndexNodeOffset != null) {
buffer =
readData(
deviceMetadataIndexNodeOffset[0], deviceMetadataIndexNodeOffset[1], ioSizeConsumer);
try {
metadataIndexNode =
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
Expand All @@ -759,6 +760,36 @@ public TimeseriesMetadata readTimeseriesMetadata(
metadataIndexPair =
getMetadataAndEndOffsetOfMeasurementNode(
metadataIndexNode, measurement, false, ioSizeConsumer);
} else {
readFileMetadata(ioSizeConsumer);
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
metadataIndexPair =
getMetadataAndEndOffsetOfDeviceNode(
deviceMetadataIndexNode, device, true, ioSizeConsumer);
if (metadataIndexPair == null) {
if (ignoreNotExistDevice) {
return null;
}
throw new IOException(
Messages.format("error.read.device_not_in_metadata_file", device, file));
}
buffer =
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer);
metadataIndexNode = deviceMetadataIndexNode;
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
try {
metadataIndexNode =
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
buffer, deserializeConfig);
} catch (Exception e) {
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
metadataIndexPair =
getMetadataAndEndOffsetOfMeasurementNode(
metadataIndexNode, measurement, false, ioSizeConsumer);
}
}
if (metadataIndexPair == null) {
return null;
Expand All @@ -781,13 +812,15 @@ public TimeseriesMetadata readTimeseriesMetadata(
}
// when the buffer length is over than Integer.MAX_VALUE,
// using tsFileInput to get timeseriesMetadataList
tsFileInput.position(metadataIndexPair.left.getOffset());
while (tsFileInput.position() < metadataIndexPair.right) {
try {
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true));
} catch (Exception e1) {
logger.error(Messages.get("log.read.sequence_reader_tsm_deserialize_error"), file);
throw e1;
synchronized (this) {
tsFileInput.position(metadataIndexPair.left.getOffset());
while (tsFileInput.position() < metadataIndexPair.right) {
try {
timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true));
} catch (Exception e1) {
logger.error(Messages.get("log.read.sequence_reader_tsm_deserialize_error"), file);
throw e1;
}
}
}
}
Expand Down Expand Up @@ -872,8 +905,21 @@ public List<TimeseriesMetadata> readTimeseriesMetadata(
boolean ignoreNotExistDevice,
LongConsumer ioSizeRecorder)
throws IOException {
return readTimeseriesMetadata(
device, null, measurement, allSensors, ignoreNotExistDevice, ioSizeRecorder);
}

public List<TimeseriesMetadata> readTimeseriesMetadata(
IDeviceID device,
long[] deviceMetadataIndexNodeOffset,
String measurement,
Set<String> allSensors,
boolean ignoreNotExistDevice,
LongConsumer ioSizeRecorder)
throws IOException {
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
getLeafMetadataIndexPair(device, measurement, ioSizeRecorder);
getLeafMetadataIndexPair(
device, deviceMetadataIndexNodeOffset, measurement, ioSizeRecorder);
if (metadataIndexPair == null) {
if (ignoreNotExistDevice) {
return Collections.emptyList();
Expand Down Expand Up @@ -927,19 +973,18 @@ public List<TimeseriesMetadata> readTimeseriesMetadata(

/* Get leaf MetadataIndexPair which contains path */
private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
IDeviceID device, String measurement, LongConsumer ioSizeRecorder) throws IOException {
readFileMetadata(ioSizeRecorder);
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeRecorder);
if (metadataIndexPair == null) {
return null;
}
ByteBuffer buffer =
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder);
MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
IDeviceID device,
long[] deviceMetadataIndexNodeOffset,
String measurement,
LongConsumer ioSizeRecorder)
throws IOException {
Pair<IMetadataIndexEntry, Long> metadataIndexPair;
MetadataIndexNode metadataIndexNode;
ByteBuffer buffer;
if (deviceMetadataIndexNodeOffset != null) {
buffer =
readData(
deviceMetadataIndexNodeOffset[0], deviceMetadataIndexNodeOffset[1], ioSizeRecorder);
try {
metadataIndexNode =
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
Expand All @@ -948,10 +993,33 @@ private Pair<IMetadataIndexEntry, Long> getLeafMetadataIndexPair(
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
} else {
readFileMetadata(ioSizeRecorder);
MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
metadataIndexPair =
getMetadataAndEndOffsetOfMeasurementNode(
metadataIndexNode, measurement, false, ioSizeRecorder);
getMetadataAndEndOffsetOfDeviceNode(
deviceMetadataIndexNode, device, true, ioSizeRecorder);
if (metadataIndexPair == null) {
return null;
}
buffer =
readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder);
metadataIndexNode = deviceMetadataIndexNode;
if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
try {
metadataIndexNode =
deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize(
buffer, deserializeConfig);
} catch (Exception e) {
logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
throw e;
}
}
}
metadataIndexPair =
getMetadataAndEndOffsetOfMeasurementNode(
metadataIndexNode, measurement, false, ioSizeRecorder);
return metadataIndexPair;
}

Expand Down Expand Up @@ -1646,7 +1714,7 @@ private void generateMetadataIndexUsingTsFileInput(
needChunkMetadata);
}

private void generateMetadataIndexUsingTsFileInput(
private synchronized void generateMetadataIndexUsingTsFileInput(
IMetadataIndexEntry metadataIndex,
long start,
long end,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
// construct the index tree node for the series
currentDevice = currentPath.getIDeviceID();
boolean isTableModel = schema.getTableSchemaMap().containsKey(currentDevice.getTableName());
String currentTableName = isTableModel ? currentDevice.getTableName() : null;
if (prevDevice == null && currentTableName != null) {
prevTableName = currentTableName;
}
if (!currentDevice.equals(prevDevice)) {
if (prevDevice != null) {
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
Expand All @@ -515,7 +519,6 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
generateRootNode(
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
String currentTableName = isTableModel ? currentDevice.getTableName() : null;
if (!Objects.equals(currentTableName, prevTableName)) {
if (prevTableName != null) {
long currentTableSize = out.getPosition() - prevTableMetadataStartOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
Expand Down Expand Up @@ -102,6 +103,33 @@ public void test() throws IOException {
}
}

@Test
public void testReadTimeseriesMetadataWithDeviceMetadataIndexNodeOffset() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) {
registerTableSchema(writer, "table1");
generateDevice(writer, "table1", 10);
writer.endFile();
}

try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
TsFileDeviceIterator deviceIterator =
reader.getTableDevicesIteratorWithIsAligned("table1", null);
Assert.assertTrue(deviceIterator.hasNext());
Pair<IDeviceID, Boolean> currentDevice = deviceIterator.next();
long[] deviceMetadataIndexNodeOffset = deviceIterator.getCurrentDeviceMeasurementNodeOffset();

TimeseriesMetadata metadataWithoutOffset =
reader.readTimeseriesMetadata(currentDevice.getLeft(), "s1", false);
TimeseriesMetadata metadataWithOffset =
reader.readTimeseriesMetadata(
currentDevice.getLeft(), deviceMetadataIndexNodeOffset, "s1", false, null);

Assert.assertEquals("s1", metadataWithoutOffset.getMeasurementId());
Assert.assertEquals(
metadataWithoutOffset.getMeasurementId(), metadataWithOffset.getMeasurementId());
}
}

private void registerTableSchema(TsFileIOWriter writer, String tableName) {
List<IMeasurementSchema> schemas =
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,47 @@ public void calculateTableSize() throws IOException, WriteProcessException {
Assert.assertTrue(tableSizeMap.get("table2") >= 1024 * 1024);
}

@Test
public void calculateTableSize2() throws IOException, WriteProcessException {
TableSchema tableSchema1 =
new TableSchema(
"table1",
Arrays.asList(
new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
TableSchema tableSchema2 =
new TableSchema(
"table2",
Arrays.asList(
new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
Tablet tablet1 =
new Tablet(
"table1",
IMeasurementSchema.getMeasurementNameList(tableSchema1.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema1.getColumnSchemas()),
tableSchema1.getColumnTypes());
tablet1.addTimestamp(0, 0);
tablet1.addValue(0, 0, new byte[1024]);
Tablet tablet2 =
new Tablet(
"table2",
IMeasurementSchema.getMeasurementNameList(tableSchema2.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema2.getColumnSchemas()),
tableSchema2.getColumnTypes());
tablet2.addTimestamp(0, 0);
tablet2.addValue(0, 0, new byte[1024 * 1024]);
Map<String, Long> tableSizeMap = null;
try (TsFileWriter writer = new TsFileWriter(f)) {
writer.registerTableSchema(tableSchema1);
writer.registerTableSchema(tableSchema2);
writer.writeTable(tablet1);
writer.writeTable(tablet2);
tableSizeMap = writer.getIOWriter().getTableSizeMap();
}
Assert.assertTrue(tableSizeMap.get("table1") > 1024);
}

@Test
public void writeRecord() throws IOException, WriteProcessException, ReadProcessException {
setEnv(100 * 1024 * 1024, 10 * 1024);
Expand Down
Loading