Further optimizations for ingestion flow#1668
Conversation
WalkthroughAdds a lazy Tokio runtime for flush/convert work; DiskWriter tracks accumulated bytes and evicts when ARROW_FLUSH_SIZE_LIMIT is reached using LZ4 IPC options; hostname caching; parallel Arrow→Parquet conversion with Rayon; and routes flush/convert and upload tasks onto the new runtime. ChangesStaging Writer Optimization and Conversion
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (5)
src/parseable/staging/writer.rs (2)
36-36: 💤 Low valueUnused import:
SizedMessageappears to not be used in this file.This import was added but doesn't seem to be referenced anywhere in the file. Consider removing it if it's not needed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/staging/writer.rs` at line 36, Unused import SizedMessage in src/parseable/staging/writer.rs: remove the unused line referencing SizedMessage (use datafusion::physical_plan::buffer::SizedMessage;) from the top of writer.rs; if you intended to use SizedMessage, instead reference it where needed in the relevant function or scope (e.g., wherever buffering/physical_plan buffer handling is implemented) or add a purposeful usage, otherwise simply delete the import to fix the unused-import warning.
266-270: 💤 Low valueSize tracking uses in-memory batch size, not compressed IPC size.
rb.size()returns the in-memory ArrowRecordBatchsize, but the actual bytes written to the IPC stream will differ due to LZ4 compression. This meansARROW_FLUSH_SIZE_LIMITeffectively controls in-memory accumulation rather than on-disk file size. If this approximation is intentional for simplicity, a brief comment would help future readers understand the design choice.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/staging/writer.rs` around lines 266 - 270, The current write method in write(&mut self, rb: &RecordBatch) updates self.size using rb.size(), which measures in-memory Arrow size not the compressed IPC bytes actually written by self.inner.write; either (A) replace the size update with the actual number of bytes written by serializing the RecordBatch to an IPC buffer using the same compression settings as self.inner (e.g., serialize via the same IPC writer/stream code used by self.inner and add the resulting byte length to self.size) or (B) if you intentionally accept the approximation, add a short clarifying comment above write (referencing write, self.size, rb.size(), and self.inner.write) explaining that rb.size() is an in-memory estimate and does not reflect LZ4-compressed IPC size. Make only one of these changes.src/storage/object_storage.rs (1)
1090-1090: ⚡ Quick winConsider reducing log level for active files set.
This
info!log will output the entireACTIVE_OBJECT_STORE_SYNC_FILESHashSet on every sync cycle, which could be verbose in production when many files are being tracked. Consider usingtrace!ordebug!level instead, or only logging the count of active files atinfo!level.📝 Suggested change
- tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES); + tracing::debug!(ACTIVE_OBJECT_STORE_SYNC_FILES=?guard);Or for info-level with just the count:
- tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES); + tracing::info!(active_sync_files_count = guard.len());🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/storage/object_storage.rs` at line 1090, The current tracing::info! call logs the entire ACTIVE_OBJECT_STORE_SYNC_FILES HashSet each sync and is too verbose; change the call to either tracing::debug! or tracing::trace! to reduce noise, or keep info-level but log only the count (e.g., ACTIVE_OBJECT_STORE_SYNC_FILES.len()) and optionally include a small sample or summary instead of the full set; update the tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES) statement accordingly to use the chosen level or to log the length/summary.src/parseable/streams.rs (2)
838-890: 💤 Low valueParallel conversion looks correct, but consider error handling strategy.
The rayon parallel iteration collects all results and then processes them sequentially to extract schemas and propagate the first error. This is a valid approach, though it means all parallel tasks complete before any error is surfaced. This matches the previous sequential behavior where all files would be attempted.
One observation: if multiple parquet conversions fail, only the first error is returned while the rest are silently discarded. This is acceptable for simplicity, but logging all errors before returning the first might help debugging batch failures.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` around lines 838 - 890, The parallel map collects results into _schemas but only returns the first error while discarding others; update the post-collection handling of _schemas (from staging_files.into_par_iter() ... .collect()) to log all Err(e) entries (including context like parquet_path / arrow_files where available) before returning the first Err, or aggregate errors into a single diagnostic message; specifically adjust the loop that matches res in _schemas (and the rename error logging near write_parquet_part_file and std::fs::rename) to emit error logs for every Err variant encountered and then return the first Err rather than silently dropping subsequent errors.
847-847: 💤 Low valueRemove dead commented code.
This commented-out line
// schemas.push(merged_schema.clone());appears to be leftover from the refactoring. It should be removed to keep the code clean.🧹 Suggested cleanup
let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition); - // schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema.clone());🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` at line 847, Remove the dead commented-out line "// schemas.push(merged_schema.clone());" from the code to clean up leftover refactor noise; locate the block where schemas and merged_schema are handled (references to schemas and merged_schema in streams.rs) and delete that commented line so only active code remains.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/parseable/staging/writer.rs`:
- Line 36: Unused import SizedMessage in src/parseable/staging/writer.rs: remove
the unused line referencing SizedMessage (use
datafusion::physical_plan::buffer::SizedMessage;) from the top of writer.rs; if
you intended to use SizedMessage, instead reference it where needed in the
relevant function or scope (e.g., wherever buffering/physical_plan buffer
handling is implemented) or add a purposeful usage, otherwise simply delete the
import to fix the unused-import warning.
- Around line 266-270: The current write method in write(&mut self, rb:
&RecordBatch) updates self.size using rb.size(), which measures in-memory Arrow
size not the compressed IPC bytes actually written by self.inner.write; either
(A) replace the size update with the actual number of bytes written by
serializing the RecordBatch to an IPC buffer using the same compression settings
as self.inner (e.g., serialize via the same IPC writer/stream code used by
self.inner and add the resulting byte length to self.size) or (B) if you
intentionally accept the approximation, add a short clarifying comment above
write (referencing write, self.size, rb.size(), and self.inner.write) explaining
that rb.size() is an in-memory estimate and does not reflect LZ4-compressed IPC
size. Make only one of these changes.
In `@src/parseable/streams.rs`:
- Around line 838-890: The parallel map collects results into _schemas but only
returns the first error while discarding others; update the post-collection
handling of _schemas (from staging_files.into_par_iter() ... .collect()) to log
all Err(e) entries (including context like parquet_path / arrow_files where
available) before returning the first Err, or aggregate errors into a single
diagnostic message; specifically adjust the loop that matches res in _schemas
(and the rename error logging near write_parquet_part_file and std::fs::rename)
to emit error logs for every Err variant encountered and then return the first
Err rather than silently dropping subsequent errors.
- Line 847: Remove the dead commented-out line "//
schemas.push(merged_schema.clone());" from the code to clean up leftover
refactor noise; locate the block where schemas and merged_schema are handled
(references to schemas and merged_schema in streams.rs) and delete that
commented line so only active code remains.
In `@src/storage/object_storage.rs`:
- Line 1090: The current tracing::info! call logs the entire
ACTIVE_OBJECT_STORE_SYNC_FILES HashSet each sync and is too verbose; change the
call to either tracing::debug! or tracing::trace! to reduce noise, or keep
info-level but log only the count (e.g., ACTIVE_OBJECT_STORE_SYNC_FILES.len())
and optionally include a small sample or summary instead of the full set; update
the
tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES)
statement accordingly to use the chosen level or to log the length/summary.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 21d7b0d1-8eae-45fc-be58-ff9f8d179a0e
📒 Files selected for processing (4)
src/parseable/staging/writer.rssrc/parseable/streams.rssrc/storage/object_storage.rssrc/sync.rs
Try to find more areas for optimization. - compress arrow files while writing (lz4_frame or zstd) - create new .part file based on size - one parquet per arrow file (converted in parallel) - separate runtimes to run ingestion tasks, sync and conversion tasks
22e172a to
5eb1677
Compare
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/parseable/streams.rs (2)
850-850:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winSeparate parquet temp files from Arrow
.partrecovery.
write_parquet_part_file()now emits parquet temp output with the same.partextension thatrecover_orphan_part_files()treats as Arrow IPC. After an interrupted rename, startup will classify that parquet temp file as an invalid Arrow part and delete it instead of finalizing or explicitly retrying it. Use a distinct temp suffix or teach recovery to distinguish parquet temp files before deleting them.Also applies to: 1330-1404
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` at line 850, write_parquet_part_file() currently emits temporary parquet outputs using parquet_path.with_extension("part"), which collides with recover_orphan_part_files()’s Arrow IPC ".part" handling and causes parquet temp files to be misclassified and deleted on startup; change the temp suffix or add a discriminant so parquet temps are distinguishable (e.g., use a distinct extension like ".parquet.part" or add a magic header/filename prefix) and update recover_orphan_part_files() to recognize and either finalize or retry parquet temp files (or ignore them) instead of treating them as Arrow parts; locate and update the code paths in write_parquet_part_file() and recover_orphan_part_files() (also mirror the same fix in the analogous code around lines ~1330-1404) so both writers and the recovery routine agree on the temp-file format.
320-325:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPropagate parquet rename failures instead of logging and continuing.
If Line 862 fails, this closure still returns
Ok(Some(merged_schema)). That reports a successful conversion even though the arrow files were not cleaned up, and regular cycles only regroup the currentprocessing_<minute>directory, so the failed batch can sit in an olderprocessing_*directory until init/shutdown.Suggested fix
- if let Err(e) = std::fs::rename(&part_path, &parquet_path) { - error!( - "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" - ); - } else { - self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id); - } - Ok(Some(merged_schema)) + std::fs::rename(&part_path, &parquet_path).map_err(|e| { + error!( + "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" + ); + StagingError::ObjectStorage(std::io::Error::other(format!( + "failed to rename parquet temp file {} -> {}", + part_path.display(), + parquet_path.display(), + ))) + })?; + self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id); + Ok(Some(merged_schema))Based on learnings, the conversion flow relies on timestamped inprocess directories without a normal retry path, so rename failures need to stay visible.
Also applies to: 862-869
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` around lines 320 - 325, The parquet-file rename failures inside the closure used by group_inprocess_arrow_files / group_single_inprocess_arrow_files are currently logged and swallowed, causing the code path to still return Ok(Some(merged_schema)); change the closure so that any filesystem rename error is propagated as an Err (use the ? operator or return Err(anyhow::Error::from(e)) with context) instead of logging-and-continuing, so the calling functions group_inprocess_arrow_files and group_single_inprocess_arrow_files will surface the failure to the caller rather than report success.Source: Learnings
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)
2078-2193: ⚡ Quick winAdd a cross-tenant regression test for
Streams.These cases only cover the default tenant, but the backing map now keys streams by
tenant_id -> stream_name. Please add a same-name/different-tenant test and assertget_or_create,contains,list, anddeletestay isolated across tenants.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` around lines 2078 - 2193, Add a new test that exercises cross-tenant isolation: create an Arc<Streams> and two distinct tenant IDs (e.g., DEFAULT_TENANT and "other_tenant"), call Streams::get_or_create for the same stream_name under each tenant (use the get_or_create signature shown), assert the returned Arcs are different (not Arc::ptr_eq), then use the map access patterns (streams.read() / .contains_key on each tenant map), Streams::list (if available) to verify each tenant lists only its own stream, and Streams::delete (or the equivalent removal method) to delete the stream from one tenant and assert the other tenant’s stream still exists; reference the existing test helpers/variables like get_or_create, DEFAULT_TENANT, list, contains, and delete to locate where to add assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/parseable/streams.rs`:
- Line 850: write_parquet_part_file() currently emits temporary parquet outputs
using parquet_path.with_extension("part"), which collides with
recover_orphan_part_files()’s Arrow IPC ".part" handling and causes parquet temp
files to be misclassified and deleted on startup; change the temp suffix or add
a discriminant so parquet temps are distinguishable (e.g., use a distinct
extension like ".parquet.part" or add a magic header/filename prefix) and update
recover_orphan_part_files() to recognize and either finalize or retry parquet
temp files (or ignore them) instead of treating them as Arrow parts; locate and
update the code paths in write_parquet_part_file() and
recover_orphan_part_files() (also mirror the same fix in the analogous code
around lines ~1330-1404) so both writers and the recovery routine agree on the
temp-file format.
- Around line 320-325: The parquet-file rename failures inside the closure used
by group_inprocess_arrow_files / group_single_inprocess_arrow_files are
currently logged and swallowed, causing the code path to still return
Ok(Some(merged_schema)); change the closure so that any filesystem rename error
is propagated as an Err (use the ? operator or return
Err(anyhow::Error::from(e)) with context) instead of logging-and-continuing, so
the calling functions group_inprocess_arrow_files and
group_single_inprocess_arrow_files will surface the failure to the caller rather
than report success.
---
Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 2078-2193: Add a new test that exercises cross-tenant isolation:
create an Arc<Streams> and two distinct tenant IDs (e.g., DEFAULT_TENANT and
"other_tenant"), call Streams::get_or_create for the same stream_name under each
tenant (use the get_or_create signature shown), assert the returned Arcs are
different (not Arc::ptr_eq), then use the map access patterns (streams.read() /
.contains_key on each tenant map), Streams::list (if available) to verify each
tenant lists only its own stream, and Streams::delete (or the equivalent removal
method) to delete the stream from one tenant and assert the other tenant’s
stream still exists; reference the existing test helpers/variables like
get_or_create, DEFAULT_TENANT, list, contains, and delete to locate where to add
assertions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 0b0f9fdd-29df-4db7-b5c3-fa9fc1a11ec1
📒 Files selected for processing (1)
src/parseable/streams.rs
Trying to find more areas for optimization.
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
New Features
Performance & Improvements