diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index d2b9e9d04..97dcddc0d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -732,22 +732,23 @@ public TimeseriesMetadata readTimeseriesMetadata( boolean ignoreNotExistDevice, LongConsumer ioSizeConsumer) throws IOException { - readFileMetadata(ioSizeConsumer); - MetadataIndexNode deviceMetadataIndexNode = - tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); - Pair metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeConsumer); - if (metadataIndexPair == null) { - if (ignoreNotExistDevice) { - return null; - } - throw new IOException( - Messages.format("error.read.device_not_in_metadata_file", device, file)); - } - ByteBuffer buffer = - readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer); - MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; - if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + return readTimeseriesMetadata(device, null, measurement, ignoreNotExistDevice, ioSizeConsumer); + } + + public TimeseriesMetadata readTimeseriesMetadata( + IDeviceID device, + long[] deviceMetadataIndexNodeOffset, + String measurement, + boolean ignoreNotExistDevice, + LongConsumer ioSizeConsumer) + throws IOException { + Pair metadataIndexPair; + MetadataIndexNode metadataIndexNode; + ByteBuffer buffer; + if (deviceMetadataIndexNodeOffset != null) { + buffer = + readData( + deviceMetadataIndexNodeOffset[0], deviceMetadataIndexNodeOffset[1], ioSizeConsumer); try { metadataIndexNode = deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize( @@ -759,6 +760,36 @@ public TimeseriesMetadata readTimeseriesMetadata( metadataIndexPair = getMetadataAndEndOffsetOfMeasurementNode( metadataIndexNode, measurement, false, ioSizeConsumer); + } else { + readFileMetadata(ioSizeConsumer); + MetadataIndexNode deviceMetadataIndexNode = + tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); + metadataIndexPair = + getMetadataAndEndOffsetOfDeviceNode( + deviceMetadataIndexNode, device, true, ioSizeConsumer); + if (metadataIndexPair == null) { + if (ignoreNotExistDevice) { + return null; + } + throw new IOException( + Messages.format("error.read.device_not_in_metadata_file", device, file)); + } + buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer); + metadataIndexNode = deviceMetadataIndexNode; + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + try { + metadataIndexNode = + deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize( + buffer, deserializeConfig); + } catch (Exception e) { + logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); + throw e; + } + metadataIndexPair = + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeConsumer); + } } if (metadataIndexPair == null) { return null; @@ -781,13 +812,15 @@ public TimeseriesMetadata readTimeseriesMetadata( } // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList - tsFileInput.position(metadataIndexPair.left.getOffset()); - while (tsFileInput.position() < metadataIndexPair.right) { - try { - timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true)); - } catch (Exception e1) { - logger.error(Messages.get("log.read.sequence_reader_tsm_deserialize_error"), file); - throw e1; + synchronized (this) { + tsFileInput.position(metadataIndexPair.left.getOffset()); + while (tsFileInput.position() < metadataIndexPair.right) { + try { + timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(tsFileInput, true)); + } catch (Exception e1) { + logger.error(Messages.get("log.read.sequence_reader_tsm_deserialize_error"), file); + throw e1; + } } } } @@ -872,8 +905,21 @@ public List readTimeseriesMetadata( boolean ignoreNotExistDevice, LongConsumer ioSizeRecorder) throws IOException { + return readTimeseriesMetadata( + device, null, measurement, allSensors, ignoreNotExistDevice, ioSizeRecorder); + } + + public List readTimeseriesMetadata( + IDeviceID device, + long[] deviceMetadataIndexNodeOffset, + String measurement, + Set allSensors, + boolean ignoreNotExistDevice, + LongConsumer ioSizeRecorder) + throws IOException { Pair metadataIndexPair = - getLeafMetadataIndexPair(device, measurement, ioSizeRecorder); + getLeafMetadataIndexPair( + device, deviceMetadataIndexNodeOffset, measurement, ioSizeRecorder); if (metadataIndexPair == null) { if (ignoreNotExistDevice) { return Collections.emptyList(); @@ -927,19 +973,18 @@ public List readTimeseriesMetadata( /* Get leaf MetadataIndexPair which contains path */ private Pair getLeafMetadataIndexPair( - IDeviceID device, String measurement, LongConsumer ioSizeRecorder) throws IOException { - readFileMetadata(ioSizeRecorder); - MetadataIndexNode deviceMetadataIndexNode = - tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); - Pair metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeRecorder); - if (metadataIndexPair == null) { - return null; - } - ByteBuffer buffer = - readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); - MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; - if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + IDeviceID device, + long[] deviceMetadataIndexNodeOffset, + String measurement, + LongConsumer ioSizeRecorder) + throws IOException { + Pair metadataIndexPair; + MetadataIndexNode metadataIndexNode; + ByteBuffer buffer; + if (deviceMetadataIndexNodeOffset != null) { + buffer = + readData( + deviceMetadataIndexNodeOffset[0], deviceMetadataIndexNodeOffset[1], ioSizeRecorder); try { metadataIndexNode = deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize( @@ -948,10 +993,33 @@ private Pair getLeafMetadataIndexPair( logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); throw e; } + } else { + readFileMetadata(ioSizeRecorder); + MetadataIndexNode deviceMetadataIndexNode = + tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode( - metadataIndexNode, measurement, false, ioSizeRecorder); + getMetadataAndEndOffsetOfDeviceNode( + deviceMetadataIndexNode, device, true, ioSizeRecorder); + if (metadataIndexPair == null) { + return null; + } + buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); + metadataIndexNode = deviceMetadataIndexNode; + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + try { + metadataIndexNode = + deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize( + buffer, deserializeConfig); + } catch (Exception e) { + logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); + throw e; + } + } } + metadataIndexPair = + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeRecorder); return metadataIndexPair; } @@ -1646,7 +1714,7 @@ private void generateMetadataIndexUsingTsFileInput( needChunkMetadata); } - private void generateMetadataIndexUsingTsFileInput( + private synchronized void generateMetadataIndexUsingTsFileInput( IMetadataIndexEntry metadataIndex, long start, long end, diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index 0c39f15ec..df6d99b20 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -507,6 +507,10 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { // construct the index tree node for the series currentDevice = currentPath.getIDeviceID(); boolean isTableModel = schema.getTableSchemaMap().containsKey(currentDevice.getTableName()); + String currentTableName = isTableModel ? currentDevice.getTableName() : null; + if (prevDevice == null && currentTableName != null) { + prevTableName = currentTableName; + } if (!currentDevice.equals(prevDevice)) { if (prevDevice != null) { addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); @@ -515,7 +519,6 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException { generateRootNode( measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); - String currentTableName = isTableModel ? currentDevice.getTableName() : null; if (!Objects.equals(currentTableName, prevTableName)) { if (prevTableName != null) { long currentTableSize = out.getPosition() - prevTableMetadataStartOffset; diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java index f75b1ed81..894eb983f 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileDeviceIteratorTest.java @@ -24,6 +24,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.common.TimeRange; @@ -102,6 +103,33 @@ public void test() throws IOException { } } + @Test + public void testReadTimeseriesMetadataWithDeviceMetadataIndexNodeOffset() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH))) { + registerTableSchema(writer, "table1"); + generateDevice(writer, "table1", 10); + writer.endFile(); + } + + try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) { + TsFileDeviceIterator deviceIterator = + reader.getTableDevicesIteratorWithIsAligned("table1", null); + Assert.assertTrue(deviceIterator.hasNext()); + Pair currentDevice = deviceIterator.next(); + long[] deviceMetadataIndexNodeOffset = deviceIterator.getCurrentDeviceMeasurementNodeOffset(); + + TimeseriesMetadata metadataWithoutOffset = + reader.readTimeseriesMetadata(currentDevice.getLeft(), "s1", false); + TimeseriesMetadata metadataWithOffset = + reader.readTimeseriesMetadata( + currentDevice.getLeft(), deviceMetadataIndexNodeOffset, "s1", false, null); + + Assert.assertEquals("s1", metadataWithoutOffset.getMeasurementId()); + Assert.assertEquals( + metadataWithoutOffset.getMeasurementId(), metadataWithOffset.getMeasurementId()); + } + } + private void registerTableSchema(TsFileIOWriter writer, String tableName) { List schemas = Arrays.asList( diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java index feaa27940..fcd761d3f 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java @@ -1280,6 +1280,47 @@ public void calculateTableSize() throws IOException, WriteProcessException { Assert.assertTrue(tableSizeMap.get("table2") >= 1024 * 1024); } + @Test + public void calculateTableSize2() throws IOException, WriteProcessException { + TableSchema tableSchema1 = + new TableSchema( + "table1", + Arrays.asList( + new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD))); + TableSchema tableSchema2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD))); + Tablet tablet1 = + new Tablet( + "table1", + IMeasurementSchema.getMeasurementNameList(tableSchema1.getColumnSchemas()), + IMeasurementSchema.getDataTypeList(tableSchema1.getColumnSchemas()), + tableSchema1.getColumnTypes()); + tablet1.addTimestamp(0, 0); + tablet1.addValue(0, 0, new byte[1024]); + Tablet tablet2 = + new Tablet( + "table2", + IMeasurementSchema.getMeasurementNameList(tableSchema2.getColumnSchemas()), + IMeasurementSchema.getDataTypeList(tableSchema2.getColumnSchemas()), + tableSchema2.getColumnTypes()); + tablet2.addTimestamp(0, 0); + tablet2.addValue(0, 0, new byte[1024 * 1024]); + Map tableSizeMap = null; + try (TsFileWriter writer = new TsFileWriter(f)) { + writer.registerTableSchema(tableSchema1); + writer.registerTableSchema(tableSchema2); + writer.writeTable(tablet1); + writer.writeTable(tablet2); + tableSizeMap = writer.getIOWriter().getTableSizeMap(); + } + Assert.assertTrue(tableSizeMap.get("table1") > 1024); + } + @Test public void writeRecord() throws IOException, WriteProcessException, ReadProcessException { setEnv(100 * 1024 * 1024, 10 * 1024);