Remove async #1671
- make MemWriter optional - remove unnecessary async
No actionable comments were generated in the recent review. 🎉
Walkthrough
Converts the ingestion flatten helpers from async to sync and runs ingestion on Rayon threads, handing results back to the async handlers via Tokio oneshot channels. Adds an ENABLE_MEMORY_STAGING flag to gate in-memory staging in streams, refactors DiskWriter filename/extension creation, and applies the Arrow extension unconditionally on rename. Also includes a minor comment fix in the OTEL utils.
Changes: Staging and Ingestion Pipeline Refactoring
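The offloading pattern described in the walkthrough can be sketched roughly as follows. This is a minimal, std-only illustration and not the PR's code: `std::thread::spawn` stands in for a Rayon worker and `std::sync::mpsc` stands in for a Tokio oneshot channel, and both `flatten_sync` and `ingest_off_thread` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a synchronous, CPU-bound flatten helper.
fn flatten_sync(input: Vec<Vec<u32>>) -> Vec<u32> {
    input.into_iter().flatten().collect()
}

/// Runs `flatten_sync` on a worker thread and hands the result back over a
/// channel that is used exactly once (the role a oneshot channel would play).
fn ingest_off_thread(input: Vec<Vec<u32>>) -> Result<Vec<u32>, mpsc::RecvError> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the receiver was dropped, so it is ignored.
        let _ = tx.send(flatten_sync(input));
    });
    // An async handler would await the receiver here instead of blocking.
    rx.recv()
}
```

The point of the shape is that the helper itself stays plain synchronous code, and only the hand-off back to the caller involves a channel.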
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 3
❌ Failed checks (2 warnings, 1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)
205-207: ⚡ Quick win
Consider making `MemWriter` structurally optional to match the PR description.
The PR description states "Makes MemWriter optional," but the current implementation still allocates `MemWriter<4096>` in the `Writer` struct (writer.rs:94-99) even when `ENABLE_MEMORY_STAGING` is false. The feature flag only gates operations, wasting ~4KB per stream.
For consistency with the PR description and to avoid unnecessary allocation, consider refactoring `Writer.mem` to `Option<MemWriter<4096>>` and constructing it conditionally based on the flag.
♻️ Proposed refactor to make MemWriter allocation optional
In `src/parseable/staging/writer.rs`:
 #[derive(Default)]
 pub struct Writer {
-    pub mem: MemWriter<4096>,
+    pub mem: Option<MemWriter<4096>>,
     pub disk: HashMap<String, DiskWriter>,
     disk_pending: HashMap<String, PendingDiskBatch>,
 }
Then update all call sites to unwrap or conditionally access the `Option`. Alternatively, use a custom `Default` impl that checks the flag at construction time.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/parseable/streams.rs` around lines 205 - 207, The Writer currently always allocates a MemWriter<4096> despite ENABLE_MEMORY_STAGING; change Writer.mem to Option<MemWriter<4096>> and construct Some(...) only when ENABLE_MEMORY_STAGING is true (e.g., in Writer::new or Default impl), then update call sites such as the mem push in parseable::streams::guard (the guard.mem.push(schema_key, record)? call) and any others to handle the Option (either guard.mem.as_mut().map(|m| m.push(...)) or return early/no-op when None). Ensure types and imports reflect Option<MemWriter<4096>> and any unwraps are safe/guarded by ENABLE_MEMORY_STAGING checks.
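One way the suggested `Option` wrapping might look is sketched below. This is an illustration under stated assumptions rather than the repository's code: `MemWriter` here is a simplified stand-in, the flag is passed in as a plain `bool` instead of being read from `ENABLE_MEMORY_STAGING`, and `push_mem` is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical stand-in for the PR's `MemWriter<N>`.
#[derive(Default)]
struct MemWriter<const N: usize> {
    records: HashMap<String, Vec<String>>,
}

impl<const N: usize> MemWriter<N> {
    fn push(&mut self, schema_key: &str, record: String) {
        self.records
            .entry(schema_key.to_string())
            .or_default()
            .push(record);
    }
}

struct Writer {
    /// `None` when memory staging is disabled, so nothing is allocated.
    mem: Option<MemWriter<4096>>,
}

impl Writer {
    /// The flag is a parameter here; the real code would consult its own config.
    fn new(enable_memory_staging: bool) -> Self {
        Self {
            mem: enable_memory_staging.then(MemWriter::<4096>::default),
        }
    }

    /// Stages the record in memory if staging is enabled; returns whether it did.
    fn push_mem(&mut self, schema_key: &str, record: String) -> bool {
        match self.mem.as_mut() {
            Some(mem) => {
                mem.push(schema_key, record);
                true
            }
            None => false,
        }
    }
}
```

With this shape the disabled case becomes a no-op at the call site instead of relying on a separate flag check before every push.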
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/parseable/staging/writer.rs`:
- Around line 259-266: The clippy MSRV allowance is unnecessary and we need
tests for the ULID-in-filename behavior: remove the
#[allow(clippy::incompatible_msrv)] annotation around the path.add_extension
call in the ONE_PARQUET_PER_ARROW branch (leave the logic that sets
path.set_extension(Ulid::new().to_string()) and
path.add_extension(PART_FILE_EXTENSION) intact), and add a unit/integration test
exercising the writer behavior when ONE_PARQUET_PER_ARROW is enabled that
asserts an input like "file.data.arrows" becomes "file.data.<ulid>.parquet"
(verify the ULID segment is present and preserved through whatever function(s)
in writer.rs produce the final parquet path). Ensure the test references the
ONE_PARQUET_PER_ARROW flag and PART_FILE_EXTENSION so it fails if the naming
contract changes.
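The naming contract this comment asks to test can be illustrated with a small std-only sketch. It is not the logic in writer.rs: `with_id_segment` is a hypothetical helper, the ID is passed in as a string rather than generated with `Ulid::new()`, and the final extension is a parameter because the value of `PART_FILE_EXTENSION` is not shown in the review.

```rust
use std::path::{Path, PathBuf};

/// Replaces the last extension of `arrow_path` with "<id>.<final_ext>".
/// Hypothetical helper for illustration; a real test would call the
/// function(s) in writer.rs that produce the final path.
fn with_id_segment(arrow_path: &Path, id: &str, final_ext: &str) -> PathBuf {
    let mut path = arrow_path.to_path_buf();
    // `set_extension` swaps only the part after the final dot, so
    // "file.data.arrows" keeps its "file.data" stem.
    path.set_extension(format!("{id}.{final_ext}"));
    path
}
```

A test along these lines would assert that the ID segment sits between the original stem and the final extension, so it fails if the naming scheme changes.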
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 5417788f-f239-49ab-ada4-7627b14cf9b2
📒 Files selected for processing (5)
- src/handlers/http/ingest.rs
- src/handlers/http/kinesis.rs
- src/handlers/http/modal/utils/ingest_utils.rs
- src/parseable/staging/writer.rs
- src/parseable/streams.rs
🧹 Nitpick comments (1)
src/handlers/http/ingest.rs (1)
87-91: 💤 Low value
Simplify redundant `map_or_else` to `map`.
The expression `.map_or_else(|| None, |h| Some(h.to_string()))` is equivalent to `.map(|h| h.to_string())`.
Suggested simplification
-    let extract_log = req.headers().get(EXTRACT_LOG_KEY).and_then(|h| {
-        h.to_str()
-            .ok()
-            .map_or_else(|| None, |h| Some(h.to_string()))
-    });
+    let extract_log = req
+        .headers()
+        .get(EXTRACT_LOG_KEY)
+        .and_then(|h| h.to_str().ok())
+        .map(|h| h.to_string());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/handlers/http/ingest.rs` around lines 87 - 91, The current header extraction for extract_log uses a verbose .map_or_else(|| None, |h| Some(h.to_string())); simplify it by replacing that map_or_else with .map(|h| h.to_string()) on the Option returned from headers().get(EXTRACT_LOG_KEY) to make the intent clearer and remove redundant closure wrapping; locate the extract_log assignment in the ingest handler where EXTRACT_LOG_KEY and req.headers().get(...) are used and update that expression accordingly.
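The claimed equivalence can be checked in isolation. In the sketch below, `Option<Result<&str, ()>>` is an assumed stand-in for a header lookup followed by `to_str()`, and the function names `verbose` and `simplified` are hypothetical.

```rust
/// Verbose form flagged by the review: `map_or_else` that only re-wraps the value.
fn verbose(raw: Option<Result<&str, ()>>) -> Option<String> {
    raw.and_then(|h| h.ok().map_or_else(|| None, |h| Some(h.to_string())))
}

/// Simplified form: flatten to `Option<&str>` first, then `map` to an owned String.
fn simplified(raw: Option<Result<&str, ()>>) -> Option<String> {
    raw.and_then(|h| h.ok()).map(|h| h.to_string())
}
```

Both functions agree on the three possible inputs: a missing header, a header that fails to decode, and a valid header.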
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 3f3ac3e7-0872-46d4-ab25-e0d18509d996
📒 Files selected for processing (3)
- src/handlers/http/ingest.rs
- src/handlers/http/modal/utils/ingest_utils.rs
- src/otel/otel_utils.rs
✅ Files skipped from review due to trivial changes (1)
- src/otel/otel_utils.rs
Fixes #XXXX.