Skip to content

Further optimizations for ingestion flow#1668

Merged
nikhilsinhaparseable merged 3 commits into
parseablehq:mainfrom
parmesant:investigate-ingestion
Jun 8, 2026
Merged

Further optimizations for ingestion flow#1668
nikhilsinhaparseable merged 3 commits into
parseablehq:mainfrom
parmesant:investigate-ingestion

Conversation

@parmesant

@parmesant parmesant commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Trying to find more areas for optimization.

  • compress arrow files while writing (lz4_frame or zstd)
  • create new .part file based on size
  • one parquet per arrow file (converted in parallel)
  • separate runtimes to run ingestion tasks, sync and conversion tasks

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • LZ4 compression for staging data.
    • Optional per-file naming/extension behavior when enabled.
  • Performance & Improvements

    • Parallelized Parquet conversion with safer rename semantics (cleanup runs after successful rename).
    • Cached hostname for consistent filenames.
    • Reduced in-memory buffer and size-based eviction/flush of staging writers.
    • Improved runtime scheduling for staging/uploads and added staging diagnostics/logging.

@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Walkthrough

Adds a lazy Tokio runtime for flush/convert work; DiskWriter tracks accumulated bytes and evicts when ARROW_FLUSH_SIZE_LIMIT is reached using LZ4 IPC options; hostname caching; parallel Arrow→Parquet conversion with Rayon; and routes flush/convert and upload tasks onto the new runtime.

Changes

Staging Writer Optimization and Conversion

Layer / File(s) Summary
Flush-Convert Runtime Infrastructure
src/sync.rs
Adds FLUSH_AND_CONVERT_RUNTIME, a process-wide lazy Tokio runtime for dispatching flush/convert and upload tasks.
DiskWriter Size Tracking and Flush Threshold
src/parseable/staging/writer.rs
DiskWriter adds size: usize, write returns accumulated size, try_new uses StreamWriter::try_new_with_options with IpcWriteOptions + CompressionType::LZ4_FRAME, Drop conditionally renames output based on ONE_PARQUET_PER_ARROW, ARROW_FLUSH_SIZE_LIMIT added, and Writer's MemWriter capacity reduced from 16384 to 4096.
Streams: Hostname Cache and Parquet Conversion
src/parseable/streams.rs
Introduces HOSTNAME: OnceCell<String>; filename_by_partition uses cached hostname; convert_disk_files_to_parquet parallelized with Rayon to write .part parquet files and rename on success, skipping cleanup on failure; logging/timing adjusted for user streams.
Runtime-based Task Dispatch and Object Storage Scheduling
src/parseable/streams.rs, src/storage/object_storage.rs
Per-stream flush/convert and per-file uploads are dispatched onto FLUSH_AND_CONVERT_RUNTIME via spawn_blocking_on / spawn_on. process_parquet_files now collects parquet-paths sequentially, extends ACTIVE_OBJECT_STORE_SYNC_FILES, and logs the set.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

Poem

🐰 I nibble bytes and keep their score,
LZ4 hums while batches pour,
Hostname snug in a burrowed chest,
Parquet hops out after a careful rest,
A runtime whistles — staging's best.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description identifies key optimization areas but lacks critical details: no specific issue is referenced, testing checklist items are unchecked, and documentation/comment sections remain unfilled despite substantial code changes. Complete the checklist items (testing, comments, documentation), reference the actual issue if one exists, and provide detailed rationale for each optimization choice (e.g., why LZ4 over zstd, size thresholds).
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Further optimizations for ingestion flow' is clearly related to the main changes in the PR, which include Arrow compression, file rotation, parallel conversion, and separate runtimes for optimization purposes.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (5)
src/parseable/staging/writer.rs (2)

36-36: 💤 Low value

Unused import: SizedMessage appears to not be used in this file.

