Skip to content

fix: cap poller batch size at config limit and make cleanup/requestShutdown optional#622

Open
jumski wants to merge 1 commit into
feat/slot-aware-backpressurefrom
fix/slot-backpressure-review-fixes
Open

fix: cap poller batch size at config limit and make cleanup/requestShutdown optional#622
jumski wants to merge 1 commit into
feat/slot-aware-backpressurefrom
fix/slot-backpressure-review-fixes

Conversation

@jumski
Copy link
Copy Markdown
Contributor

@jumski jumski commented May 28, 2026

The Worker class previously held a direct reference to a postgres.Sql connection and always closed it on shutdown. This coupling meant the worker unconditionally owned the SQL connection regardless of whether it was externally provided.

This change introduces a WorkerOptions interface that replaces the positional requestShutdown and sql constructor arguments. The worker now accepts an optional cleanup callback instead of a SQL instance directly, and only invokes it if one is provided. Both createFlowWorker and createQueueWorker track whether they created the SQL connection themselves (ownsSql), and only pass a cleanup callback that closes the connection when they own it. When a SQL connection is passed in via config, the worker leaves lifecycle management of that connection to the caller.

requestShutdown on PlatformAdapter is now optional, and both worker factories use optional chaining when binding it. SupabasePlatformAdapter.stopWorker wraps the worker stop call in a try/finally block so the platform-owned SQL connection is always closed even if the worker stop throws.

The batch size limit parameter in ReadWithPollPoller and StepTaskPoller is now capped at the configured batchSize using Math.min, preventing a caller from requesting more messages than the poller is configured to handle.

New tests cover the cleanup callback invocation on worker stop, the ownsSql-conditional SQL teardown behavior, the sql.end guarantee when worker stop rejects, the optional requestShutdown type contract, and the corrected batch size capping logic in both poller implementations.

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 28, 2026

⚠️ No Changeset found

Latest commit: dc209fb

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Copy Markdown
Contributor Author

jumski commented May 28, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more

This stack of pull requests is managed by Graphite. Learn more about stacking.

@nx-cloud
Copy link
Copy Markdown

nx-cloud Bot commented May 28, 2026

View your CI Pipeline Execution ↗ for commit dc209fb

Command Status Duration Result
nx run edge-worker:test:integration ✅ Succeeded 4m 31s View ↗
nx run edge-worker:e2e ✅ Succeeded 46s View ↗
nx affected -t verify-exports --base=origin/mai... ✅ Succeeded <1s View ↗
nx affected -t build --configuration=production... ✅ Succeeded <1s View ↗
nx affected -t lint typecheck test --parallel -... ✅ Succeeded 20s View ↗

☁️ Nx Cloud last updated this comment at 2026-05-28 10:42:34 UTC

@jumski jumski deployed to preview May 28, 2026 10:42 — with GitHub Actions Active
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant