Remove async #1671

Open
parmesant wants to merge 3 commits into parseablehq:main from parmesant:ingestion-optimization
Conversation

@parmesant parmesant commented Jun 9, 2026

Contributor
  • make MemWriter optional
  • remove unnecessary async
  • spawn CPU intensive ingestion work on Rayon threadpool

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Added a configurable memory staging option to control in-memory buffering (enabled by default).
  • Refactor

    • Moved log ingestion work to background processing to improve throughput and responsiveness.
    • Optimized Kinesis and OTEL log processing for more efficient synchronous execution.
    • Revised staging file handling and naming to improve write/rename efficiency.
  • Documentation

    • Minor comment wording fix in OTEL utilities.
  • Chores

    • Bumped required Rust toolchain version.

coderabbitai Bot commented Jun 9, 2026


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 508d8b63-61cd-4c49-ac03-544ac950bd74

📥 Commits

Reviewing files that changed from the base of the PR and between 58aa5d2 and a92e2ee.

📒 Files selected for processing (2)
  • Cargo.toml
  • src/parseable/staging/writer.rs
💤 Files with no reviewable changes (1)
  • src/parseable/staging/writer.rs
✅ Files skipped from review due to trivial changes (1)
  • Cargo.toml

Walkthrough

Converts the ingestion flatten helpers from async to sync and runs ingestion on Rayon threads, with results returned to the async handlers over a Tokio oneshot channel. Adds an ENABLE_MEMORY_STAGING flag that gates in-memory staging in streams. Refactors how DiskWriter builds filenames and extensions at creation, and makes it set the Arrow extension unconditionally on rename. Also includes a minor comment fix in the OTEL utilities.
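The offloading pattern described in the walkthrough can be sketched with the standard library alone. This is not the PR's code: `std::thread::spawn` stands in for a Rayon task, a bounded `std::sync::mpsc` channel used exactly once stands in for the Tokio oneshot, and `flatten_logs` and `ingest_offloaded` are hypothetical names invented for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the CPU-heavy, now synchronous, flatten step.
fn flatten_logs(raw: &str) -> Result<Vec<String>, String> {
    if raw.trim().is_empty() {
        return Err("empty payload".to_string());
    }
    Ok(raw
        .lines()
        .map(|line| line.trim().to_string())
        .filter(|line| !line.is_empty())
        .collect())
}

/// Runs `flatten_logs` on a worker thread and receives the result over a
/// channel that carries a single message. Assumption: in the PR the worker
/// is a Rayon task and the channel is a Tokio oneshot.
fn ingest_offloaded(raw: String) -> Result<Vec<String>, String> {
    let (tx, rx) = mpsc::sync_channel(1);
    thread::spawn(move || {
        // A send error only means the caller stopped waiting, so ignore it.
        let _ = tx.send(flatten_logs(&raw));
    });
    // Blocking receive here; an async handler would await the receiver instead.
    rx.recv()
        .map_err(|_| "worker dropped without replying".to_string())?
}
```

The point of the split is that the handler's executor thread never runs the parsing itself; it only waits for one message carrying either the flattened result or an error.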

Changes

Staging and Ingestion Pipeline Refactoring

| Layer | File(s) | Summary |
|---|---|---|
| Handler imports and header parsing updates | src/handlers/http/ingest.rs | Add Tokio oneshot imports and ingest helper wiring; change EXTRACT_LOG_KEY header parsing and pass extract_log.as_deref() for inline schema extraction; validate stream and fetch parseable stream before ingest. |
| Convert flatten functions to synchronous | src/handlers/http/kinesis.rs, src/handlers/http/modal/utils/ingest_utils.rs, src/handlers/http/ingest.rs | flatten_kinesis_logs and flatten_and_push_logs are made synchronous; handler call sites remove .await and Kinesis branch updated to call synchronously. |
| ingest_helper and OTEL request processing | src/handlers/http/modal/utils/ingest_utils.rs, src/handlers/http/ingest.rs | Add ingest_helper and process_otel_content helpers that spawn Rayon tasks and return results via Tokio oneshot; handlers spawn these and await the oneshot receiver. |
| Memory staging flag and gated Stream ops | src/parseable/staging/writer.rs, src/parseable/streams.rs | Add ENABLE_MEMORY_STAGING Lazy (default true) and conditionally enqueue into writer.mem, clone record batches, and clear writer.mem only when enabled. |
| DiskWriter filename and rename behavior | src/parseable/staging/writer.rs | When ONE_PARQUET_PER_ARROW is enabled, prepend a ULID-based extension before PART_FILE_EXTENSION at creation; on Drop always set ARROW_FILE_EXTENSION unconditionally. |
| Minor doc comment fix | src/otel/otel_utils.rs | Typo/copy fix in a traversal comment near collect_json_from_anyvalue (no behavior change). |
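The DiskWriter naming behavior summarized above can be illustrated with a small std-only sketch. Everything here is an assumption for illustration: the function names are hypothetical, `id` stands in for a freshly generated ULID string, and the extension strings are passed in as parameters because the actual values of PART_FILE_EXTENSION and ARROW_FILE_EXTENSION are not shown in this thread.

```rust
use std::path::{Path, PathBuf};

/// Replaces the final extension of `path` with `id`, then appends `part_ext`
/// as an additional extension so the ID segment is preserved.
fn staged_part_path(path: &Path, id: &str, part_ext: &str) -> PathBuf {
    let mut out = path.to_path_buf();
    out.set_extension(id);
    // Append by hand rather than calling set_extension again, which would
    // overwrite the ID segment instead of adding after it.
    let mut name = out.into_os_string();
    name.push(".");
    name.push(part_ext);
    PathBuf::from(name)
}

/// On completion, swaps the final extension for `final_ext` unconditionally.
fn finished_path(path: &Path, final_ext: &str) -> PathBuf {
    path.with_extension(final_ext)
}
```

With an input of `file.data.arrows`, an ID of `01ABC`, and a part extension of `part`, the staged name is `file.data.01ABC.part`; finishing with `arrows` yields `file.data.01ABC.arrows`, so the ID segment survives the rename.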

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

🐰 I hopped through logs with steady paws,
Offloaded work without a pause.
Memory staging gets a gentle gate,
Filenames tidy, extensions straight.
Rayon hums while oneshot sings,
a rabbit refactor with tidy things!

🚥 Pre-merge checks | ✅ 2 | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The PR description provides three key bullet points summarizing the changes, but largely omits the required template sections and leaves all validation checkboxes unchecked without explanation. | Complete the description template by filling in specific details about testing performed, code comments added, and documentation updates. Check relevant boxes or explain why they are not applicable. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 35.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The title 'Remove async' is vague and generic, using non-descriptive terminology that does not convey meaningful information about the specific changes made in the changeset. | Replace with a more specific title that captures the main objective, such as 'Refactor ingestion to remove async and move CPU work to Rayon threadpool' or similar. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

205-207: ⚡ Quick win

Consider making MemWriter structurally optional to match the PR description.

The PR description says "make MemWriter optional," but the current implementation still allocates MemWriter<4096> in the Writer struct (writer.rs:94-99) even when ENABLE_MEMORY_STAGING is false. The feature flag only gates operations, wasting ~4KB per stream.

For consistency with the PR description and to avoid unnecessary allocation, consider refactoring Writer.mem to Option<MemWriter<4096>> and constructing it conditionally based on the flag.

♻️ Proposed refactor to make MemWriter allocation optional

In src/parseable/staging/writer.rs:

 #[derive(Default)]
 pub struct Writer {
-    pub mem: MemWriter<4096>,
+    pub mem: Option<MemWriter<4096>>,
     pub disk: HashMap<String, DiskWriter>,
     disk_pending: HashMap<String, PendingDiskBatch>,
 }

Then update all call sites to unwrap or conditionally access the Option. Alternatively, use a custom Default impl that checks the flag at construction time.
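A minimal sketch of the second alternative, deciding at construction time, might look like the following. The types are simplified stand-ins, not the real ones: `MemWriter` here just collects strings, `disk` holds plain vectors instead of `DiskWriter`, and the flag is passed in as a hypothetical `enable_memory_staging` argument rather than read from ENABLE_MEMORY_STAGING.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the real in-memory writer.
#[derive(Default)]
pub struct MemWriter<const N: usize> {
    batches: Vec<String>,
}

impl<const N: usize> MemWriter<N> {
    fn push(&mut self, key: &str, record: &str) {
        self.batches.push(format!("{}:{}", key, record));
    }
}

pub struct Writer {
    /// `None` when memory staging is disabled, so nothing is allocated.
    pub mem: Option<MemWriter<4096>>,
    pub disk: HashMap<String, Vec<String>>,
}

impl Writer {
    /// Hypothetical constructor: the caller supplies the flag value.
    pub fn new(enable_memory_staging: bool) -> Self {
        let mem = if enable_memory_staging {
            Some(MemWriter::default())
        } else {
            None
        };
        Writer { mem, disk: HashMap::new() }
    }

    /// Always stages to disk; stages to memory only when `mem` exists.
    pub fn push(&mut self, key: &str, record: &str) {
        self.disk
            .entry(key.to_string())
            .or_default()
            .push(record.to_string());
        if let Some(mem) = self.mem.as_mut() {
            mem.push(key, record);
        }
    }
}
```

Call sites then branch on the `Option` itself instead of re-checking the flag, which keeps the "is staging enabled" decision in one place.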

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` around lines 205 - 207, The Writer currently always
allocates a MemWriter<4096> despite ENABLE_MEMORY_STAGING; change Writer.mem to
Option<MemWriter<4096>> and construct Some(...) only when ENABLE_MEMORY_STAGING
is true (e.g., in Writer::new or Default impl), then update call sites such as
the mem push in parseable::streams::guard (the guard.mem.push(schema_key,
record)? call) and any others to handle the Option (either
guard.mem.as_mut().map(|m| m.push(...)) or return early/no-op when None). Ensure
types and imports reflect Option<MemWriter<4096>> and any unwraps are
safe/guarded by ENABLE_MEMORY_STAGING checks.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/parseable/staging/writer.rs`:
- Around line 259-266: The clippy MSRV allowance is unnecessary and we need
tests for the ULID-in-filename behavior: remove the
#[allow(clippy::incompatible_msrv)] annotation around the path.add_extension
call in the ONE_PARQUET_PER_ARROW branch (leave the logic that sets
path.set_extension(Ulid::new().to_string()) and
path.add_extension(PART_FILE_EXTENSION) intact), and add a unit/integration test
exercising the writer behavior when ONE_PARQUET_PER_ARROW is enabled that
asserts an input like "file.data.arrows" becomes "file.data.<ulid>.parquet"
(verify the ULID segment is present and preserved through whatever function(s)
in writer.rs produce the final parquet path). Ensure the test references the
ONE_PARQUET_PER_ARROW flag and PART_FILE_EXTENSION so it fails if the naming
contract changes.

---

Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 205-207: The Writer currently always allocates a MemWriter<4096>
despite ENABLE_MEMORY_STAGING; change Writer.mem to Option<MemWriter<4096>> and
construct Some(...) only when ENABLE_MEMORY_STAGING is true (e.g., in
Writer::new or Default impl), then update call sites such as the mem push in
parseable::streams::guard (the guard.mem.push(schema_key, record)? call) and any
others to handle the Option (either guard.mem.as_mut().map(|m| m.push(...)) or
return early/no-op when None). Ensure types and imports reflect
Option<MemWriter<4096>> and any unwraps are safe/guarded by
ENABLE_MEMORY_STAGING checks.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 5417788f-f239-49ab-ada4-7627b14cf9b2

📥 Commits

Reviewing files that changed from the base of the PR and between a5cc347 and ba7a17e.

📒 Files selected for processing (5)
  • src/handlers/http/ingest.rs
  • src/handlers/http/kinesis.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/parseable/staging/writer.rs
  • src/parseable/streams.rs

Comment thread src/parseable/staging/writer.rs

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
src/handlers/http/ingest.rs (1)

87-91: 💤 Low value

Simplify redundant map_or_else to map.

The expression .map_or_else(|| None, |h| Some(h.to_string())) is equivalent to .map(|h| h.to_string()).

Suggested simplification
-    let extract_log = req.headers().get(EXTRACT_LOG_KEY).and_then(|h| {
-        h.to_str()
-            .ok()
-            .map_or_else(|| None, |h| Some(h.to_string()))
-    });
+    let extract_log = req
+        .headers()
+        .get(EXTRACT_LOG_KEY)
+        .and_then(|h| h.to_str().ok())
+        .map(|h| h.to_string());
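The equivalence claimed in this nitpick can be checked in isolation. The sketch below uses a plain `Option<&str>` in place of the header lookup, and the function names `verbose` and `concise` are made up for illustration.

```rust
/// The form currently in the handler: explicit closures for both arms.
fn verbose(h: Option<&str>) -> Option<String> {
    h.map_or_else(|| None, |h| Some(h.to_string()))
}

/// The suggested form: `map` already leaves `None` untouched.
fn concise(h: Option<&str>) -> Option<String> {
    h.map(|h| h.to_string())
}
```

Both functions return `None` for `None` and `Some(String)` for `Some(&str)`, so the change is purely cosmetic.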
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/handlers/http/ingest.rs` around lines 87 - 91, The current header
extraction for extract_log uses a verbose .map_or_else(|| None, |h|
Some(h.to_string())); simplify it by replacing that map_or_else with .map(|h|
h.to_string()) on the Option returned from headers().get(EXTRACT_LOG_KEY) to
make the intent clearer and remove redundant closure wrapping; locate the
extract_log assignment in the ingest handler where EXTRACT_LOG_KEY and
req.headers().get(...) are used and update that expression accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@src/handlers/http/ingest.rs`:
- Around line 87-91: The current header extraction for extract_log uses a
verbose .map_or_else(|| None, |h| Some(h.to_string())); simplify it by replacing
that map_or_else with .map(|h| h.to_string()) on the Option returned from
headers().get(EXTRACT_LOG_KEY) to make the intent clearer and remove redundant
closure wrapping; locate the extract_log assignment in the ingest handler where
EXTRACT_LOG_KEY and req.headers().get(...) are used and update that expression
accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 3f3ac3e7-0872-46d4-ab25-e0d18509d996

📥 Commits

Reviewing files that changed from the base of the PR and between ba7a17e and 58aa5d2.

📒 Files selected for processing (3)
  • src/handlers/http/ingest.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/otel/otel_utils.rs
✅ Files skipped from review due to trivial changes (1)
  • src/otel/otel_utils.rs