This import was added but doesn't seem to be referenced anywhere in the file. Consider removing it if it's not needed.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/staging/writer.rs` at line 36, Unused import SizedMessage in
src/parseable/staging/writer.rs: remove the unused line referencing SizedMessage
(use datafusion::physical_plan::buffer::SizedMessage;) from the top of
writer.rs; if you intended to use SizedMessage, instead reference it where
needed in the relevant function or scope (e.g., wherever buffering/physical_plan
buffer handling is implemented) or add a purposeful usage, otherwise simply
delete the import to fix the unused-import warning.

266-270: 💤 Low value

Size tracking uses in-memory batch size, not compressed IPC size.

rb.size() returns the in-memory Arrow RecordBatch size, but the actual bytes written to the IPC stream will differ due to LZ4 compression. This means ARROW_FLUSH_SIZE_LIMIT effectively controls in-memory accumulation rather than on-disk file size. If this approximation is intentional for simplicity, a brief comment would help future readers understand the design choice.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/staging/writer.rs` around lines 266 - 270, The current write
method in write(&mut self, rb: &RecordBatch) updates self.size using rb.size(),
which measures in-memory Arrow size not the compressed IPC bytes actually
written by self.inner.write; either (A) replace the size update with the actual
number of bytes written by serializing the RecordBatch to an IPC buffer using
the same compression settings as self.inner (e.g., serialize via the same IPC
writer/stream code used by self.inner and add the resulting byte length to
self.size) or (B) if you intentionally accept the approximation, add a short
clarifying comment above write (referencing write, self.size, rb.size(), and
self.inner.write) explaining that rb.size() is an in-memory estimate and does
not reflect LZ4-compressed IPC size. Make only one of these changes.
src/storage/object_storage.rs (1)

1090-1090: ⚡ Quick win

Consider reducing log level for active files set.

This info! log will output the entire ACTIVE_OBJECT_STORE_SYNC_FILES HashSet on every sync cycle, which could be verbose in production when many files are being tracked. Consider using trace! or debug! level instead, or only logging the count of active files at info! level.

📝 Suggested change
-        tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES);
+        tracing::debug!(ACTIVE_OBJECT_STORE_SYNC_FILES=?guard);

Or for info-level with just the count:

-        tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES);
+        tracing::info!(active_sync_files_count = guard.len());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/object_storage.rs` at line 1090, The current tracing::info! call
logs the entire ACTIVE_OBJECT_STORE_SYNC_FILES HashSet each sync and is too
verbose; change the call to either tracing::debug! or tracing::trace! to reduce
noise, or keep info-level but log only the count (e.g.,
ACTIVE_OBJECT_STORE_SYNC_FILES.len()) and optionally include a small sample or
summary instead of the full set; update the
tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES)
statement accordingly to use the chosen level or to log the length/summary.
src/parseable/streams.rs (2)

838-890: 💤 Low value

Parallel conversion looks correct, but consider error handling strategy.

The rayon parallel iteration collects all results and then processes them sequentially to extract schemas and propagate the first error. This is a valid approach, though it means all parallel tasks complete before any error is surfaced. This matches the previous sequential behavior where all files would be attempted.

One observation: if multiple parquet conversions fail, only the first error is returned while the rest are silently discarded. This is acceptable for simplicity, but logging all errors before returning the first might help debugging batch failures.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` around lines 838 - 890, The parallel map collects
results into _schemas but only returns the first error while discarding others;
update the post-collection handling of _schemas (from
staging_files.into_par_iter() ... .collect()) to log all Err(e) entries
(including context like parquet_path / arrow_files where available) before
returning the first Err, or aggregate errors into a single diagnostic message;
specifically adjust the loop that matches res in _schemas (and the rename error
logging near write_parquet_part_file and std::fs::rename) to emit error logs for
every Err variant encountered and then return the first Err rather than silently
dropping subsequent errors.

847-847: 💤 Low value

Remove dead commented code.

This commented-out line // schemas.push(merged_schema.clone()); appears to be leftover from the refactoring. It should be removed to keep the code clean.

🧹 Suggested cleanup
                     let props =
                         self.parquet_writer_props(&merged_schema, time_partition, custom_partition);
-                    // schemas.push(merged_schema.clone());
                     let schema = Arc::new(merged_schema.clone());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` at line 847, Remove the dead commented-out line "//
schemas.push(merged_schema.clone());" from the code to clean up leftover
refactor noise; locate the block where schemas and merged_schema are handled
(references to schemas and merged_schema in streams.rs) and delete that
commented line so only active code remains.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/parseable/staging/writer.rs`:
- Line 36: Unused import SizedMessage in src/parseable/staging/writer.rs: remove
the unused line referencing SizedMessage (use
datafusion::physical_plan::buffer::SizedMessage;) from the top of writer.rs; if
you intended to use SizedMessage, instead reference it where needed in the
relevant function or scope (e.g., wherever buffering/physical_plan buffer
handling is implemented) or add a purposeful usage, otherwise simply delete the
import to fix the unused-import warning.
- Around line 266-270: The current write method in write(&mut self, rb:
&RecordBatch) updates self.size using rb.size(), which measures in-memory Arrow
size not the compressed IPC bytes actually written by self.inner.write; either
(A) replace the size update with the actual number of bytes written by
serializing the RecordBatch to an IPC buffer using the same compression settings
as self.inner (e.g., serialize via the same IPC writer/stream code used by
self.inner and add the resulting byte length to self.size) or (B) if you
intentionally accept the approximation, add a short clarifying comment above
write (referencing write, self.size, rb.size(), and self.inner.write) explaining
that rb.size() is an in-memory estimate and does not reflect LZ4-compressed IPC
size. Make only one of these changes.

In `@src/parseable/streams.rs`:
- Around line 838-890: The parallel map collects results into _schemas but only
returns the first error while discarding others; update the post-collection
handling of _schemas (from staging_files.into_par_iter() ... .collect()) to log
all Err(e) entries (including context like parquet_path / arrow_files where
available) before returning the first Err, or aggregate errors into a single
diagnostic message; specifically adjust the loop that matches res in _schemas
(and the rename error logging near write_parquet_part_file and std::fs::rename)
to emit error logs for every Err variant encountered and then return the first
Err rather than silently dropping subsequent errors.
- Line 847: Remove the dead commented-out line "//
schemas.push(merged_schema.clone());" from the code to clean up leftover
refactor noise; locate the block where schemas and merged_schema are handled
(references to schemas and merged_schema in streams.rs) and delete that
commented line so only active code remains.

In `@src/storage/object_storage.rs`:
- Line 1090: The current tracing::info! call logs the entire
ACTIVE_OBJECT_STORE_SYNC_FILES HashSet each sync and is too verbose; change the
call to either tracing::debug! or tracing::trace! to reduce noise, or keep
info-level but log only the count (e.g., ACTIVE_OBJECT_STORE_SYNC_FILES.len())
and optionally include a small sample or summary instead of the full set; update
the
tracing::info!(ACTIVE_OBJECT_STORE_SYNC_FILES=?ACTIVE_OBJECT_STORE_SYNC_FILES)
statement accordingly to use the chosen level or to log the length/summary.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 21d7b0d1-8eae-45fc-be58-ff9f8d179a0e

📥 Commits

Reviewing files that changed from the base of the PR and between 515f0cf and bd00a59.

📒 Files selected for processing (4)
  • src/parseable/staging/writer.rs
  • src/parseable/streams.rs
  • src/storage/object_storage.rs
  • src/sync.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 8, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 8, 2026
parmesant added 2 commits June 7, 2026 23:42
Try to find more areas for optimization.
- compress arrow files while writing (lz4_frame or zstd)
- create new .part file based on size
- one parquet per arrow file (converted in parallel)
- separate runtimes to run ingestion tasks, sync and conversion tasks
@parmesant parmesant force-pushed the investigate-ingestion branch from 22e172a to 5eb1677 Compare June 8, 2026 06:42

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/parseable/streams.rs (2)

850-850: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Separate parquet temp files from Arrow .part recovery.

write_parquet_part_file() now emits parquet temp output with the same .part extension that recover_orphan_part_files() treats as Arrow IPC. After an interrupted rename, startup will classify that parquet temp file as an invalid Arrow part and delete it instead of finalizing or explicitly retrying it. Use a distinct temp suffix or teach recovery to distinguish parquet temp files before deleting them.

Also applies to: 1330-1404

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` at line 850, write_parquet_part_file() currently
emits temporary parquet outputs using parquet_path.with_extension("part"), which
collides with recover_orphan_part_files()’s Arrow IPC ".part" handling and
causes parquet temp files to be misclassified and deleted on startup; change the
temp suffix or add a discriminant so parquet temps are distinguishable (e.g.,
use a distinct extension like ".parquet.part" or add a magic header/filename
prefix) and update recover_orphan_part_files() to recognize and either finalize
or retry parquet temp files (or ignore them) instead of treating them as Arrow
parts; locate and update the code paths in write_parquet_part_file() and
recover_orphan_part_files() (also mirror the same fix in the analogous code
around lines ~1330-1404) so both writers and the recovery routine agree on the
temp-file format.

320-325: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Propagate parquet rename failures instead of logging and continuing.

If Line 862 fails, this closure still returns Ok(Some(merged_schema)). That reports a successful conversion even though the arrow files were not cleaned up, and regular cycles only regroup the current processing_<minute> directory, so the failed batch can sit in an older processing_* directory until init/shutdown.

Suggested fix
-                    if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
-                    error!(
-                        "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
-                    );
-                } else {
-                    self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id);
-                }
-                Ok(Some(merged_schema))
+                    std::fs::rename(&part_path, &parquet_path).map_err(|e| {
+                        error!(
+                            "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
+                        );
+                        StagingError::ObjectStorage(std::io::Error::other(format!(
+                            "failed to rename parquet temp file {} -> {}",
+                            part_path.display(),
+                            parquet_path.display(),
+                        )))
+                    })?;
+                    self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id);
+                    Ok(Some(merged_schema))

Based on learnings, the conversion flow relies on timestamped inprocess directories without a normal retry path, so rename failures need to stay visible.

Also applies to: 862-869

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` around lines 320 - 325, The parquet-file rename
failures inside the closure used by group_inprocess_arrow_files /
group_single_inprocess_arrow_files are currently logged and swallowed, causing
the code path to still return Ok(Some(merged_schema)); change the closure so
that any filesystem rename error is propagated as an Err (use the ? operator or
return Err(anyhow::Error::from(e)) with context) instead of
logging-and-continuing, so the calling functions group_inprocess_arrow_files and
group_single_inprocess_arrow_files will surface the failure to the caller rather
than report success.

Source: Learnings

🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

2078-2193: ⚡ Quick win

Add a cross-tenant regression test for Streams.

These cases only cover the default tenant, but the backing map now keys streams by tenant_id -> stream_name. Please add a same-name/different-tenant test and assert get_or_create, contains, list, and delete stay isolated across tenants.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` around lines 2078 - 2193, Add a new test that
exercises cross-tenant isolation: create an Arc<Streams> and two distinct tenant
IDs (e.g., DEFAULT_TENANT and "other_tenant"), call Streams::get_or_create for
the same stream_name under each tenant (use the get_or_create signature shown),
assert the returned Arcs are different (not Arc::ptr_eq), then use the map
access patterns (streams.read() / .contains_key on each tenant map),
Streams::list (if available) to verify each tenant lists only its own stream,
and Streams::delete (or the equivalent removal method) to delete the stream from
one tenant and assert the other tenant’s stream still exists; reference the
existing test helpers/variables like get_or_create, DEFAULT_TENANT, list,
contains, and delete to locate where to add assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/parseable/streams.rs`:
- Line 850: write_parquet_part_file() currently emits temporary parquet outputs
using parquet_path.with_extension("part"), which collides with
recover_orphan_part_files()’s Arrow IPC ".part" handling and causes parquet temp
files to be misclassified and deleted on startup; change the temp suffix or add
a discriminant so parquet temps are distinguishable (e.g., use a distinct
extension like ".parquet.part" or add a magic header/filename prefix) and update
recover_orphan_part_files() to recognize and either finalize or retry parquet
temp files (or ignore them) instead of treating them as Arrow parts; locate and
update the code paths in write_parquet_part_file() and
recover_orphan_part_files() (also mirror the same fix in the analogous code
around lines ~1330-1404) so both writers and the recovery routine agree on the
temp-file format.
- Around line 320-325: The parquet-file rename failures inside the closure used
by group_inprocess_arrow_files / group_single_inprocess_arrow_files are
currently logged and swallowed, causing the code path to still return
Ok(Some(merged_schema)); change the closure so that any filesystem rename error
is propagated as an Err (use the ? operator or return
Err(anyhow::Error::from(e)) with context) instead of logging-and-continuing, so
the calling functions group_inprocess_arrow_files and
group_single_inprocess_arrow_files will surface the failure to the caller rather
than report success.

---

Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 2078-2193: Add a new test that exercises cross-tenant isolation:
create an Arc<Streams> and two distinct tenant IDs (e.g., DEFAULT_TENANT and
"other_tenant"), call Streams::get_or_create for the same stream_name under each
tenant (use the get_or_create signature shown), assert the returned Arcs are
different (not Arc::ptr_eq), then use the map access patterns (streams.read() /
.contains_key on each tenant map), Streams::list (if available) to verify each
tenant lists only its own stream, and Streams::delete (or the equivalent removal
method) to delete the stream from one tenant and assert the other tenant’s
stream still exists; reference the existing test helpers/variables like
get_or_create, DEFAULT_TENANT, list, contains, and delete to locate where to add
assertions.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 0b0f9fdd-29df-4db7-b5c3-fa9fc1a11ec1

📥 Commits

Reviewing files that changed from the base of the PR and between 5eb1677 and 33b5418.

📒 Files selected for processing (1)
  • src/parseable/streams.rs

@nikhilsinhaparseable nikhilsinhaparseable merged commit a5cc347 into parseablehq:main Jun 8, 2026
12 checks passed
@parmesant parmesant deleted the investigate-ingestion branch June 8, 2026 08:27
@coderabbitai coderabbitai Bot mentioned this pull request Jun 9, 2026
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants