From 1f8d17cb7f9e683b5914475778b1c786b2076880 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 07:58:55 -0700 Subject: [PATCH 1/9] PLT-458: open-loop scheduler (replace closed-loop dequeue) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make transaction arrival open-loop to fix coordinated omission: tx i is issued at t₀ + i/λ independent of in-flight completion, so a slow SUT no longer slows the generator and hides backlog in latency. - sender/scheduler.go: openLoopScheduler owns t₀ and the monotonic sequence index i, derives λ from the shared rate.Limiter as a clock source (sampled per tick to honor a ramping λ; telescopes to t₀ + i/λ at fixed λ), and stamps IntendedSendTime at the true scheduled instant. - Overflow is bounded-in-flight + drop-and-count: a non-blocking semaphore TryAcquire admits the tx or drops-and-counts it; the arrival clock is never blocked on capacity (REL8/REL9 load shedding). - One rate authority preserved: the ramper still drives λ via limiter.SetLimit; the worker's busy-spin Allow() gate is replaced by a blocking Wait (closed-loop only) and disabled under open-loop. - Behind config flag arrival-model (default closed_loop, the regression baseline) + max-in-flight; arrival_model and run_txs_dropped_total are recorded at run end. - types.LoadTx gains SequenceIndex (scheduler-owned, single-write per the documented concurrency contract) for PLT-463 schedule-lag attribution; dropped txs carry zero InclusionTime and are kept out of inclusion-rate denominators. Tests: schedule-accuracy (tracks t₀ + i/λ within tolerance), clock not throttled by a slow sender (overrun dropped not blocked), ramped-λ gap shrink, and stamp-before-handoff under -race. go build + go test -race green. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- config/settings.go | 23 +++++ config/settings_test.go | 4 + main.go | 27 ++++- sender/dispatcher.go | 102 +++++++++++++++++- sender/scheduler.go | 131 +++++++++++++++++++++++ sender/scheduler_test.go | 218 +++++++++++++++++++++++++++++++++++++++ sender/sharded_sender.go | 11 ++ sender/worker.go | 22 +++- stats/metrics.go | 5 + stats/run_summary.go | 19 +++- types/scenario.go | 35 +++++-- utils/semaphore.go | 13 +++ 12 files changed, 588 insertions(+), 22 deletions(-) create mode 100644 sender/scheduler.go create mode 100644 sender/scheduler_test.go diff --git a/config/settings.go b/config/settings.go index 5f31dcd..3cb9834 100644 --- a/config/settings.go +++ b/config/settings.go @@ -28,8 +28,23 @@ type Settings struct { TargetGas uint64 `json:"targetGas,omitempty"` NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"` PostSummaryFlushDelay Duration `json:"postSummaryFlushDelay,omitempty"` + // ArrivalModel selects the transaction arrival model: "open_loop" schedules + // tx i at t₀ + i/λ independent of sender availability (the + // coordinated-omission fix), "closed_loop" (default) keeps the legacy + // generate-then-send lockstep as the regression baseline. + ArrivalModel string `json:"arrivalModel,omitempty"` + // MaxInFlight bounds concurrent in-flight sends in the open-loop model; + // txs that would exceed it at their scheduled instant are dropped and + // counted rather than throttling the arrival clock. Ignored in closed-loop. + MaxInFlight int `json:"maxInFlight,omitempty"` } +// Arrival model identifiers for the ArrivalModel setting. 
+const ( + ArrivalModelClosedLoop = "closed_loop" + ArrivalModelOpenLoop = "open_loop" +) + // DefaultSettings returns the default configuration values func DefaultSettings() Settings { return Settings{ @@ -49,6 +64,8 @@ func DefaultSettings() Settings { TargetGas: 10_000_000, NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), + ArrivalModel: ArrivalModelClosedLoop, + MaxInFlight: 10_000, } } @@ -72,6 +89,8 @@ func InitializeViper(cmd *cobra.Command) error { "targetGas": "target-gas", "numBlocksToWrite": "num-blocks-to-write", "postSummaryFlushDelay": "post-summary-flush-delay", + "arrivalModel": "arrival-model", + "maxInFlight": "max-in-flight", } for viperKey, flagName := range flagBindings { @@ -98,6 +117,8 @@ func InitializeViper(cmd *cobra.Command) error { viper.SetDefault("targetGas", defaults.TargetGas) viper.SetDefault("numBlocksToWrite", defaults.NumBlocksToWrite) viper.SetDefault("postSummaryFlushDelay", defaults.PostSummaryFlushDelay.ToDuration()) + viper.SetDefault("arrivalModel", defaults.ArrivalModel) + viper.SetDefault("maxInFlight", defaults.MaxInFlight) return nil } @@ -140,5 +161,7 @@ func ResolveSettings() Settings { TargetGas: viper.GetUint64("targetGas"), NumBlocksToWrite: viper.GetInt("numBlocksToWrite"), PostSummaryFlushDelay: Duration(viper.GetDuration("postSummaryFlushDelay")), + ArrivalModel: viper.GetString("arrivalModel"), + MaxInFlight: viper.GetInt("maxInFlight"), } } diff --git a/config/settings_test.go b/config/settings_test.go index 8773159..29e6a03 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -97,6 +97,8 @@ func TestArgumentPrecedence(t *testing.T) { cmd.Flags().Uint64("target-gas", 0, "Target gas") cmd.Flags().Int("num-blocks-to-write", 0, "Number of blocks to write") cmd.Flags().Duration("post-summary-flush-delay", 0, "Post-summary flush delay") + cmd.Flags().String("arrival-model", "", "Arrival model") + cmd.Flags().Int("max-in-flight", 0, "Max in-flight") // Parse CLI args if 
len(tt.cliArgs) > 0 { @@ -141,6 +143,8 @@ func TestDefaultSettings(t *testing.T) { TargetGas: 10_000_000, NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), + ArrivalModel: ArrivalModelClosedLoop, + MaxInFlight: 10_000, } if defaults != expected { diff --git a/main.go b/main.go index e89257e..2fae33f 100644 --- a/main.go +++ b/main.go @@ -73,6 +73,8 @@ func init() { rootCmd.Flags().Int("num-blocks-to-write", 100, "Number of blocks to write") rootCmd.Flags().Duration("post-summary-flush-delay", 25*time.Second, "In-process delay after run-summary metrics are recorded, allowing Prometheus to scrape them before exit") rootCmd.Flags().Duration("duration", 0, "Run duration (0 = until SIGTERM/SIGINT)") + rootCmd.Flags().String("arrival-model", config.ArrivalModelClosedLoop, "Transaction arrival model: open_loop (schedule t0+i/lambda, drop on overrun) or closed_loop (legacy generate-then-send)") + rootCmd.Flags().Int("max-in-flight", 10_000, "Open-loop only: max concurrent in-flight sends before overdue txs are dropped") // Initialize Viper with proper error handling if err := config.InitializeViper(rootCmd); err != nil { @@ -205,6 +207,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { collector := stats.NewCollector() logger := stats.NewLogger(collector, settings.StatsInterval.ToDuration(), settings.ReportPath, settings.Debug) var ramper *sender.Ramper + var dispatcher *sender.Dispatcher err = service.Run(ctx, func(ctx context.Context, s service.Scope) error { // Create the generator from the config struct @@ -287,7 +290,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { } // Create dispatcher - var dispatcher *sender.Dispatcher if settings.TxsDir != "" { // get latest height ethclient, err := ethclient.Dial(cfg.Endpoints[0]) @@ -310,6 +312,19 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Set statistics collector for dispatcher 
dispatcher.SetStatsCollector(collector) + // Open-loop arrival: the scheduler owns the rate (via the shared + // limiter, which the ramper still drives) and drops on overrun, so the + // workers must not also rate-limit. Only applies to the live-send path; + // the txs writer path has no arrival clock. + openLoop := settings.ArrivalModel == config.ArrivalModelOpenLoop + if openLoop && settings.TxsDir == "" { + dispatcher.SetOpenLoop(sharedLimiter, settings.MaxInFlight) + snd.SetRateLimited(false) + log.Printf("📤 Arrival model: open_loop (max in-flight: %d)", settings.MaxInFlight) + } else { + log.Printf("📤 Arrival model: closed_loop") + } + // Set up prewarming if enabled if settings.Prewarm { log.Printf("🔥 Creating prewarm generator...") @@ -373,7 +388,15 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { if settings.RampUp && ramper != nil { ramper.LogFinalStats() } - collector.EmitRunSummary(ctx) + summary := stats.RunSummary{ArrivalModel: config.ArrivalModelClosedLoop} + if dispatcher != nil { + summary.ArrivalModel = string(dispatcher.ArrivalModel()) + summary.Dropped = dispatcher.GetStats().Dropped + if summary.Dropped > 0 { + log.Printf("⚠️ Open-loop dropped %d txs (in-flight saturated; not throttled)", summary.Dropped) + } + } + collector.EmitRunSummary(ctx, summary) if d := settings.PostSummaryFlushDelay.ToDuration(); d > 0 { log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d) time.Sleep(d) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index b3614c9..f377871 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -7,9 +7,26 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/sei-protocol/sei-load/generator" "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/types" "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" +) + +// ArrivalModel selects how the dispatcher times transaction arrival. 
+type ArrivalModel string + +const ( + // ArrivalClosedLoop is the legacy model: a tx is generated and sent only + // when a sender is free, so a slow SUT slows the generator (coordinated + // omission). Kept reachable as the regression baseline. + ArrivalClosedLoop ArrivalModel = "closed_loop" + // ArrivalOpenLoop schedules tx i at t₀ + i/λ independent of sender + // availability; overdue txs are dropped and counted. See scheduler.go. + ArrivalOpenLoop ArrivalModel = "open_loop" ) // Dispatcher continuously generates transactions and dispatches them to the sender @@ -18,18 +35,48 @@ type Dispatcher struct { prewarmGen utils.Option[generator.Generator] // Optional prewarm generator sender TxSender + // Open-loop arrival configuration. arrivalModel defaults to closed-loop; + // limiter and maxInFlight are only consulted in open-loop mode. + arrivalModel ArrivalModel + limiter *rate.Limiter + maxInFlight int + // Statistics totalSent uint64 + dropped uint64 mu sync.RWMutex collector *stats.Collector } -// NewDispatcher creates a new dispatcher +// NewDispatcher creates a new dispatcher in the legacy closed-loop arrival model. func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { return &Dispatcher{ - generator: gen, - sender: sender, + generator: gen, + sender: sender, + arrivalModel: ArrivalClosedLoop, + } +} + +// SetOpenLoop switches the dispatcher to the open-loop arrival model, driven by +// the shared limiter (the one rate authority, also driven by the ramper) and +// bounded by maxInFlight concurrent sends. A non-positive maxInFlight is treated +// as 1 so admission control is always live. +func (d *Dispatcher) SetOpenLoop(limiter *rate.Limiter, maxInFlight int) { + if maxInFlight < 1 { + maxInFlight = 1 } + d.mu.Lock() + defer d.mu.Unlock() + d.arrivalModel = ArrivalOpenLoop + d.limiter = limiter + d.maxInFlight = maxInFlight +} + +// ArrivalModel reports the configured arrival model (for recording/reporting). 
+func (d *Dispatcher) ArrivalModel() ArrivalModel { + d.mu.RLock() + defer d.mu.RUnlock() + return d.arrivalModel } // SetStatsCollector sets the statistics collector for this dispatcher @@ -86,8 +133,18 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error { return nil } -// Start begins the dispatcher's transaction generation and sending loop +// Run begins the dispatcher's transaction generation and sending loop, using +// the configured arrival model. func (d *Dispatcher) Run(ctx context.Context) error { + if d.ArrivalModel() == ArrivalOpenLoop { + return d.runOpenLoop(ctx) + } + return d.runClosedLoop(ctx) +} + +// runClosedLoop is the legacy model: generate-then-send in lockstep, so a slow +// SUT back-pressures the generator. Kept as the regression baseline. +func (d *Dispatcher) runClosedLoop(ctx context.Context) error { for ctx.Err() == nil { // Generate a transaction from main generator tx, ok := d.generator.Generate() @@ -98,6 +155,7 @@ func (d *Dispatcher) Run(ctx context.Context) error { // Stamp before hand-off: the dispatcher is sole owner here (tx just // returned by the generator, not yet enqueued), so this write is race-free. + // This is the back-pressured enqueue time, not a true schedule instant. tx.IntendedSendTime = time.Now() // Send the transaction @@ -111,6 +169,38 @@ func (d *Dispatcher) Run(ctx context.Context) error { return ctx.Err() } +// runOpenLoop drives the open-loop scheduler (see scheduler.go), which owns the +// arrival clock (t₀, sequence index i) and the in-flight bound. Send tasks are +// spawned into a scope so they all complete on shutdown. 
+func (d *Dispatcher) runOpenLoop(ctx context.Context) error { + d.mu.RLock() + limiter, maxInFlight := d.limiter, d.maxInFlight + d.mu.RUnlock() + + sched := newOpenLoopScheduler(d.generator, d.sender, limiter, maxInFlight, d.onSent) + err := service.Run(ctx, func(ctx context.Context, s service.Scope) error { + return sched.Run(ctx, s) + }) + // Fold the scheduler's drop count into the dispatcher's accounting so the + // final summary can report it. + d.mu.Lock() + d.dropped = sched.Dropped() + d.mu.Unlock() + return err +} + +// onSent records a completed open-loop send. A successful send advances +// totalSent; the scheduler counts drops separately. +func (d *Dispatcher) onSent(tx *types.LoadTx, err error) { + if err != nil { + log.Printf("Scheduler: send failed (seq %d): %v", tx.SequenceIndex, err) + return + } + d.mu.Lock() + d.totalSent++ + d.mu.Unlock() +} + // StartBatch generates and sends a specific number of transactions then stops func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { if count <= 0 { @@ -145,10 +235,14 @@ func (d *Dispatcher) GetStats() DispatcherStats { return DispatcherStats{ TotalSent: d.totalSent, + Dropped: d.dropped, } } // DispatcherStats contains statistics for the dispatcher type DispatcherStats struct { TotalSent uint64 + // Dropped is the number of open-loop txs shed because in-flight was + // saturated at their scheduled instant. Always 0 in closed-loop mode. 
+ Dropped uint64 } diff --git a/sender/scheduler.go b/sender/scheduler.go new file mode 100644 index 0000000..b917fd3 --- /dev/null +++ b/sender/scheduler.go @@ -0,0 +1,131 @@ +package sender + +import ( + "context" + "log" + "sync/atomic" + "time" + + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" +) + +// openLoopScheduler issues transactions on an open-loop arrival clock: tx i is +// scheduled at t₀ + i/λ regardless of whether any sender is free. This is the +// coordinated-omission fix — when the SUT slows, the arrival clock does NOT +// slow with it; overdue txs are dropped and counted instead (REL8/REL9 load +// shedding), so measured latency reflects the backlog rather than hiding it. +// +// λ comes from the shared rate.Limiter, which the ramper drives via SetLimit +// (one rate authority). The limiter is read as a clock source here, not as a +// permit gate: the schedule advances by 1/λ per tx, sampling λ each step so a +// ramping λ is honored. At a fixed λ this telescopes to exactly t₀ + i/λ. +// +// In-flight work is bounded by a semaphore. A tx that cannot acquire a permit +// without blocking is dropped (the senders are saturated); the scheduler never +// blocks on capacity, which is what keeps the arrival clock unthrottled. +type openLoopScheduler struct { + generator generator.Generator + sender TxSender + limiter *rate.Limiter + inflight *utils.Semaphore + onSent func(tx *types.LoadTx, err error) + maxInFlight int + + // dropped counts txs shed because in-flight was saturated at their + // scheduled instant. Read after Run returns, or concurrently via Dropped. + dropped atomic.Uint64 +} + +// minScheduleRate floors λ when computing the inter-arrival gap so a near-zero +// limit (e.g. 
a misconfigured 0) cannot turn the gap into +Inf. NOTE(review): the +// floored gap is still 1e18ns (~31.7y); presumably that parks the scheduler. +const minScheduleRate = 1e-9 + +func newOpenLoopScheduler( + gen generator.Generator, + snd TxSender, + limiter *rate.Limiter, + maxInFlight int, + onSent func(tx *types.LoadTx, err error), +) *openLoopScheduler { + return &openLoopScheduler{ + generator: gen, + sender: snd, + limiter: limiter, + inflight: utils.NewSemaphore(maxInFlight), + onSent: onSent, + maxInFlight: maxInFlight, + } +} + +// Dropped returns the number of txs shed so far because in-flight was saturated. +func (s *openLoopScheduler) Dropped() uint64 { return s.dropped.Load() } + +// Run drives the open-loop arrival clock until the context is canceled or the +// generator is exhausted. Each accepted tx is sent on its own task spawned into +// scope, bounded by the in-flight semaphore; the send task releases the permit +// on completion, so the bound covers true in-flight (enqueue + send), not just +// queue depth. +func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error { + t0 := time.Now() + nextSend := t0 + var i uint64 + + for ctx.Err() == nil { + // Advance the schedule by one inter-arrival gap. Sampling λ here (not + // once up front) honors a ramping limit; at fixed λ the running sum is + // exactly t₀ + i/λ. + lambda := float64(s.limiter.Limit()) + if lambda < minScheduleRate { + lambda = minScheduleRate + } + gap := time.Duration(float64(time.Second) / lambda) + + // Sleep until this tx's scheduled instant. Sleeping to an absolute + // instant (not "gap from now") prevents per-tx scheduling slop from + // accumulating into clock drift. 
+ if err := utils.SleepUntil(ctx, nextSend); err != nil { + return err + } + + tx, ok := s.generator.Generate() + if !ok { + log.Print("Scheduler: generator returned no more transactions") + return nil + } + + // Stamp the TRUE scheduled instant and the arrival index while we are + // the sole owner of tx (see LoadTx concurrency contract). + tx.IntendedSendTime = nextSend + tx.SequenceIndex = i + + nextSend = nextSend.Add(gap) + i++ + + // Non-blocking admission: if senders are saturated, drop and count + // rather than block — blocking here would throttle the arrival clock + // and reintroduce coordinated omission. + release, ok := s.inflight.TryAcquire() + if !ok { + s.dropped.Add(1) + continue + } + scope.Spawn(func() error { + defer release() + err := s.sender.Send(ctx, tx) + if s.onSent != nil { + s.onSent(tx, err) + } + // A send error must not tear down the campaign; the closed-loop + // path logs-and-continues identically. Drops/errors are surfaced + // via counters, not by returning here. + return nil + }) + } + return ctx.Err() +} diff --git a/sender/scheduler_test.go b/sender/scheduler_test.go new file mode 100644 index 0000000..c1057af --- /dev/null +++ b/sender/scheduler_test.go @@ -0,0 +1,218 @@ +package sender + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils/service" +) + +// fakeGenerator hands out blank LoadTx values until count is exhausted. It +// records the IntendedSendTime/SequenceIndex the scheduler stamped, since those +// are the open-loop schedule under test. 
+type fakeGenerator struct { + mu sync.Mutex + remaining int + issued []*types.LoadTx +} + +func newFakeGenerator(n int) *fakeGenerator { return &fakeGenerator{remaining: n} } + +func (g *fakeGenerator) Generate() (*types.LoadTx, bool) { + g.mu.Lock() + defer g.mu.Unlock() + if g.remaining == 0 { + return nil, false + } + g.remaining-- + tx := &types.LoadTx{Scenario: &types.TxScenario{Name: "fake"}} + g.issued = append(g.issued, tx) + return tx, true +} + +func (g *fakeGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *fakeGenerator) GetAccountPools() []types.AccountPool { + return nil +} + +func (g *fakeGenerator) issuedTxs() []*types.LoadTx { + g.mu.Lock() + defer g.mu.Unlock() + out := make([]*types.LoadTx, len(g.issued)) + copy(out, g.issued) + return out +} + +// fakeSender records send count and optionally blocks for delay, modeling a +// slow SUT so the in-flight bound is exercised. +type fakeSender struct { + delay time.Duration + sent atomic.Uint64 +} + +func (s *fakeSender) Send(ctx context.Context, _ *types.LoadTx) error { + s.sent.Add(1) + if s.delay > 0 { + t := time.NewTimer(s.delay) + defer t.Stop() + select { + case <-t.C: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} + +// runScheduler runs sched.Run inside its own service scope and discards the +// error; callers read results from the sched and generator they passed in. +func runScheduler(ctx context.Context, sched *openLoopScheduler) { + _ = service.Run(ctx, func(ctx context.Context, s service.Scope) error { + return sched.Run(ctx, s) + }) +} + +// TestOpenLoopSchedule_TracksT0PlusIOverLambda is the core Done-criterion test: +// at a fixed λ against a fast sender, the IntendedSendTime stamped on tx i must +// track t₀ + i/λ within tolerance, independent of completion. 
+func TestOpenLoopSchedule_TracksT0PlusIOverLambda(t *testing.T) { + const lambda = 200.0 // tx/s → 5ms gap + gen := newFakeGenerator(40) + snd := &fakeSender{} + limiter := rate.NewLimiter(rate.Limit(lambda), 1) + sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) + + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + start := time.Now() + runScheduler(ctx, sched) + + issued := gen.issuedTxs() + require.GreaterOrEqual(t, len(issued), 30, "scheduler should issue most txs within the window") + + gap := time.Second / time.Duration(lambda) + // t₀ is the scheduler's internal start; bound it to [start, start+gap]. + t0 := issued[0].IntendedSendTime + require.WithinDuration(t, start, t0, gap, "t₀ must be the campaign start") + + const tol = 2 * time.Millisecond + for i, tx := range issued { + require.Equal(t, uint64(i), tx.SequenceIndex, "sequence index must be monotonic from 0") + want := t0.Add(time.Duration(i) * gap) + require.WithinDuration(t, want, tx.IntendedSendTime, tol, + "tx %d IntendedSendTime must track t₀ + i/λ", i) + } +} + +// TestOpenLoopSchedule_NotThrottledBySlowSender proves the arrival clock is not +// dragged by a slow SUT: with a sender far slower than the in-flight bound can +// absorb, the schedule must still advance at λ and the overrun must be dropped, +// not absorbed by blocking. +func TestOpenLoopSchedule_NotThrottledBySlowSender(t *testing.T) { + const lambda = 500.0 // 2ms gap + gen := newFakeGenerator(200) + // Each send takes 100ms; with maxInFlight=4 the senders can sustain only + // ~40 tx/s, an order of magnitude under λ → most txs must be dropped. 
+ snd := &fakeSender{delay: 100 * time.Millisecond} + limiter := rate.NewLimiter(rate.Limit(lambda), 1) + sched := newOpenLoopScheduler(gen, snd, limiter, 4, nil) + + ctx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond) + defer cancel() + start := time.Now() + runScheduler(ctx, sched) + + issued := gen.issuedTxs() + gap := time.Second / time.Duration(lambda) + + // The clock must have kept advancing at λ despite the slow sender: the + // schedule should have walked far past what the senders could absorb. + require.GreaterOrEqual(t, len(issued), 100, + "arrival clock must not be throttled by the slow sender") + + // Schedule accuracy still holds for the issued txs. + t0 := issued[0].IntendedSendTime + require.WithinDuration(t, start, t0, gap) + const tol = 3 * time.Millisecond + for i, tx := range issued { + want := t0.Add(time.Duration(i) * gap) + require.WithinDuration(t, want, tx.IntendedSendTime, tol, + "tx %d schedule must hold under a slow sender", i) + } + + // Overrun is dropped-and-counted, not blocked on. + require.Positive(t, sched.Dropped(), "overrun must be counted as dropped") + require.Equal(t, uint64(len(issued)), sched.Dropped()+snd.sent.Load(), + "every issued tx is either sent or dropped exactly once") +} + +// TestOpenLoopSchedule_HonorsRampedLambda verifies the schedule responds to a +// λ change applied via the shared limiter (the ramper's rate authority): after +// SetLimit, the inter-arrival gap tracks the new λ. +func TestOpenLoopSchedule_HonorsRampedLambda(t *testing.T) { + gen := newFakeGenerator(1000) + snd := &fakeSender{} + // Start slow so the first gaps are large and easy to distinguish. 
+ limiter := rate.NewLimiter(rate.Limit(50), 1) // 20ms gap + sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) + + ctx, cancel := context.WithTimeout(t.Context(), 600*time.Millisecond) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runScheduler(ctx, sched) + }() + + // Let it run at 50 tps, then ramp to 500 tps and let it run more. + time.Sleep(200 * time.Millisecond) + limiter.SetLimit(rate.Limit(500)) // 2ms gap + wg.Wait() + + issued := gen.issuedTxs() + require.GreaterOrEqual(t, len(issued), 2, "scheduler must issue txs") + + // The min gap across all issued txs must reflect the faster λ: pre-ramp gaps + // are ~20ms, so a gap under 10ms can only come from the ramped schedule. + var minGap time.Duration = time.Hour + for i := 1; i < len(issued); i++ { + g := issued[i].IntendedSendTime.Sub(issued[i-1].IntendedSendTime) + if g < minGap { + minGap = g + } + } + require.Less(t, minGap, 10*time.Millisecond, + "ramped-up λ must shrink the inter-arrival gap below the initial 20ms") +} + +// TestOpenLoopSchedule_StampsBeforeHandoff guards the LoadTx concurrency +// contract: the scheduler stamps IntendedSendTime and SequenceIndex before the +// send task can touch the tx. Run under -race to catch a regression. 
+func TestOpenLoopSchedule_StampsBeforeHandoff(t *testing.T) { + gen := newFakeGenerator(50) + snd := &fakeSender{} + limiter := rate.NewLimiter(rate.Limit(1000), 1) + + var checked atomic.Uint64 + onSent := func(tx *types.LoadTx, err error) { + if err == nil && !tx.IntendedSendTime.IsZero() { // runs on the send task: count only, assert below + checked.Add(1) + } + } + sched := newOpenLoopScheduler(gen, snd, limiter, 64, onSent) + + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + runScheduler(ctx, sched) + + require.True(t, checked.Load() > 0 && checked.Load() == snd.sent.Load(), "onSent must observe every send succeeding on a stamped tx") // FailNow is only legal on the test goroutine +} diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index 8bc7c8d..d48d1e1 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -120,6 +120,17 @@ func (s *ShardedSender) SetDebug(debug bool) { } } +// SetRateLimited enables or disables worker-side rate limiting across all +// workers. Disable it when an open-loop scheduler is the rate authority. +func (s *ShardedSender) SetRateLimited(rateLimited bool) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, worker := range s.workers { + worker.SetRateLimited(rateLimited) + } +} + // SetTrackReceipts sets the track-receipts flag for the sender and its workers func (s *ShardedSender) SetTrackReceipts(trackReceipts bool) { s.mu.Lock() diff --git a/sender/worker.go b/sender/worker.go index 9b48a8a..42c44ec 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -42,6 +42,11 @@ type Worker struct { workers int trackReceipts bool limiter *rate.Limiter // Shared rate limiter for transaction sending + // rateLimited gates worker-side rate limiting. True for the legacy + // closed-loop model, where the worker is the rate authority. False in the + // open-loop model, where the scheduler owns the arrival clock and gating + // here too would double-throttle the rate. + rateLimited bool } // HttpClientOption configures the Transport used by newHttpClient. 
@@ -101,6 +106,7 @@ func NewWorker(id int, seiChainID string, endpoint string, bufferSize int, worke workers: workers, trackReceipts: false, limiter: limiter, + rateLimited: true, } meterWorkerQueueLength(w) return w @@ -144,6 +150,12 @@ func (w *Worker) SetTrackReceipts(trackReceipts bool) { w.trackReceipts = trackReceipts } +// SetRateLimited enables or disables worker-side rate limiting. Disable it when +// an upstream open-loop scheduler is the rate authority. +func (w *Worker) SetRateLimited(rateLimited bool) { + w.rateLimited = rateLimited +} + func (w *Worker) watchTransactions(ctx context.Context) error { if w.dryRun || !w.trackReceipts { return nil @@ -226,10 +238,12 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * // processTransactions is the main worker loop that processes transactions func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error { for ctx.Err() == nil { - // Apply rate limiting before getting the next transaction - if w.limiter != nil { - if !w.limiter.Allow() { - continue + // Closed-loop rate limiting: block until the limiter releases a permit + // rather than busy-spinning on Allow(). Skipped when an open-loop + // scheduler is the rate authority (it would otherwise double-throttle). 
+ if w.rateLimited && w.limiter != nil { + if err := w.limiter.Wait(ctx); err != nil { + return err } } diff --git a/stats/metrics.go b/stats/metrics.go index 75bce3d..b7f3460 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -41,6 +41,11 @@ var ( "run_txs_accepted_total", metric.WithDescription("Total transactions accepted by endpoints over this run (emitted once at run end)"), metric.WithUnit("{transactions}"))) + + runTxsDroppedTotal = must(meter.Int64Gauge( + "run_txs_dropped_total", + metric.WithDescription("Total open-loop transactions dropped because in-flight was saturated at their scheduled instant (emitted once at run end)"), + metric.WithUnit("{transactions}"))) ) func must[V any](v V, err error) V { diff --git a/stats/run_summary.go b/stats/run_summary.go index c8480df..652dce4 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -3,10 +3,25 @@ package stats import ( "context" "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) +// RunSummary carries the arrival-model accounting the collector does not track +// itself (the dispatcher owns it) into the run-summary gauges. +type RunSummary struct { + // ArrivalModel is "open_loop" or "closed_loop"; tags the dropped gauge so a + // nonzero drop count is attributable to the model that produced it. + ArrivalModel string + // Dropped is the count of open-loop txs shed on in-flight saturation. These + // never reach the inclusion tracker, so they carry a zero InclusionTime; + // keep them out of later inclusion-rate denominators (PLT-463 forward-note). + Dropped uint64 +} + // EmitRunSummary records the run-summary gauges. Call once at shutdown. 
-func (c *Collector) EmitRunSummary(ctx context.Context) { +func (c *Collector) EmitRunSummary(ctx context.Context, summary RunSummary) { c.mu.RLock() duration := time.Since(c.startTime) totalTxs := c.totalTxs @@ -16,4 +31,6 @@ func (c *Collector) EmitRunSummary(ctx context.Context) { runDurationSeconds.Record(ctx, duration.Seconds()) runTPSFinal.Record(ctx, finalTPS) runTxsAcceptedTotal.Record(ctx, int64(totalTxs)) + runTxsDroppedTotal.Record(ctx, int64(summary.Dropped), + metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) } diff --git a/types/scenario.go b/types/scenario.go index db5e6a7..9d2dcc8 100644 --- a/types/scenario.go +++ b/types/scenario.go @@ -12,24 +12,37 @@ import ( // LoadTx is a wrapper that has pre-encoded json rpc payload and eth transaction. // -// Lifecycle timestamp concurrency contract: a *LoadTx is passed by pointer -// through buffered channels (txChan, sentTxs). Each lifecycle timestamp is -// written at most once, by whichever goroutine owns the tx at that stage, and -// is immutable thereafter; ownership transfers with the pointer across the -// channels, so the writes need no locking. A zero timestamp means "not -// recorded" (e.g. prewarm txs, or a stage not yet reached) — consumers must -// treat it as untracked, never as the zero epoch. +// Lifecycle field concurrency contract: a *LoadTx is passed by pointer through +// buffered channels (txChan, sentTxs). Each lifecycle field (the timestamps and +// SequenceIndex) is written at most once, by whichever goroutine owns the tx at +// that stage, and is immutable thereafter; ownership transfers with the pointer +// across the channels, so the writes need no locking. The open-loop scheduler +// writes IntendedSendTime and SequenceIndex while it solely owns the tx (before +// the worker hand-off); the worker writes AttemptedSendTime; the inclusion +// tracker writes InclusionTime. A zero timestamp means "not recorded" (e.g. 
+// prewarm txs, or a stage not yet reached) — consumers must treat it as +// untracked, never as the zero epoch. type LoadTx struct { EthTx *ethtypes.Transaction JSONRPCPayload []byte Payload []byte Scenario *TxScenario - // IntendedSendTime is when the tx was scheduled to be sent, written by the - // dispatcher before the tx is enqueued. It currently holds the enqueue time, - // which is back-pressured under load; until an open-loop scheduler sets it to - // the intended schedule instant, it must not be used to derive latency. + // IntendedSendTime is when the tx was scheduled to be sent. In the open-loop + // arrival model the scheduler writes the true scheduled instant t₀ + i/λ + // here (independent of when a sender is free), which is the basis for + // coordinated-omission-free latency. In the legacy closed-loop model it + // instead holds the back-pressured enqueue time and must not be used to + // derive latency. SequenceIndex disambiguates which model produced it: a + // nonzero SequenceIndex (or index 0 from a campaign start) marks the + // open-loop scheduler as the writer. IntendedSendTime time.Time + // SequenceIndex is the monotonic per-campaign arrival index i assigned by + // the open-loop scheduler, which schedules tx i at t₀ + i/λ. It attributes + // per-tx schedule lag (IntendedSendTime vs AttemptedSendTime) back to a + // position in the arrival sequence. Zero in the legacy closed-loop model, + // where no scheduler assigns it. + SequenceIndex uint64 // AttemptedSendTime is when the send was actually attempted, written by the // worker goroutine that owns the tx between dequeue and the sentTxs hand-off. AttemptedSendTime time.Time diff --git a/utils/semaphore.go b/utils/semaphore.go index 728c12a..85f1b02 100644 --- a/utils/semaphore.go +++ b/utils/semaphore.go @@ -22,3 +22,16 @@ func (s *Semaphore) Acquire(ctx context.Context) (relase func(), err error) { } return func() { <-s.ch }, nil } + +// TryAcquire acquires a permit without blocking. 
It returns the release func +// and true if a permit was available, or nil and false if all permits are held. +// Used by callers that must never block waiting for capacity (e.g. an open-loop +// scheduler that drops rather than throttling its clock). +func (s *Semaphore) TryAcquire() (release func(), ok bool) { + select { + case s.ch <- struct{}{}: + return func() { <-s.ch }, true + default: + return nil, false + } +} From 883d464742c70ff55fc039d11c75cf2207a8d061 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 08:12:33 -0700 Subject: [PATCH 2/9] =?UTF-8?q?PLT-458:=20review=20fixes=20=E2=80=94=20rej?= =?UTF-8?q?ect=20degenerate=20open-loop,=20honest=20in-flight=20bound?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit B1: reject open_loop without a finite positive arrival rate. With TPS=0 and no ramp, λ=rate.Inf, the inter-arrival gap collapses to 0, IntendedSendTime never advances past t₀, and the scheduler spins and drops everything. Add config.Settings.Validate (TPS>0 or --ramp-up required for open_loop) and call it after ResolveSettings; fail fast with a clear error. minScheduleRate stays as divide-by-zero/+Inf defense-in-depth only. B2: tie the in-flight permit to real send completion, not enqueue. Worker.Send (via ShardedSender) returns at enqueue, so the prior defer release() bounded enqueue backlog, not unacked sends — and dropped reflected buffer geometry. Thread a LoadTx.OnComplete hook the worker invokes after sendTransaction; the scheduler stamps it to release the permit so maxInFlight bounds true in-flight and dropped measures genuine load-shed. Enqueue-failure path completes inline. schedule_lag (PLT-463) remains the primary CO-detection gate. Also: reject unrecognized --arrival-model at config load; drop the SequenceIndex self-disambiguation claim (gate on run-level arrival_model); state the dropped-tx inclusion-denominator invariant plainly; fix relase typo. 
Tests: replace the synchronous fakeSender with an async enqueue-and-complete sender so the slow-sender drop test exercises production semantics; add the permit-held-until-completion guard, a conservation invariant, and config validation rejection tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- config/settings.go | 23 +++++ config/settings_test.go | 54 +++++++++++ main.go | 3 + sender/scheduler.go | 44 +++++++-- sender/scheduler_test.go | 197 +++++++++++++++++++++++++++++++++------ sender/worker.go | 6 ++ stats/run_summary.go | 8 +- types/scenario.go | 20 +++- utils/semaphore.go | 2 +- 9 files changed, 312 insertions(+), 45 deletions(-) diff --git a/config/settings.go b/config/settings.go index 3cb9834..dd98ef9 100644 --- a/config/settings.go +++ b/config/settings.go @@ -45,6 +45,29 @@ const ( ArrivalModelOpenLoop = "open_loop" ) +// Validate checks resolved settings for self-consistent run configuration, +// failing fast on combinations that would otherwise produce a silently +// degenerate run. Call once after ResolveSettings. +func (s Settings) Validate() error { + switch s.ArrivalModel { + case ArrivalModelClosedLoop, ArrivalModelOpenLoop: + default: + return fmt.Errorf("invalid arrival-model %q: must be %q or %q", + s.ArrivalModel, ArrivalModelOpenLoop, ArrivalModelClosedLoop) + } + + // Open-loop derives the inter-arrival gap as 1/λ. With no finite positive + // arrival rate, λ is rate.Inf, the gap collapses to 0, IntendedSendTime + // never advances past t₀, and the scheduler spins and drops everything — + // the latency anchor degenerates to "time since campaign start". A finite λ + // comes from either a configured TPS>0 or a ramp curve (RampUp), which the + // ramper drives to finite limits. Reject the degenerate case up front. 
+ if s.ArrivalModel == ArrivalModelOpenLoop && s.TPS <= 0 && !s.RampUp { + return fmt.Errorf("arrival-model %q requires a finite positive arrival rate: set --tps>0 or --ramp-up", ArrivalModelOpenLoop) + } + return nil +} + // DefaultSettings returns the default configuration values func DefaultSettings() Settings { return Settings{ diff --git a/config/settings_test.go b/config/settings_test.go index 29e6a03..6e9a542 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -151,3 +151,57 @@ func TestDefaultSettings(t *testing.T) { t.Errorf("DefaultSettings mismatch.\nExpected: %+v\nGot: %+v", expected, defaults) } } + +func TestSettingsValidate(t *testing.T) { + tests := []struct { + name string + settings Settings + wantErr string + }{ + { + name: "closed-loop with no rate is fine", + settings: Settings{ArrivalModel: ArrivalModelClosedLoop, TPS: 0}, + }, + { + name: "open-loop with finite TPS is fine", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 100}, + }, + { + name: "open-loop with ramp-up is fine (finite ramp curve λ)", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 0, RampUp: true}, + }, + { + // B1: open-loop with TPS=0 and no ramp ⇒ λ=Inf ⇒ degenerate anchor. 
+ name: "open-loop with zero TPS and no ramp is rejected", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 0}, + wantErr: "finite positive arrival rate", + }, + { + name: "open-loop with negative TPS is rejected", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: -1}, + wantErr: "finite positive arrival rate", + }, + { + name: "unrecognized arrival-model is rejected", + settings: Settings{ArrivalModel: "burst", TPS: 100}, + wantErr: "invalid arrival-model", + }, + { + name: "empty arrival-model is rejected", + settings: Settings{ArrivalModel: "", TPS: 100}, + wantErr: "invalid arrival-model", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.settings.Validate() + if tt.wantErr == "" { + require.NoError(t, err) + return + } + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + }) + } +} diff --git a/main.go b/main.go index 2fae33f..eea7193 100644 --- a/main.go +++ b/main.go @@ -110,6 +110,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Get resolved settings from the config package settings := config.ResolveSettings() + if err := settings.Validate(); err != nil { + return fmt.Errorf("invalid settings: %w", err) + } // Handle --nodes flag to limit number of endpoints nodes, _ := cmd.Flags().GetInt("nodes") diff --git a/sender/scheduler.go b/sender/scheduler.go index b917fd3..453d62a 100644 --- a/sender/scheduler.go +++ b/sender/scheduler.go @@ -3,6 +3,7 @@ package sender import ( "context" "log" + "sync" "sync/atomic" "time" @@ -28,6 +29,13 @@ import ( // In-flight work is bounded by a semaphore. A tx that cannot acquire a permit // without blocking is dropped (the senders are saturated); the scheduler never // blocks on capacity, which is what keeps the arrival clock unthrottled. 
+// +// The permit is held until the actual network send completes, not until the tx +// is enqueued: the scheduler stamps tx.OnComplete with the release, and the +// worker invokes it after sendTransaction returns. So maxInFlight bounds true +// unacked in-flight sends and dropped measures genuine load-shed, not buffer +// geometry (the worker's send is async enqueue-and-return). Note: schedule_lag +// (PLT-463), not the drop count, remains the primary coordinated-omission gate. type openLoopScheduler struct { generator generator.Generator sender TxSender @@ -41,9 +49,11 @@ type openLoopScheduler struct { dropped atomic.Uint64 } -// minScheduleRate floors λ when computing the inter-arrival gap so a near-zero -// limit (e.g. the ramper's recovery-phase rate.Limit(1), or a misconfigured 0) -// cannot produce an unbounded sleep that wedges the scheduler. +// minScheduleRate floors λ when computing the inter-arrival gap so a zero or +// negative limit cannot make the gap divide-by-zero or blow up to a +Inf +// duration. It does not bound how long the scheduler sleeps: a small-but-finite +// λ still yields a long gap. The degenerate λ=Inf / TPS=0 open-loop case is +// rejected up front in config validation (see config.Settings.Validate). const minScheduleRate = 1e-9 func newOpenLoopScheduler( @@ -115,11 +125,31 @@ func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error s.dropped.Add(1) continue } + + // Complete fires exactly once when the send finishes: it releases the + // in-flight permit and reports the result. The worker invokes it via + // tx.OnComplete after the real send (the happy path), so the permit is + // held for the whole unacked-in-flight window, not just the enqueue. + // The Once guards the enqueue-failure fallback below from racing the + // worker — though by construction only one path runs. 
+ var once sync.Once + complete := func(err error) { + once.Do(func() { + release() + if s.onSent != nil { + s.onSent(tx, err) + } + }) + } + tx.OnComplete = complete + scope.Spawn(func() error { - defer release() - err := s.sender.Send(ctx, tx) - if s.onSent != nil { - s.onSent(tx, err) + // Send returns at enqueue; the permit release is deferred to the + // worker via tx.OnComplete on real completion. If the enqueue itself + // fails (e.g. ctx canceled), the tx never reaches a worker, so we + // complete it here to avoid leaking the permit. + if err := s.sender.Send(ctx, tx); err != nil { + complete(err) } // A send error must not tear down the campaign; the closed-loop // path logs-and-continues identically. Drops/errors are surfaced diff --git a/sender/scheduler_test.go b/sender/scheduler_test.go index c1057af..202b775 100644 --- a/sender/scheduler_test.go +++ b/sender/scheduler_test.go @@ -50,25 +50,65 @@ func (g *fakeGenerator) issuedTxs() []*types.LoadTx { return out } -// fakeSender records send count and optionally blocks for delay, modeling a -// slow SUT so the in-flight bound is exercised. -type fakeSender struct { +// asyncFakeSender models the production ShardedSender's send semantics: Send +// returns when the tx lands in a buffered channel (enqueue-and-return), NOT when +// the network send completes. Background workers dequeue and, after an optional +// per-send delay (a slow SUT), invoke tx.OnComplete to release the scheduler's +// in-flight permit. This is what exercises the HONEST in-flight bound (B2): a +// synchronous sender that blocks in Send would hide that the permit must be tied +// to real completion, not to enqueue. 
+type asyncFakeSender struct { + ch chan *types.LoadTx delay time.Duration - sent atomic.Uint64 + sent atomic.Uint64 // incremented when a send actually completes } -func (s *fakeSender) Send(ctx context.Context, _ *types.LoadTx) error { - s.sent.Add(1) - if s.delay > 0 { - t := time.NewTimer(s.delay) - defer t.Stop() +// newAsyncFakeSender starts `workers` background senders draining a buffer of +// `buffer` slots. Mirrors a worker pool behind a bounded channel. +func newAsyncFakeSender(ctx context.Context, buffer, workers int, delay time.Duration) *asyncFakeSender { + s := &asyncFakeSender{ch: make(chan *types.LoadTx, buffer), delay: delay} + if workers < 1 { + workers = 1 + } + for range workers { + go s.drain(ctx) + } + return s +} + +func (s *asyncFakeSender) drain(ctx context.Context) { + for { select { - case <-t.C: case <-ctx.Done(): - return ctx.Err() + return + case tx := <-s.ch: + if s.delay > 0 { + t := time.NewTimer(s.delay) + select { + case <-t.C: + case <-ctx.Done(): + t.Stop() + } + } + s.sent.Add(1) + if tx.OnComplete != nil { + tx.OnComplete(nil) + } } } - return nil +} + +// Send enqueues without blocking on completion, returning at enqueue. If the +// buffer is full it blocks on the channel until a slot frees or ctx is done — +// like utils.Send in the real worker. The scheduler must never see this block +// throttle its clock because admission is gated by the in-flight permit upstream. 
+func (s *asyncFakeSender) Send(ctx context.Context, tx *types.LoadTx) error { + select { + case s.ch <- tx: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // runScheduler drives the scheduler in its own scope until the context expires, @@ -85,12 +125,13 @@ func runScheduler(ctx context.Context, sched *openLoopScheduler) { func TestOpenLoopSchedule_TracksT0PlusIOverLambda(t *testing.T) { const lambda = 200.0 // tx/s → 5ms gap gen := newFakeGenerator(40) - snd := &fakeSender{} - limiter := rate.NewLimiter(rate.Limit(lambda), 1) - sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) defer cancel() + snd := newAsyncFakeSender(ctx, 1024, 8, 0) + limiter := rate.NewLimiter(rate.Limit(lambda), 1) + sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) + start := time.Now() runScheduler(ctx, sched) @@ -115,17 +156,27 @@ func TestOpenLoopSchedule_TracksT0PlusIOverLambda(t *testing.T) { // dragged by a slow SUT: with a sender far slower than the in-flight bound can // absorb, the schedule must still advance at λ and the overrun must be dropped, // not absorbed by blocking. +// +// It uses an ASYNC sender (Send returns at enqueue) so the drop count reflects +// the HONEST in-flight bound (B2): the permit is held until each send actually +// completes, so a slow SUT saturates maxInFlight and forces genuine load-shed — +// not buffer geometry. A synchronous sender would have masked this. func TestOpenLoopSchedule_NotThrottledBySlowSender(t *testing.T) { const lambda = 500.0 // 2ms gap gen := newFakeGenerator(200) - // Each send takes 100ms; with maxInFlight=4 the senders can sustain only - // ~40 tx/s, an order of magnitude under λ → most txs must be dropped. 
- snd := &fakeSender{delay: 100 * time.Millisecond} - limiter := rate.NewLimiter(rate.Limit(lambda), 1) - sched := newOpenLoopScheduler(gen, snd, limiter, 4, nil) ctx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond) defer cancel() + + // Each send completes after 100ms; with maxInFlight=4 and only 4 draining + // workers the senders can sustain only ~40 tx/s, an order of magnitude under + // λ → most txs must be dropped. The buffer is deliberately small: with the + // honest bound the in-flight permit (not the buffer) is the gate. + const maxInFlight = 4 + snd := newAsyncFakeSender(ctx, maxInFlight, maxInFlight, 100*time.Millisecond) + limiter := rate.NewLimiter(rate.Limit(lambda), 1) + sched := newOpenLoopScheduler(gen, snd, limiter, maxInFlight, nil) + start := time.Now() runScheduler(ctx, sched) @@ -147,10 +198,97 @@ func TestOpenLoopSchedule_NotThrottledBySlowSender(t *testing.T) { "tx %d schedule must hold under a slow sender", i) } - // Overrun is dropped-and-counted, not blocked on. + // Overrun is dropped-and-counted, not blocked on. With ~150 issued in 300ms + // and senders able to complete only ~a dozen, the vast majority must drop. require.Positive(t, sched.Dropped(), "overrun must be counted as dropped") - require.Equal(t, uint64(len(issued)), sched.Dropped()+snd.sent.Load(), - "every issued tx is either sent or dropped exactly once") + require.Greater(t, sched.Dropped(), uint64(len(issued)/2), + "a slow SUT must shed most of the load through the in-flight bound") +} + +// gatedSender enqueues instantly but holds completion until release is called, +// letting a test observe the window where a tx is enqueued-but-not-completed. 
+type gatedSender struct { + enqueued atomic.Uint64 + mu sync.Mutex + pending []*types.LoadTx +} + +func (s *gatedSender) Send(_ context.Context, tx *types.LoadTx) error { + s.mu.Lock() + s.pending = append(s.pending, tx) + s.mu.Unlock() + s.enqueued.Add(1) + return nil // returns at enqueue, like the production worker +} + +// completeAll fires OnComplete for every tx enqueued so far, releasing permits. +func (s *gatedSender) completeAll() { + s.mu.Lock() + pending := s.pending + s.pending = nil + s.mu.Unlock() + for _, tx := range pending { + if tx.OnComplete != nil { + tx.OnComplete(nil) + } + } +} + +// TestOpenLoopSchedule_PermitHeldUntilCompletion is the B2 guard: the in-flight +// permit must be tied to real send completion, not to enqueue. With maxInFlight=1 +// and a sender that enqueues instantly but never completes, the first tx takes +// the only permit and holds it; every subsequent tx must drop. If the permit +// released at enqueue (the masked bug), the sender would have enqueued many. +func TestOpenLoopSchedule_PermitHeldUntilCompletion(t *testing.T) { + gen := newFakeGenerator(100) + snd := &gatedSender{} + limiter := rate.NewLimiter(rate.Limit(1000), 1) // 1ms gap → many arrivals + sched := newOpenLoopScheduler(gen, snd, limiter, 1, nil) + + ctx, cancel := context.WithTimeout(t.Context(), 120*time.Millisecond) + defer cancel() + runScheduler(ctx, sched) + + // Exactly one tx held the single permit through the whole run (never + // completed), so the sender saw exactly one enqueue and everything else + // dropped. Enqueue-time release would have let many through. + require.Equal(t, uint64(1), snd.enqueued.Load(), + "permit must be held until completion: only one tx may be in flight") + require.Positive(t, sched.Dropped(), "arrivals past the held permit must drop") + + // Release the held permit; conservation still holds (issued == sent+dropped + // is checked elsewhere). Drain to avoid a leaked OnComplete at teardown. 
+ snd.completeAll() +} + +// TestOpenLoopSchedule_Conservation checks the accounting invariant: with a fast +// async sender that fully drains within the window, every issued tx is either +// completed (sent) or dropped exactly once — no permit leaks, no double-count. +func TestOpenLoopSchedule_Conservation(t *testing.T) { + gen := newFakeGenerator(300) + // Generous capacity so most txs complete; a few may drop on brief bursts. + limiter := rate.NewLimiter(rate.Limit(1000), 1) + + var completed atomic.Uint64 + onSent := func(_ *types.LoadTx, err error) { + require.NoError(t, err) + completed.Add(1) + } + + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + snd := newAsyncFakeSender(ctx, 256, 16, 0) + sched := newOpenLoopScheduler(gen, snd, limiter, 256, onSent) + runScheduler(ctx, sched) + + issued := uint64(len(gen.issuedTxs())) + require.Positive(t, issued) + // Allow the in-flight sends spawned just before deadline to settle. + require.Eventually(t, func() bool { + return completed.Load()+sched.Dropped() == issued + }, time.Second, 5*time.Millisecond, + "every issued tx must be completed or dropped exactly once (issued=%d sent=%d dropped=%d)", + issued, completed.Load(), sched.Dropped()) } // TestOpenLoopSchedule_HonorsRampedLambda verifies the schedule responds to a @@ -158,14 +296,13 @@ func TestOpenLoopSchedule_NotThrottledBySlowSender(t *testing.T) { // SetLimit, the inter-arrival gap tracks the new λ. func TestOpenLoopSchedule_HonorsRampedLambda(t *testing.T) { gen := newFakeGenerator(1000) - snd := &fakeSender{} + ctx, cancel := context.WithTimeout(t.Context(), 600*time.Millisecond) + defer cancel() + snd := newAsyncFakeSender(ctx, 1024, 8, 0) // Start slow so the first gaps are large and easy to distinguish. 
limiter := rate.NewLimiter(rate.Limit(50), 1) // 20ms gap sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) - ctx, cancel := context.WithTimeout(t.Context(), 600*time.Millisecond) - defer cancel() - var wg sync.WaitGroup wg.Add(1) go func() { @@ -199,7 +336,9 @@ func TestOpenLoopSchedule_HonorsRampedLambda(t *testing.T) { // send task can touch the tx. Run under -race to catch a regression. func TestOpenLoopSchedule_StampsBeforeHandoff(t *testing.T) { gen := newFakeGenerator(50) - snd := &fakeSender{} + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + snd := newAsyncFakeSender(ctx, 64, 8, 0) limiter := rate.NewLimiter(rate.Limit(1000), 1) var checked atomic.Uint64 @@ -210,8 +349,6 @@ func TestOpenLoopSchedule_StampsBeforeHandoff(t *testing.T) { } sched := newOpenLoopScheduler(gen, snd, limiter, 64, onSent) - ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) - defer cancel() runScheduler(ctx, sched) require.Positive(t, checked.Load(), "onSent must observe stamped txs") diff --git a/sender/worker.go b/sender/worker.go index 42c44ec..ca553de 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -257,6 +257,12 @@ func (w *Worker) processTransactions(ctx context.Context, client *http.Client) e // so stamping the actual send-attempt time here is race-free (see LoadTx). tx.AttemptedSendTime = startTime err = w.sendTransaction(ctx, client, tx) + // Release the in-flight permit on actual send completion (open-loop). + // This is what makes maxInFlight bound true unacked sends rather than + // enqueue backlog; the closed-loop and batch paths leave it nil. 
+ if tx.OnComplete != nil { + tx.OnComplete(err) + } // Record statistics if collector is available if w.collector != nil { w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err == nil) diff --git a/stats/run_summary.go b/stats/run_summary.go index 652dce4..5572628 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -14,9 +14,11 @@ type RunSummary struct { // ArrivalModel is "open_loop" or "closed_loop"; tags the dropped gauge so a // nonzero drop count is attributable to the model that produced it. ArrivalModel string - // Dropped is the count of open-loop txs shed on in-flight saturation. These - // never reach the inclusion tracker, so they carry a zero InclusionTime; - // keep them out of later inclusion-rate denominators (PLT-463 forward-note). + // Dropped is the count of open-loop txs shed on in-flight saturation. A + // dropped tx never reaches the inclusion tracker and carries no + // InclusionTime, so it must stay out of inclusion-rate denominators: the + // denominator is sent (txs that reached a sender), never issued (sent + + // dropped). Dropped uint64 } diff --git a/types/scenario.go b/types/scenario.go index 9d2dcc8..8d02d8e 100644 --- a/types/scenario.go +++ b/types/scenario.go @@ -33,19 +33,31 @@ type LoadTx struct { // here (independent of when a sender is free), which is the basis for // coordinated-omission-free latency. In the legacy closed-loop model it // instead holds the back-pressured enqueue time and must not be used to - // derive latency. SequenceIndex disambiguates which model produced it: a - // nonzero SequenceIndex (or index 0 from a campaign start) marks the - // open-loop scheduler as the writer. + // derive latency. A LoadTx cannot self-describe which model wrote it — an + // open-loop tx and a closed-loop tx are byte-identical (both can have + // SequenceIndex 0). 
Latency and schedule-lag consumers must gate on the + // run-level arrival model (RunSummary.ArrivalModel), not on any field here. IntendedSendTime time.Time // SequenceIndex is the monotonic per-campaign arrival index i assigned by // the open-loop scheduler, which schedules tx i at t₀ + i/λ. It attributes // per-tx schedule lag (IntendedSendTime vs AttemptedSendTime) back to a // position in the arrival sequence. Zero in the legacy closed-loop model, - // where no scheduler assigns it. + // where no scheduler assigns it — so the value alone does not identify the + // model (see IntendedSendTime); the run's arrival model is authoritative. SequenceIndex uint64 // AttemptedSendTime is when the send was actually attempted, written by the // worker goroutine that owns the tx between dequeue and the sentTxs hand-off. AttemptedSendTime time.Time + // OnComplete, if set, is invoked exactly once when the network send attempt + // for this tx finishes (after sendTransaction returns), with the send error + // or nil. The open-loop scheduler sets it to release the in-flight permit so + // the bound covers true unacked sends (enqueue + send), not just queue depth; + // see the open-loop scheduler. The worker invokes it after sendTransaction + // and is the sole invoker on the happy path. Nil in the closed-loop and batch + // paths, where the worker simply skips it. The callback must be cheap and + // non-blocking — the worker holds the tx and calls it inline. Written by the + // owning goroutine before hand-off, per the lifecycle concurrency contract. + OnComplete func(err error) // InclusionTime is when the tx was observed included on-chain, written only // by the inclusion tracker. InclusionTime time.Time diff --git a/utils/semaphore.go b/utils/semaphore.go index 85f1b02..941c670 100644 --- a/utils/semaphore.go +++ b/utils/semaphore.go @@ -16,7 +16,7 @@ func NewSemaphore(n int) *Semaphore { // Acquire acquires a permit from the semaphore. 
// Blocks until a permit is available. -func (s *Semaphore) Acquire(ctx context.Context) (relase func(), err error) { +func (s *Semaphore) Acquire(ctx context.Context) (release func(), err error) { if err := Send(ctx, s.ch, struct{}{}); err != nil { return nil, err } From 9efe50c08ff772238d3f2c7adf8119062357f268 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 11:12:53 -0700 Subject: [PATCH 3/9] PLT-458: fix staticcheck lint (ST1023) Omit the redundant time.Duration type from the minGap declaration in scheduler_test (inferred from time.Hour). Caught by golangci-lint (CI gate). Co-Authored-By: Claude Opus 4.8 (1M context) --- sender/scheduler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sender/scheduler_test.go b/sender/scheduler_test.go index 202b775..733764b 100644 --- a/sender/scheduler_test.go +++ b/sender/scheduler_test.go @@ -320,7 +320,7 @@ func TestOpenLoopSchedule_HonorsRampedLambda(t *testing.T) { // The min gap observed in the back half must reflect the faster λ: with a // 2ms target the later gaps are far under the initial 20ms gap. - var minGap time.Duration = time.Hour + minGap := time.Hour for i := 1; i < len(issued); i++ { g := issued[i].IntendedSendTime.Sub(issued[i-1].IntendedSendTime) if g < minGap { From e32d157e480dea9e3874ba222fa618805c0c299c Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 11:47:44 -0700 Subject: [PATCH 4/9] PLT-458: cover real worker's OnComplete permit-release on the send path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every existing scheduler test drove a fake TxSender that fired tx.OnComplete itself, so the suite stayed green even if the real Worker forgot to invoke it in runTxSender — leaking the open-loop in-flight semaphore (permits never released → the maxInFlight bound becomes meaningless). 
Add an httptest JSON-RPC harness (answers eth_sendRawTransaction, the only RPC the ethclient send path issues) behind the real Worker + open-loop scheduler: - Conservation on the real path: issued == completed + dropped, with completed driven by the real worker's OnComplete; handled sends matched against the real RPC server's count. - Permit released by the worker: maxInFlight=1 with a server that blocks one send holds exactly one in flight and drops the rest; releasing it resumes flow — which is only possible if the worker fires OnComplete. Both tests fail when the OnComplete invoke is removed (verified). Also clarify the scheduler doc: enqueue is async but the RPC send is synchronous, so the permit is held for the full round-trip. Co-Authored-By: Claude Opus 4.8 (1M context) --- sender/scheduler.go | 8 +- sender/scheduler_realworker_test.go | 411 ++++++++++++++++++++++++++++ 2 files changed, 417 insertions(+), 2 deletions(-) create mode 100644 sender/scheduler_realworker_test.go diff --git a/sender/scheduler.go b/sender/scheduler.go index 453d62a..49bd093 100644 --- a/sender/scheduler.go +++ b/sender/scheduler.go @@ -34,8 +34,12 @@ import ( // is enqueued: the scheduler stamps tx.OnComplete with the release, and the // worker invokes it after sendTransaction returns. So maxInFlight bounds true // unacked in-flight sends and dropped measures genuine load-shed, not buffer -// geometry (the worker's send is async enqueue-and-return). Note: schedule_lag -// (PLT-463), not the drop count, remains the primary coordinated-omission gate. +// geometry. Note the two distinct phases of the worker path: the ENQUEUE into +// the worker's txChan (TxSender.Send) is async and returns immediately, but the +// RPC send itself (sendTransaction → eth_sendRawTransaction) is SYNCHRONOUS — so +// the permit is held for the full RPC round-trip, not just the enqueue. Note +// also: schedule_lag (PLT-463), not the drop count, remains the primary +// coordinated-omission gate. 
type openLoopScheduler struct { generator generator.Generator sender TxSender diff --git a/sender/scheduler_realworker_test.go b/sender/scheduler_realworker_test.go new file mode 100644 index 0000000..79954c5 --- /dev/null +++ b/sender/scheduler_realworker_test.go @@ -0,0 +1,411 @@ +package sender + +import ( + "context" + "encoding/json" + "io" + "math/big" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils/service" +) + +// This file is the production-path safety net for the open-loop in-flight bound. +// +// Every other scheduler test drives a FAKE TxSender that invokes tx.OnComplete +// itself, so the suite would stay green even if the real Worker forgot the +// `if tx.OnComplete != nil { tx.OnComplete(err) }` line in runTxSender — the one +// load-bearing line that makes the maxInFlight semaphore bound true unacked +// sends rather than nothing (permits would never be released → leak/meaningless +// bound). The tests here wire the REAL Worker (runTxSender → sendTransaction → +// the real ethclient → OnComplete) behind the scheduler and assert the permit +// is genuinely released by the worker on send completion. +// +// Harness: an httptest.Server speaking the minimal JSON-RPC the ethclient send +// path touches. SendTransaction issues exactly one eth_sendRawTransaction call +// per tx (verified against go-ethereum v1.16.1: HTTP dial makes no RPC call, and +// SendTransaction marshals the tx and calls eth_sendRawTransaction; no +// eth_chainId round-trip). This keeps the harness loopback-only and lets us +// exercise the real worker send path end to end with a controllable response — +// including a "block until released" mode for the maxInFlight=1 assertion. 
+ +// jsonRPCReq is the subset of a JSON-RPC request we parse from the ethclient. +type jsonRPCReq struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Method string `json:"method"` +} + +// rpcServer is an httptest-backed JSON-RPC endpoint that answers +// eth_sendRawTransaction. It counts handled sends and can be put into a +// "block" mode where each send parks until explicitly released, so a test can +// hold a send in flight and observe the in-flight bound. +type rpcServer struct { + srv *httptest.Server + + entered atomic.Uint64 // eth_sendRawTransaction calls that entered the handler + handled atomic.Uint64 // eth_sendRawTransaction calls that returned a result + + // When blocking, every send waits on a fresh gate appended to gates so + // the test can release them one at a time. arrived is signaled when a send + // has entered the handler (so the test knows a send is genuinely in flight). + mu sync.Mutex + blocking bool + gates []chan struct{} // one per blocked send, in arrival order + arrived chan struct{} // buffered; one token per send that entered the handler +} + +func newRPCServer(t *testing.T) *rpcServer { + t.Helper() + s := &rpcServer{arrived: make(chan struct{}, 1024)} + s.srv = httptest.NewServer(http.HandlerFunc(s.handle)) + t.Cleanup(s.srv.Close) + return s +} + +func (s *rpcServer) url() string { return s.srv.URL } + +// setBlocking toggles the block-until-released mode. +func (s *rpcServer) setBlocking(b bool) { + s.mu.Lock() + s.blocking = b + s.mu.Unlock() +} + +// releaseOne unblocks the oldest parked send. Returns false if none is parked +// yet (caller should retry after observing an arrival).
+func (s *rpcServer) releaseOne() bool { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.gates) == 0 { + return false + } + gate := s.gates[0] + s.gates = s.gates[1:] + close(gate) + return true +} + +func (s *rpcServer) handle(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var req jsonRPCReq + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if req.Method == "eth_sendRawTransaction" { + s.entered.Add(1) + s.mu.Lock() + blocking := s.blocking + var gate chan struct{} + if blocking { + gate = make(chan struct{}) + s.gates = append(s.gates, gate) + } + s.mu.Unlock() + + if blocking { + // Announce arrival, then park until released or the request ctx is + // canceled (campaign teardown). Parking here holds the real worker + // inside sendTransaction, so the scheduler's permit stays held. + s.arrived <- struct{}{} + select { + case <-gate: + case <-r.Context().Done(): + } + } + } + + id := req.ID + if len(id) == 0 { + id = json.RawMessage("0") + } + // A non-error result is enough; ethclient discards the value (nil out arg). + resp := struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Result string `json:"result"` + }{ + JSONRPC: "2.0", + ID: id, + Result: "0x0000000000000000000000000000000000000000000000000000000000000000", + } + if req.Method == "eth_sendRawTransaction" { + s.handled.Add(1) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&resp) +} + +// signedTxGenerator yields real, signed DynamicFee transactions (the production +// EVMTransfer shape) so the real ethclient marshals and ships a valid raw tx to +// the JSON-RPC server. It is the generator.Generator the scheduler drives. 
+type signedTxGenerator struct { + mu sync.Mutex + remaining int + acct *types.Account + signer ethtypes.Signer + chainID *big.Int + issued int +} + +func newSignedTxGenerator(t *testing.T, n int) *signedTxGenerator { + t.Helper() + acct, err := types.NewAccount() + require.NoError(t, err) + chainID := big.NewInt(1) + return &signedTxGenerator{ + remaining: n, + acct: acct, + signer: ethtypes.NewCancunSigner(chainID), + chainID: chainID, + } +} + +func (g *signedTxGenerator) Generate() (*types.LoadTx, bool) { + g.mu.Lock() + defer g.mu.Unlock() + if g.remaining == 0 { + return nil, false + } + g.remaining-- + g.issued++ + + to := g.acct.Address + inner := &ethtypes.DynamicFeeTx{ + ChainID: g.chainID, + Nonce: g.acct.GetAndIncrementNonce(), + To: &to, + Value: big.NewInt(1), + Gas: 21000, + GasTipCap: big.NewInt(2_000_000_000), + GasFeeCap: big.NewInt(200_000_000_000), + } + signed, err := ethtypes.SignTx(ethtypes.NewTx(inner), g.signer, g.acct.PrivKey) + if err != nil { + // Generators have no error channel; a signing failure here is a test bug. + panic(err) + } + scenario := &types.TxScenario{Name: "realworker", Sender: g.acct, Receiver: to} + return types.CreateTxFromEthTx(signed, scenario), true +} + +func (g *signedTxGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *signedTxGenerator) GetAccountPools() []types.AccountPool { return nil } + +func (g *signedTxGenerator) issuedCount() int { + g.mu.Lock() + defer g.mu.Unlock() + return g.issued +} + +// newRealWorker builds the production Worker against the given endpoint, in the +// open-loop configuration (RateLimited=false so the scheduler owns the clock, +// TrackReceipts=false so watchTransactions returns immediately and we exercise +// only the send path). It is the real TxSender the scheduler drives.
+func newRealWorker(endpoint string, tasks, buffer int) *Worker { + return NewWorker(&WorkerConfig{ + ID: 0, + SeiChainID: "test", + Endpoint: endpoint, + BufferSize: buffer, + Tasks: tasks, + DryRun: false, + Debug: false, + Collector: stats.NewCollector(), + RateLimited: false, + }) +} + +// TestRealWorker_Conservation_OnRealSendPath asserts conservation +// (issued == completed + dropped) where `completed` is driven exclusively by the +// REAL worker invoking tx.OnComplete after sendTransaction returns — not by a +// fake. If runTxSender stopped calling OnComplete, completed would stall and +// this would fail. +func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { + const txCount = 200 + srv := newRPCServer(t) + gen := newSignedTxGenerator(t, txCount) + worker := newRealWorker(srv.url(), 8, 256) + + var completed, succeeded atomic.Uint64 + onSent := func(_ *types.LoadTx, err error) { + completed.Add(1) + if err == nil { + succeeded.Add(1) + } + } + + limiter := rate.NewLimiter(rate.Limit(2000), 1) + sched := newOpenLoopScheduler(gen, worker, limiter, 256, onSent) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + // Run the real worker (background) and the scheduler (main) in the same + // scope. The worker must keep draining txChan after the scheduler exhausts + // the generator, so we DON'T let the scheduler's return tear the scope down: + // we keep the main task alive until every admitted tx has completed, then + // cancel. Otherwise an admitted tx still buffered in txChan at cancel time + // would never fire OnComplete and conservation would (correctly) fail. 
+ runCtx, runCancel := context.WithCancel(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = service.Run(runCtx, func(ctx context.Context, scope service.Scope) error { + scope.SpawnBg(func() error { return worker.Run(ctx) }) + return sched.Run(ctx, scope) + }) + }() + + issued := func() uint64 { return uint64(gen.issuedCount()) } + + // Every issued tx must be completed (by the real worker's OnComplete) or + // dropped — exactly once each, no leaks, no double-count. Driven by the real + // worker's OnComplete, so a missing invoke leaves completions short forever. + require.Eventually(t, func() bool { + i := issued() + return i > 0 && completed.Load()+sched.Dropped() == i + }, 3*time.Second, 2*time.Millisecond, + "issued=%d completed=%d dropped=%d", issued(), completed.Load(), sched.Dropped()) + + runCancel() + wg.Wait() + + require.Positive(t, issued()) + + // The completions came from the REAL send path, not a phantom OnComplete: + // every send the server successfully handled produced exactly one successful + // worker-driven completion. (A send that errored client-side — e.g. ctx + // canceled at teardown — still completes but is not server-handled, so we + // match handled against the success count, not the total.) + require.Positive(t, srv.handled.Load(), "the real RPC server must have handled sends") + require.Equal(t, succeeded.Load(), srv.handled.Load(), + "each successful completion must correspond to one eth_sendRawTransaction") +} + +// TestRealWorker_PermitReleasedByWorker is the teeth: with maxInFlight=1 and a +// single worker task, the RPC server blocks the first send. The real worker is +// parked inside sendTransaction, so it has NOT yet called tx.OnComplete and the +// single permit stays held — every subsequent arrival must drop. Releasing the +// blocked send lets the worker return from sendTransaction, fire OnComplete, and +// free the permit, so flow resumes. 
+// +// If someone deletes the `if tx.OnComplete != nil { tx.OnComplete(err) }` invoke +// in runTxSender, the permit is never released even after the send completes: +// the worker would never accept a second tx, so handled stays at 1 and the +// resume assertion fails. That is the falsification this test exists for. +func TestRealWorker_PermitReleasedByWorker(t *testing.T) { + srv := newRPCServer(t) + srv.setBlocking(true) + + // Plenty of arrivals so the scheduler keeps offering txs while the first is + // parked; the surplus must drop because the lone permit is held. + gen := newSignedTxGenerator(t, 1000) + // One task: a single runTxSender owns the only permit's lifecycle, so the + // permit can only be freed by that worker calling OnComplete. + worker := newRealWorker(srv.url(), 1, 1) + + var completed atomic.Uint64 + onSent := func(_ *types.LoadTx, _ error) { completed.Add(1) } + + // Fast arrival clock so many txs are offered during the blocked window. + limiter := rate.NewLimiter(rate.Limit(5000), 1) // 0.2ms gap + sched := newOpenLoopScheduler(gen, worker, limiter, 1, onSent) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = service.Run(ctx, func(ctx context.Context, scope service.Scope) error { + scope.SpawnBg(func() error { return worker.Run(ctx) }) + return sched.Run(ctx, scope) + }) + }() + + // Wait until exactly one send is genuinely in flight (parked in the handler). + <-srv.arrived + + // While that send is parked, the worker has not fired OnComplete, so the lone + // permit is held. Give the fast scheduler time to offer (and drop) a slew of + // arrivals, then assert the bound held: exactly one send in flight, none + // completed yet, and the rest dropped. 
+ require.Eventually(t, func() bool { + return sched.Dropped() > 10 + }, 2*time.Second, 2*time.Millisecond, + "arrivals past the single held permit must drop while the send is in flight") + + require.Equal(t, uint64(0), completed.Load(), + "no completion may be reported while the only send is still parked") + require.Equal(t, uint64(1), srv.entered.Load(), + "exactly one send may be in flight under maxInFlight=1") + require.Equal(t, uint64(0), srv.handled.Load(), + "the parked send has not returned a result yet, so the permit is still held") + + // Release the blocked send. The real worker now returns from sendTransaction + // and MUST invoke tx.OnComplete to free the permit. If it does not (the bug), + // no further send is ever admitted and handled stays at 1 forever. + require.True(t, srv.releaseOne(), "one send must be parked and releasable") + + // Switch off blocking so resumed sends complete immediately, and prove flow + // resumes: more than one send is now handled, which is only possible if the + // permit from the first send was released by the worker's OnComplete. + srv.setBlocking(false) + // Drain any further parked sends that arrived between release and unblocking. + go func() { + for ctx.Err() == nil { + if !srv.releaseOne() { + select { + case <-ctx.Done(): + return + case <-time.After(time.Millisecond): + } + } + } + }() + + require.Eventually(t, func() bool { + return srv.handled.Load() > 1 && completed.Load() > 1 + }, 3*time.Second, 2*time.Millisecond, + "flow must resume after the worker releases the permit via OnComplete "+ + "(handled=%d completed=%d)", srv.handled.Load(), completed.Load()) + + // Flow resumed: at least one further send completed after the release, so the + // worker really did free the permit. Record the post-resume state before tear + // down (strict end-to-end conservation is covered by the conservation test; + // here cancel may leave a single tx mid-flight, so we only bound leaks). 
+ require.Greater(t, completed.Load(), uint64(1), "resumed sends must complete") + + cancel() + wg.Wait() + + // No leak past the one tx that may be mid-flight at cancel: accounted txs + // (completed + dropped) must never exceed issued, and must trail it by at + // most that single in-flight tx. + accounted := completed.Load() + sched.Dropped() + issued := uint64(gen.issuedCount()) + require.LessOrEqual(t, accounted, issued, "no tx may be counted more than once") + require.GreaterOrEqual(t, accounted+1, issued, + "at most one admitted tx may be unaccounted at cancel (issued=%d completed=%d dropped=%d)", + issued, completed.Load(), sched.Dropped()) +} From 016780366a0d87a0ab4587a4cc90d5805bfcc1cc Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 12:15:32 -0700 Subject: [PATCH 5/9] PLT-458: make real-worker conservation test deterministic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The conservation test sampled `succeeded == handled` against an in-flight window: the httptest server bumps `handled` on receiving eth_sendRawTransaction, but the worker bumps `succeeded` only after SendTransaction returns and OnComplete fires. CI's slower scheduling caught that window (handled=200, succeeded=199). The dominant cause was teardown ordering, not pure sampling: the scheduler ran as the scope's main task, so the instant it exhausted the generator and returned, service.Run canceled the worker's context — aborting the last send whose 200 OK the server had already counted. That send completed with context-canceled, so completed++ but not succeeded++. Fix: run the scheduler and worker as background tasks behind a main gate that blocks until the test tears down, so the scope stays alive until quiescence. 
Anchor the assertion on the fixed total and require exhaustion, conservation, and equality together in one predicate, evaluated only once they all hold (a stable fixpoint, since the counters are monotonic and no new work is issued after exhaustion). Correctness depends on convergence, not the deadline. Verified: go test -race -count=50 and -count=20 -cpu=1,2,4 green; GOMAXPROCS=1 and =2 green. Falsification holds — commenting out the OnComplete invoke in runTxSender fails both tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- sender/scheduler_realworker_test.go | 88 +++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 24 deletions(-) diff --git a/sender/scheduler_realworker_test.go b/sender/scheduler_realworker_test.go index 79954c5..e379150 100644 --- a/sender/scheduler_realworker_test.go +++ b/sender/scheduler_realworker_test.go @@ -206,7 +206,7 @@ func (g *signedTxGenerator) Generate() (*types.LoadTx, bool) { return types.CreateTxFromEthTx(signed, scenario), true } -func (g *signedTxGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *signedTxGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } func (g *signedTxGenerator) GetAccountPools() []types.AccountPool { return nil } func (g *signedTxGenerator) issuedCount() int { @@ -255,15 +255,25 @@ func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { limiter := rate.NewLimiter(rate.Limit(2000), 1) sched := newOpenLoopScheduler(gen, worker, limiter, 256, onSent) - ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + ctx, cancel := context.WithCancel(t.Context()) defer cancel() - // Run the real worker (background) and the scheduler (main) in the same - // scope. The worker must keep draining txChan after the scheduler exhausts - // the generator, so we DON'T let the scheduler's return tear the scope down: - // we keep the main task alive until every admitted tx has completed, then - // cancel. 
Otherwise an admitted tx still buffered in txChan at cancel time - // would never fire OnComplete and conservation would (correctly) fail. + // Run the worker and scheduler in a scope whose teardown WE control via + // runCancel — not the scheduler's return. + // + // service.Run cancels the scope's context as soon as every MAIN task returns. + // If the scheduler were a main task, the instant it exhausts the generator and + // returns, service.Run would cancel the worker's context — aborting any send + // still in flight. A send whose 200 OK the server already counted (handled++) + // but whose client.SendTransaction had not yet returned would then fail with + // context-canceled: OnComplete fires with err != nil, so completed++ but NOT + // succeeded++. That is exactly the observed flake (handled=200, succeeded=199): + // not a sampling artifact but a teardown that races the last in-flight send. + // + // So the scheduler and worker are BACKGROUND tasks, and the lone MAIN task is a + // gate that blocks until the test calls runCancel(). The scope therefore stays + // alive — the worker keeps draining txChan and firing OnComplete — until the + // test has observed quiescence and torn down deliberately. runCtx, runCancel := context.WithCancel(ctx) var wg sync.WaitGroup wg.Add(1) @@ -271,32 +281,62 @@ func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { defer wg.Done() _ = service.Run(runCtx, func(ctx context.Context, scope service.Scope) error { scope.SpawnBg(func() error { return worker.Run(ctx) }) - return sched.Run(ctx, scope) + scope.SpawnBg(func() error { return sched.Run(ctx, scope) }) + // Main task: hold the scope open until the test signals teardown. + <-ctx.Done() + return nil }) }() issued := func() uint64 { return uint64(gen.issuedCount()) } - // Every issued tx must be completed (by the real worker's OnComplete) or - // dropped — exactly once each, no leaks, no double-count. 
Driven by the real - // worker's OnComplete, so a missing invoke leaves completions short forever. + // Assert ONLY at quiescence. All invariants are sampled together in one + // predicate so we never read them mid-flight, and we anchor on the FIXED + // total txCount so `issued` is not a moving target: + // + // exhausted: issued == txCount (the generator is drained, so + // no new work can perturb the counters — the precondition that + // makes the fixpoint below stable) + // conservation: completed + dropped == txCount (every issued tx reached a + // terminal state via the real worker's OnComplete or a drop) + // equality: succeeded == handled (every server-handled send + // produced exactly one successful worker-driven completion) + // + // conservation and equality are transiently off WHILE a send is in flight: the + // server bumps `handled` when it RECEIVES eth_sendRawTransaction, but the worker + // bumps `succeeded` only AFTER SendTransaction returns and OnComplete fires — the + // instants differ by the server→worker-return window. Sampling any of them alone, + // or at different instants, can catch that window. Requiring all three together, + // only once they hold, observes the system after that window has drained. The + // counters are monotonic; once the generator is exhausted no new work is issued, + // so once ALL THREE hold they stay held — that stable fixpoint is the quiescent + // point. (The deeper hazard the gate above fixes is teardown racing that same + // window; here we additionally refuse to read until the window is empty.) + // + // Driven by the real worker's OnComplete — a missing invoke leaves completed + // (and succeeded) short forever, so convergence never happens and the test + // fails on the Eventually deadline. CI is slow, so the window is generous; + // correctness depends on convergence, not on the deadline firing. 
+ const total = uint64(txCount) require.Eventually(t, func() bool { - i := issued() - return i > 0 && completed.Load()+sched.Dropped() == i - }, 3*time.Second, 2*time.Millisecond, - "issued=%d completed=%d dropped=%d", issued(), completed.Load(), sched.Dropped()) - + exhausted := issued() == total + conserved := completed.Load()+sched.Dropped() == total + balanced := succeeded.Load() == srv.handled.Load() + return exhausted && conserved && balanced + }, 10*time.Second, 2*time.Millisecond, + "never reached quiescence (want issued=conserved=%d, succeeded=handled)", total) + + // System is quiescent: the generator is drained, no send is in flight, every + // issued tx is terminal, and every handled send has its OnComplete recorded. + // Only now tear down — the counters cannot move under us, so the assertions + // below re-read a frozen state, not a sampled one. runCancel() wg.Wait() - require.Positive(t, issued()) - - // The completions came from the REAL send path, not a phantom OnComplete: - // every send the server successfully handled produced exactly one successful - // worker-driven completion. (A send that errored client-side — e.g. ctx - // canceled at teardown — still completes but is not server-handled, so we - // match handled against the success count, not the total.) 
+ require.Equal(t, total, issued(), "the generator must be fully drained") require.Positive(t, srv.handled.Load(), "the real RPC server must have handled sends") + require.Equal(t, total, completed.Load()+sched.Dropped(), + "every issued tx must reach a terminal state (completed or dropped)") require.Equal(t, succeeded.Load(), srv.handled.Load(), "each successful completion must correspond to one eth_sendRawTransaction") } From 33d7237b56900ebb3a8da157cc00ec455d0e954b Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 13:33:33 -0700 Subject: [PATCH 6/9] docs(sender): consolidate open-loop rationale into package doc Move the dense coordinated-omission / arrival-model narrative out of scheduler.go into a new sender/doc.go package doc, and lean the inline comments to terse pointers. No behavior change. The load-bearing inline notes stay (leaned) at the code they guard: the worker's OnComplete permit-release and the single-writer stamp-before- hand-off contract. Co-Authored-By: Claude Opus 4.8 (1M context) --- sender/dispatcher.go | 5 ++- sender/doc.go | 66 ++++++++++++++++++++++++++++++++++ sender/scheduler.go | 85 ++++++++++++++------------------------------ sender/worker.go | 9 +++-- 4 files changed, 98 insertions(+), 67 deletions(-) create mode 100644 sender/doc.go diff --git a/sender/dispatcher.go b/sender/dispatcher.go index f377871..5ba2ccc 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -153,9 +153,8 @@ func (d *Dispatcher) runClosedLoop(ctx context.Context) error { return nil } - // Stamp before hand-off: the dispatcher is sole owner here (tx just - // returned by the generator, not yet enqueued), so this write is race-free. - // This is the back-pressured enqueue time, not a true schedule instant. + // Stamp before hand-off while sole owner: race-free (see LoadTx). This is + // the back-pressured enqueue time, not a true schedule instant. 
tx.IntendedSendTime = time.Now() // Send the transaction diff --git a/sender/doc.go b/sender/doc.go new file mode 100644 index 0000000..a69820e --- /dev/null +++ b/sender/doc.go @@ -0,0 +1,66 @@ +// Package sender drives transactions from generation to the chain. +// +// The send path is a pipeline: a [generator.Generator] produces transactions; +// the [Dispatcher] times their arrival and hands each off to a [TxSender]; the +// [ShardedSender] routes each tx to one of N per-endpoint [Worker]s by shard; +// the worker's send loop stamps the attempt and calls the go-ethereum client +// (eth_sendRawTransaction). Receipts, when tracked, are +// polled on a separate worker loop. A shared [golang.org/x/time/rate.Limiter] is +// the single rate authority for the whole pipeline; the [Ramper] drives its +// limit up or down via SetLimit. +// +// # Open-loop arrival model +// +// The dispatcher supports two arrival models (see [ArrivalModel]). The open-loop +// model exists to eliminate coordinated omission from the latency measurement. +// +// Coordinated omission. In the legacy closed-loop model the dispatcher generates +// the next tx only once a sender is free, so the dequeue clock is the SUT's +// clock: when the system under test slows, the generator slows with it and +// simply stops issuing the requests that would have observed the slowdown. The +// latency histogram then under-reports, because the worst-affected requests were +// never sent. This is coordinated omission. The closed-loop model lies about +// latency precisely when the answer matters most. +// +// Open-loop fixes this by decoupling the arrival clock from sender availability. +// Transaction i is scheduled at a fixed instant t₀ + i/λ, where t₀ is the run +// start and λ is the target rate, regardless of whether any sender is free. 
The +// scheduler sleeps until each absolute instant rather than for a relative gap +// ("sleep 1/λ from now"), so per-tx scheduling slop cannot accumulate into clock +// drift over a long run. λ is sampled from the shared limiter on each step, so a +// ramping rate is honored; at a fixed λ the running sum telescopes to exactly +// t₀ + i/λ. The limiter is read here as a clock source, not as a permit gate — +// the schedule advances whether or not the SUT keeps up. +// +// Bounded in-flight and drop-and-count. The arrival clock is never throttled by +// backpressure; throttling it would reintroduce coordinated omission. Instead a +// counting semaphore bounds true in-flight sends to maxInFlight. At each tx's +// scheduled instant the scheduler tries to acquire a permit without blocking: if +// the senders are saturated the tx is dropped and counted, and the clock moves +// on. The permit is not released at enqueue. The scheduler installs a release +// callback on tx.OnComplete, and the worker invokes it only after the +// synchronous send returns — note the two phases of the worker path: the enqueue +// into the worker's channel ([TxSender.Send]) is asynchronous and returns at +// once, but the RPC send itself is synchronous. So the permit is held for the +// full unacked-in-flight window (enqueue plus RPC round-trip), and maxInFlight +// bounds real in-flight work while the drop count measures genuine load shed, +// not buffer geometry. +// +// LoadTx lifecycle and scheduling. The scheduling-relevant fields of [types.LoadTx] +// follow its single-writer concurrency contract: each is written once by the +// goroutine that solely owns the tx at that stage, then is immutable as ownership +// transfers with the pointer across channels. The scheduler stamps IntendedSendTime +// (the true scheduled instant t₀ + i/λ) and SequenceIndex (the arrival index i) +// before hand-off; the worker stamps AttemptedSendTime at the real send. 
A tx +// cannot self-describe which model produced it — an open-loop and a closed-loop +// tx are byte-identical — so coordinated-omission safety is a property of the +// run's arrival model, not of any per-tx field. Latency and schedule-lag consumers +// must gate on the run-level arrival model. +// +// Detection and baseline. schedule_lag (AttemptedSendTime minus IntendedSendTime) +// is the primary coordinated-omission gate: it shows when sends fall behind the +// arrival schedule even before any tx is shed. The drop count measures only +// genuine shedding once in-flight saturates. A send error does not tear down the +// campaign — errors and drops are surfaced through counters, not by aborting the +// run. The closed-loop model is retained as the regression baseline. +package sender diff --git a/sender/scheduler.go b/sender/scheduler.go index 49bd093..249feb6 100644 --- a/sender/scheduler.go +++ b/sender/scheduler.go @@ -15,31 +15,11 @@ import ( "github.com/sei-protocol/sei-load/utils/service" ) -// openLoopScheduler issues transactions on an open-loop arrival clock: tx i is -// scheduled at t₀ + i/λ regardless of whether any sender is free. This is the -// coordinated-omission fix — when the SUT slows, the arrival clock does NOT -// slow with it; overdue txs are dropped and counted instead (REL8/REL9 load -// shedding), so measured latency reflects the backlog rather than hiding it. -// -// λ comes from the shared rate.Limiter, which the ramper drives via SetLimit -// (one rate authority). The limiter is read as a clock source here, not as a -// permit gate: the schedule advances by 1/λ per tx, sampling λ each step so a -// ramping λ is honored. At a fixed λ this telescopes to exactly t₀ + i/λ. -// -// In-flight work is bounded by a semaphore. A tx that cannot acquire a permit -// without blocking is dropped (the senders are saturated); the scheduler never -// blocks on capacity, which is what keeps the arrival clock unthrottled. 
-// -// The permit is held until the actual network send completes, not until the tx -// is enqueued: the scheduler stamps tx.OnComplete with the release, and the -// worker invokes it after sendTransaction returns. So maxInFlight bounds true -// unacked in-flight sends and dropped measures genuine load-shed, not buffer -// geometry. Note the two distinct phases of the worker path: the ENQUEUE into -// the worker's txChan (TxSender.Send) is async and returns immediately, but the -// RPC send itself (sendTransaction → eth_sendRawTransaction) is SYNCHRONOUS — so -// the permit is held for the full RPC round-trip, not just the enqueue. Note -// also: schedule_lag (PLT-463), not the drop count, remains the primary -// coordinated-omission gate. +// openLoopScheduler issues tx i at t₀ + i/λ on an arrival clock decoupled from +// sender availability, bounding true in-flight with a semaphore and dropping +// (counting) overdue txs rather than throttling the clock. λ comes from the +// shared limiter (one rate authority). See the package doc for the open-loop +// arrival model: coordinated omission, drop-and-count, and the permit lifecycle. type openLoopScheduler struct { generator generator.Generator sender TxSender @@ -53,11 +33,10 @@ type openLoopScheduler struct { dropped atomic.Uint64 } -// minScheduleRate floors λ when computing the inter-arrival gap so a zero or -// negative limit cannot make the gap divide-by-zero or blow up to a +Inf -// duration. It does not bound how long the scheduler sleeps: a small-but-finite -// λ still yields a long gap. The degenerate λ=Inf / TPS=0 open-loop case is -// rejected up front in config validation (see config.Settings.Validate). +// minScheduleRate floors λ in the inter-arrival gap so a zero/negative limit +// cannot divide-by-zero or yield a +Inf gap. It does not cap the sleep: a small +// finite λ still yields a long gap. The degenerate λ=Inf / TPS=0 case is +// rejected up front (see config.Settings.Validate). 
const minScheduleRate = 1e-9 func newOpenLoopScheduler( @@ -80,29 +59,23 @@ func newOpenLoopScheduler( // Dropped returns the number of txs shed so far because in-flight was saturated. func (s *openLoopScheduler) Dropped() uint64 { return s.dropped.Load() } -// Run drives the open-loop arrival clock until the context is canceled or the -// generator is exhausted. Each accepted tx is sent on its own task spawned into -// scope, bounded by the in-flight semaphore; the send task releases the permit -// on completion, so the bound covers true in-flight (enqueue + send), not just -// queue depth. +// Run drives the arrival clock until ctx is canceled or the generator is +// exhausted, spawning each accepted tx as a send task bounded by the in-flight +// semaphore. See the package doc for the arrival model. func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error { t0 := time.Now() nextSend := t0 var i uint64 for ctx.Err() == nil { - // Advance the schedule by one inter-arrival gap. Sampling λ here (not - // once up front) honors a ramping limit; at fixed λ the running sum is - // exactly t₀ + i/λ. + // Sample λ per step (honors a ramping limit; at fixed λ sums to t₀ + i/λ). lambda := float64(s.limiter.Limit()) if lambda < minScheduleRate { lambda = minScheduleRate } gap := time.Duration(float64(time.Second) / lambda) - // Sleep until this tx's scheduled instant. Sleeping to an absolute - // instant (not "gap from now") prevents per-tx scheduling slop from - // accumulating into clock drift. + // Sleep to the absolute instant (not "gap from now") to avoid drift. if err := utils.SleepUntil(ctx, nextSend); err != nil { return err } @@ -113,29 +86,26 @@ func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error return nil } - // Stamp the TRUE scheduled instant and the arrival index while we are - // the sole owner of tx (see LoadTx concurrency contract). 
+ // Stamp the scheduled instant and arrival index while sole owner (see + // LoadTx concurrency contract). tx.IntendedSendTime = nextSend tx.SequenceIndex = i nextSend = nextSend.Add(gap) i++ - // Non-blocking admission: if senders are saturated, drop and count - // rather than block — blocking here would throttle the arrival clock - // and reintroduce coordinated omission. + // Non-blocking admit: never throttle the arrival clock (see package + // doc: coordinated omission). Saturated senders mean drop-and-count. release, ok := s.inflight.TryAcquire() if !ok { s.dropped.Add(1) continue } - // Complete fires exactly once when the send finishes: it releases the - // in-flight permit and reports the result. The worker invokes it via - // tx.OnComplete after the real send (the happy path), so the permit is - // held for the whole unacked-in-flight window, not just the enqueue. - // The Once guards the enqueue-failure fallback below from racing the - // worker — though by construction only one path runs. + // complete fires once on send completion: releases the permit and reports + // the result. The worker invokes it via tx.OnComplete after the real send + // (see package doc: permit lifecycle). The Once guards against the + // enqueue-failure fallback below racing the worker. var once sync.Once complete := func(err error) { once.Do(func() { @@ -148,16 +118,13 @@ func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error tx.OnComplete = complete scope.Spawn(func() error { - // Send returns at enqueue; the permit release is deferred to the - // worker via tx.OnComplete on real completion. If the enqueue itself - // fails (e.g. ctx canceled), the tx never reaches a worker, so we - // complete it here to avoid leaking the permit. + // Send returns at enqueue; the worker releases the permit on real + // completion. On enqueue failure the tx never reaches a worker, so + // complete here to avoid leaking the permit. 
if err := s.sender.Send(ctx, tx); err != nil { complete(err) } - // A send error must not tear down the campaign; the closed-loop - // path logs-and-continues identically. Drops/errors are surfaced - // via counters, not by returning here. + // A send error must not tear down the campaign; surfaced via counters. return nil }) } diff --git a/sender/worker.go b/sender/worker.go index bd224e1..293c754 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -239,13 +239,12 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro } startTime := time.Now() - // This goroutine solely owns tx between dequeue and the sentTxs hand-off, - // so stamping the actual send-attempt time here is race-free (see LoadTx). + // Sole owner between dequeue and hand-off: stamping here is race-free (see LoadTx). tx.AttemptedSendTime = startTime err = w.sendTransaction(ctx, client, tx) - // Release the in-flight permit on actual send completion (open-loop). - // This is what makes maxInFlight bound true unacked sends rather than - // enqueue backlog; the closed-loop and batch paths leave it nil. + // CRITICAL: invoke OnComplete only after the real send returns — this is + // what makes the open-loop semaphore bound true unacked sends, not enqueue + // backlog (see package doc: permit lifecycle). Nil on closed-loop/batch. if tx.OnComplete != nil { tx.OnComplete(err) } From fe8df95f24057e2bfb566470a03c52615cdcc322 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 14:19:07 -0700 Subject: [PATCH 7/9] PLT-458: admit before generate; account failed sends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bugbot finding 1: the scheduler called Generate() before TryAcquire, so a dropped tick still advanced the seeded generator streams and signed a tx that was then discarded. 
Under saturation the same seed produced a different admitted workload depending on SUT speed, violating the per-stream reproducibility contract (PLT-456). Reorder so the in-flight permit is acquired first; a dropped tick now consumes zero draws and zero signing CPU, making admitted txs a deterministic prefix of the seeded sequence. The arrival clock stays non-blocking/unthrottled. Add Admitted() so conservation anchors on scheduled-arrival counts (admitted + dropped), not generator-draw counts, since draws now occur only on admitted ticks. SequenceIndex remains the arrival-tick index i (monotonic, non-contiguous under drops) so IntendedSendTime = t0 + i/λ holds. New determinism guard test asserts admitted draws are a gapless prefix under forced drops. Bugbot finding 2: an admitted send that failed in the worker was logged but counted as neither sent nor dropped, breaking issued == sent + dropped under RPC failures. Add a failed counter; onSent increments succeeded on nil err and failed on non-nil err (both release the permit). Thread failed into the run summary (new run_txs_failed_total gauge) for goodput accounting. Conservation invariant documented and tested: scheduled == dropped + admitted, and admitted == succeeded + failed; the conservation test injects a failing send and asserts it is counted as failed. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- main.go | 7 +- sender/dispatcher.go | 27 ++- sender/doc.go | 23 ++- sender/scheduler.go | 60 +++++-- sender/scheduler_realworker_test.go | 62 +++---- sender/scheduler_test.go | 247 +++++++++++++++++++++++++--- stats/metrics.go | 5 + stats/run_summary.go | 9 +- 8 files changed, 362 insertions(+), 78 deletions(-) diff --git a/main.go b/main.go index a0da110..9e9b18f 100644 --- a/main.go +++ b/main.go @@ -374,10 +374,15 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { summary := stats.RunSummary{ArrivalModel: config.ArrivalModelClosedLoop} if dispatcher != nil { summary.ArrivalModel = string(dispatcher.ArrivalModel()) - summary.Dropped = dispatcher.GetStats().Dropped + dstats := dispatcher.GetStats() + summary.Dropped = dstats.Dropped + summary.Failed = dstats.Failed if summary.Dropped > 0 { log.Printf("⚠️ Open-loop dropped %d txs (in-flight saturated; not throttled)", summary.Dropped) } + if summary.Failed > 0 { + log.Printf("⚠️ Open-loop %d txs failed to send (admitted but errored; not lost)", summary.Failed) + } } collector.EmitRunSummary(ctx, summary) if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 { diff --git a/sender/dispatcher.go b/sender/dispatcher.go index 5ba2ccc..239d702 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -41,8 +41,13 @@ type Dispatcher struct { limiter *rate.Limiter maxInFlight int - // Statistics - totalSent uint64 + // Statistics. The open-loop conservation invariant is + // issued (scheduled ticks) == dropped (not admitted) + admitted, and + // admitted == totalSent (succeeded) + failed. + // A tx that is admitted and enqueues but errors in the worker's send is + // counted as failed, not lost: it is neither sent nor dropped. 
+ totalSent uint64 // admitted sends that completed with a nil error (succeeded) + failed uint64 // admitted sends that completed with a non-nil error dropped uint64 mu sync.RWMutex collector *stats.Collector @@ -188,11 +193,17 @@ func (d *Dispatcher) runOpenLoop(ctx context.Context) error { return err } -// onSent records a completed open-loop send. A successful send advances -// totalSent; the scheduler counts drops separately. +// onSent records a completed open-loop send (an admitted tx whose send attempt +// finished). A nil error advances totalSent (succeeded); a non-nil error +// advances failed. An admitted tx is therefore always counted exactly once as +// succeeded or failed — never silently lost — preserving +// admitted == succeeded + failed under RPC failures. func (d *Dispatcher) onSent(tx *types.LoadTx, err error) { if err != nil { log.Printf("Scheduler: send failed (seq %d): %v", tx.SequenceIndex, err) + d.mu.Lock() + d.failed++ + d.mu.Unlock() return } d.mu.Lock() @@ -234,13 +245,21 @@ func (d *Dispatcher) GetStats() DispatcherStats { return DispatcherStats{ TotalSent: d.totalSent, + Failed: d.failed, Dropped: d.dropped, } } // DispatcherStats contains statistics for the dispatcher type DispatcherStats struct { + // TotalSent is the number of admitted sends that completed with a nil error + // (succeeded). TotalSent uint64 + // Failed is the number of open-loop sends that were admitted (took a permit) + // but ended in a non-nil error (failed enqueue or failed send). Counted, not lost: with + // Dropped and TotalSent it closes the conservation invariant + // issued == Dropped + TotalSent + Failed. Always 0 in closed-loop mode. + Failed uint64 // Dropped is the number of open-loop txs shed because in-flight was // saturated at their scheduled instant. Always 0 in closed-loop mode.
Dropped uint64 diff --git a/sender/doc.go b/sender/doc.go index a69820e..ca080fb 100644 --- a/sender/doc.go +++ b/sender/doc.go @@ -34,9 +34,9 @@ // // Bounded in-flight and drop-and-count. The arrival clock is never throttled by // backpressure; throttling it would reintroduce coordinated omission. Instead a -// counting semaphore bounds true in-flight sends to maxInFlight. At each tx's +// counting semaphore bounds true in-flight sends to maxInFlight. At each // scheduled instant the scheduler tries to acquire a permit without blocking: if -// the senders are saturated the tx is dropped and counted, and the clock moves +// the senders are saturated the tick is dropped and counted, and the clock moves // on. The permit is not released at enqueue. The scheduler installs a release // callback on tx.OnComplete, and the worker invokes it only after the // synchronous send returns — note the two phases of the worker path: the enqueue @@ -46,6 +46,25 @@ // bounds real in-flight work while the drop count measures genuine load shed, // not buffer geometry. // +// Admit before generate (determinism). The permit is acquired BEFORE the +// generator is drawn, not after. A dropped tick therefore calls neither +// Generate (no seeded-stream draw, no per-tx state advance) nor the signer (no +// wasted CPU on shed load). This makes the admitted transactions a deterministic +// PREFIX of the seeded generator sequence: the same seed yields the same +// admitted multiset regardless of how many ticks the SUT speed forced to drop — +// the per-stream reproducibility contract holds under saturation, where it +// otherwise would not. SequenceIndex is the arrival-tick index i (so +// IntendedSendTime = t₀ + i/λ holds); under drops it is monotonic but +// non-contiguous across admitted txs, because dropped ticks still advance i and +// the clock while consuming no draw. +// +// Conservation. Bar a final tick that finds the generator exhausted, every tick ends as exactly one of: +// dropped (not admitted) or admitted.
Every admitted tx in turn completes as +// succeeded (nil send error) or failed (non-nil send error) — a failed send is +// counted, never lost. So scheduled == dropped + admitted and +// admitted == succeeded + failed. The dispatcher folds these into the run +// summary (dropped and failed gauges) for goodput accounting. +// // LoadTx lifecycle and scheduling. The scheduling-relevant fields of [types.LoadTx] // follow its single-writer concurrency contract: each is written once by the // goroutine that solely owns the tx at that stage, then is immutable as ownership diff --git a/sender/scheduler.go b/sender/scheduler.go index 249feb6..0ee73ad 100644 --- a/sender/scheduler.go +++ b/sender/scheduler.go @@ -28,9 +28,18 @@ type openLoopScheduler struct { onSent func(tx *types.LoadTx, err error) maxInFlight int - // dropped counts txs shed because in-flight was saturated at their - // scheduled instant. Read after Run returns, or concurrently via Dropped. - dropped atomic.Uint64 + // dropped counts ticks shed because in-flight was saturated at their + // scheduled instant (no permit, no Generate, no draw). admitted counts ticks + // that took a permit AND drew a real tx from the generator (the work actually + // handed to a sender). Their sum is the number of scheduled arrivals that + // represented work — the conservation anchor: + // admitted + dropped == scheduled arrivals, and + // admitted == succeeded + failed (the send outcome, accounted by onSent). + // A tick whose Generate returns no work (generator exhausted) is neither: + // the permit it briefly held is released and no arrival is counted. Read + // after Run returns, or concurrently via Dropped/Admitted. + dropped atomic.Uint64 + admitted atomic.Uint64 } // minScheduleRate floors λ in the inter-arrival gap so a zero/negative limit @@ -56,9 +65,14 @@ func newOpenLoopScheduler( } } -// Dropped returns the number of txs shed so far because in-flight was saturated. 
+// Dropped returns the number of ticks shed so far because in-flight was saturated. func (s *openLoopScheduler) Dropped() uint64 { return s.dropped.Load() } +// Admitted returns the number of ticks that took a permit and drew a real tx +// (the work handed to a sender). Admitted + Dropped is the scheduled-arrival +// count; Admitted equals succeeded + failed once all sends complete. +func (s *openLoopScheduler) Admitted() uint64 { return s.admitted.Load() } + // Run drives the arrival clock until ctx is canceled or the generator is // exhausted, spawning each accepted tx as a send task bounded by the in-flight // semaphore. See the package doc for the arrival model. @@ -80,28 +94,40 @@ func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error return err } - tx, ok := s.generator.Generate() - if !ok { - log.Print("Scheduler: generator returned no more transactions") - return nil - } - - // Stamp the scheduled instant and arrival index while sole owner (see - // LoadTx concurrency contract). - tx.IntendedSendTime = nextSend - tx.SequenceIndex = i - + // Advance the arrival clock for this scheduled tick before admission, so + // the schedule walks at λ whether or not the tx is admitted. + intendedSendTime := nextSend + seqIndex := i nextSend = nextSend.Add(gap) i++ - // Non-blocking admit: never throttle the arrival clock (see package - // doc: coordinated omission). Saturated senders mean drop-and-count. + // Admit BEFORE generating: a non-blocking TryAcquire that never throttles + // the arrival clock (see package doc: coordinated omission). On a drop the + // tick is counted but the generator is NOT advanced — a dropped slot + // consumes zero seeded-stream draws and no signing CPU, so admitted txs are + // a deterministic prefix of the seeded sequence regardless of how many + // ticks were shed (the per-stream reproducibility contract, PLT-456). 
release, ok := s.inflight.TryAcquire() if !ok { s.dropped.Add(1) continue } + // Permit held: now draw the next tx from the (seeded) generator. + tx, ok := s.generator.Generate() + if !ok { + // No work left: release the permit we will not use and stop. + release() + log.Print("Scheduler: generator returned no more transactions") + return nil + } + + // Stamp the scheduled instant and arrival index while sole owner (see + // LoadTx concurrency contract). + tx.IntendedSendTime = intendedSendTime + tx.SequenceIndex = seqIndex + s.admitted.Add(1) + // complete fires once on send completion: releases the permit and reports // the result. The worker invokes it via tx.OnComplete after the real send // (see package doc: permit lifecycle). The Once guards against the diff --git a/sender/scheduler_realworker_test.go b/sender/scheduler_realworker_test.go index e379150..32ec741 100644 --- a/sender/scheduler_realworker_test.go +++ b/sender/scheduler_realworker_test.go @@ -288,18 +288,17 @@ func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { }) }() - issued := func() uint64 { return uint64(gen.issuedCount()) } - // Assert ONLY at quiescence. All invariants are sampled together in one - // predicate so we never read them mid-flight, and we anchor on the FIXED - // total txCount so `issued` is not a moving target: + // predicate so we never read them mid-flight. 
Post-reorder, the generator is + // drawn only on admitted ticks, so the conservation anchor is the scheduler's + // OWN counters, not the generator draw count: // - // exhausted: issued == txCount (the generator is drained, so - // no new work can perturb the counters — the precondition that - // makes the fixpoint below stable) - // conservation: completed + dropped == txCount (every issued tx reached a - // terminal state via the real worker's OnComplete or a drop) - // equality: succeeded == handled (every server-handled send + // admitted: Admitted() == txCount (every draw was admitted — the + // generator is drained and dropped ticks consumed no draw; + // the precondition that makes the fixpoint below stable) + // conservation: completed == Admitted() (every admitted tx reached a + // terminal state via the real worker's OnComplete) + // equality: succeeded == handled (every server-handled send // produced exactly one successful worker-driven completion) // // conservation and equality are transiently off WHILE a send is in flight: the @@ -308,7 +307,7 @@ func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { // instants differ by the server→worker-return window. Sampling any of them alone, // or at different instants, can catch that window. Requiring all three together, // only once they hold, observes the system after that window has drained. The - // counters are monotonic; once the generator is exhausted no new work is issued, + // counters are monotonic; once the generator is exhausted no new work is admitted, // so once ALL THREE hold they stay held — that stable fixpoint is the quiescent // point. (The deeper hazard the gate above fixes is teardown racing that same // window; here we additionally refuse to read until the window is empty.) @@ -319,24 +318,25 @@ func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { // correctness depends on convergence, not on the deadline firing. 
const total = uint64(txCount) require.Eventually(t, func() bool { - exhausted := issued() == total - conserved := completed.Load()+sched.Dropped() == total + admittedAll := sched.Admitted() == total + conserved := completed.Load() == sched.Admitted() balanced := succeeded.Load() == srv.handled.Load() - return exhausted && conserved && balanced + return admittedAll && conserved && balanced }, 10*time.Second, 2*time.Millisecond, - "never reached quiescence (want issued=conserved=%d, succeeded=handled)", total) + "never reached quiescence (want admitted=completed=%d, succeeded=handled)", total) // System is quiescent: the generator is drained, no send is in flight, every - // issued tx is terminal, and every handled send has its OnComplete recorded. + // admitted tx is terminal, and every handled send has its OnComplete recorded. // Only now tear down — the counters cannot move under us, so the assertions // below re-read a frozen state, not a sampled one. runCancel() wg.Wait() - require.Equal(t, total, issued(), "the generator must be fully drained") + require.Equal(t, total, sched.Admitted(), "every generator draw must be an admitted tx") + require.Equal(t, total, uint64(gen.issuedCount()), "the generator must be fully drained") require.Positive(t, srv.handled.Load(), "the real RPC server must have handled sends") - require.Equal(t, total, completed.Load()+sched.Dropped(), - "every issued tx must reach a terminal state (completed or dropped)") + require.Equal(t, sched.Admitted(), completed.Load(), + "every admitted tx must reach a terminal state via the worker's OnComplete") require.Equal(t, succeeded.Load(), srv.handled.Load(), "each successful completion must correspond to one eth_sendRawTransaction") } @@ -439,13 +439,19 @@ func TestRealWorker_PermitReleasedByWorker(t *testing.T) { cancel() wg.Wait() - // No leak past the one tx that may be mid-flight at cancel: accounted txs - // (completed + dropped) must never exceed issued, and must trail it by at - // most 
that single in-flight tx. - accounted := completed.Load() + sched.Dropped() - issued := uint64(gen.issuedCount()) - require.LessOrEqual(t, accounted, issued, "no tx may be counted more than once") - require.GreaterOrEqual(t, accounted+1, issued, - "at most one admitted tx may be unaccounted at cancel (issued=%d completed=%d dropped=%d)", - issued, completed.Load(), sched.Dropped()) + // No leak past the one tx that may be mid-flight at cancel. Post-reorder, the + // generator is drawn only on admitted ticks, so every draw is an admitted tx: + // generator-draw count must equal Admitted() exactly (dropped ticks consumed + // no draw — the determinism property, on the real worker path). + admitted := sched.Admitted() + require.Equal(t, admitted, uint64(gen.issuedCount()), + "every generator draw must be an admitted tx; dropped ticks consume no draw") + + // Each admitted tx completes exactly once via the worker's OnComplete, so + // completed must equal Admitted() — minus at most the single tx left mid-flight + // when cancel raced its in-flight send. + require.LessOrEqual(t, completed.Load(), admitted, "no admitted tx may complete more than once") + require.GreaterOrEqual(t, completed.Load()+1, admitted, + "at most one admitted tx may be unaccounted at cancel (admitted=%d completed=%d dropped=%d)", + admitted, completed.Load(), sched.Dropped()) } diff --git a/sender/scheduler_test.go b/sender/scheduler_test.go index 733764b..9dcd7bb 100644 --- a/sender/scheduler_test.go +++ b/sender/scheduler_test.go @@ -2,6 +2,8 @@ package sender import ( "context" + "errors" + "slices" "sync" "sync/atomic" "testing" @@ -50,6 +52,48 @@ func (g *fakeGenerator) issuedTxs() []*types.LoadTx { return out } +// seededGenerator stands in for a PLT-456 seeded generator: each Generate() +// draw is the next element of a deterministic stream, recorded here in draw +// order via a strictly increasing draw index stamped on the LoadTx. 
The point +// the determinism guard pins: a draw is consumed only when Generate() is +// called, so if the scheduler advances the stream on a dropped tick the draw +// indices admitted under saturation would be non-contiguous (gaps where a +// dropped tick stole a draw). Admitted draws forming the prefix 0..N-1 with no +// gaps proves dropped slots consume zero seeded draws. +type seededGenerator struct { + mu sync.Mutex + remaining int + drawIndex map[*types.LoadTx]uint64 // tx → its draw position in the stream + nextDraw uint64 +} + +func newSeededGenerator(n int) *seededGenerator { + return &seededGenerator{remaining: n, drawIndex: map[*types.LoadTx]uint64{}} +} + +func (g *seededGenerator) Generate() (*types.LoadTx, bool) { + g.mu.Lock() + defer g.mu.Unlock() + if g.remaining == 0 { + return nil, false + } + g.remaining-- + tx := &types.LoadTx{Scenario: &types.TxScenario{Name: "seeded"}} + g.drawIndex[tx] = g.nextDraw + g.nextDraw++ + return tx, true +} + +// draw returns the stream position at which tx was produced by Generate(). +func (g *seededGenerator) draw(tx *types.LoadTx) uint64 { + g.mu.Lock() + defer g.mu.Unlock() + return g.drawIndex[tx] +} + +func (g *seededGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *seededGenerator) GetAccountPools() []types.AccountPool { return nil } + // asyncFakeSender models the production ShardedSender's send semantics: Send // returns when the tx lands in a buffered channel (enqueue-and-return), NOT when // the network send completes. Background workers dequeue and, after an optional @@ -180,28 +224,41 @@ func TestOpenLoopSchedule_NotThrottledBySlowSender(t *testing.T) { start := time.Now() runScheduler(ctx, sched) - issued := gen.issuedTxs() + admittedTxs := gen.issuedTxs() gap := time.Second / time.Duration(lambda) // The clock must have kept advancing at λ despite the slow sender: the - // schedule should have walked far past what the senders could absorb. 
- require.GreaterOrEqual(t, len(issued), 100, - "arrival clock must not be throttled by the slow sender") - - // Schedule accuracy still holds for the issued txs. - t0 := issued[0].IntendedSendTime - require.WithinDuration(t, start, t0, gap) + // number of SCHEDULED ARRIVALS (admitted + dropped) — not generator draws — + // must have walked far past what the senders could absorb. Generate() now + // runs only on admitted ticks (post-reorder), so generated count is small; + // the never-throttled-clock property lives in the scheduled-arrival count. + scheduled := sched.Admitted() + sched.Dropped() + require.GreaterOrEqual(t, scheduled, uint64(100), + "arrival clock must not be throttled by the slow sender (scheduled=%d)", scheduled) + + // Schedule accuracy still holds for each admitted tx, keyed on its + // SequenceIndex (the arrival-tick index i): IntendedSendTime ≈ t₀ + i/λ. + // Admitted txs have NON-CONTIGUOUS SequenceIndex under drops, so the schedule + // must be checked against tx.SequenceIndex, never the slice position. + require.Positive(t, len(admittedTxs), "some txs must have been admitted") + // Recover the scheduler's internal t₀ from the first admitted tx and its + // arrival index: t₀ = IntendedSendTime − SequenceIndex·gap. Bound it to the + // test's observed start window, then assert every admitted tx sits on the + // t₀ + i/λ grid at its own SequenceIndex. 
+ first := admittedTxs[0] + t0 := first.IntendedSendTime.Add(-time.Duration(first.SequenceIndex) * gap) + require.WithinDuration(t, start, t0, gap, "recovered t₀ must be the campaign start") const tol = 3 * time.Millisecond - for i, tx := range issued { - want := t0.Add(time.Duration(i) * gap) + for _, tx := range admittedTxs { + want := t0.Add(time.Duration(tx.SequenceIndex) * gap) require.WithinDuration(t, want, tx.IntendedSendTime, tol, - "tx %d schedule must hold under a slow sender", i) + "admitted tx (seq %d) schedule must hold under a slow sender", tx.SequenceIndex) } - // Overrun is dropped-and-counted, not blocked on. With ~150 issued in 300ms - // and senders able to complete only ~a dozen, the vast majority must drop. + // Overrun is dropped-and-counted, not blocked on. With ~150 scheduled arrivals + // in 300ms and senders able to complete only ~a dozen, the vast majority drop. require.Positive(t, sched.Dropped(), "overrun must be counted as dropped") - require.Greater(t, sched.Dropped(), uint64(len(issued)/2), + require.Greater(t, sched.Dropped(), sched.Admitted(), "a slow SUT must shed most of the load through the in-flight bound") } @@ -261,34 +318,174 @@ func TestOpenLoopSchedule_PermitHeldUntilCompletion(t *testing.T) { snd.completeAll() } -// TestOpenLoopSchedule_Conservation checks the accounting invariant: with a fast -// async sender that fully drains within the window, every issued tx is either -// completed (sent) or dropped exactly once — no permit leaks, no double-count. +// flakyAsyncSender drains like asyncFakeSender but completes every failEvery-th +// send (1-indexed) with an error, so a tx can be ADMITTED, enqueued, and then +// fail in the send — the path the failed-send accounting must capture. A failed +// send must be counted as failed (not lost and not dropped), so the conservation +// invariant becomes issued == dropped + succeeded + failed. 
+type flakyAsyncSender struct { + ch chan *types.LoadTx + failEvery uint64 + seen atomic.Uint64 +} + +func newFlakyAsyncSender(ctx context.Context, buffer, workers int, failEvery uint64) *flakyAsyncSender { + s := &flakyAsyncSender{ch: make(chan *types.LoadTx, buffer), failEvery: failEvery} + if workers < 1 { + workers = 1 + } + for range workers { + go s.drain(ctx) + } + return s +} + +func (s *flakyAsyncSender) drain(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case tx := <-s.ch: + n := s.seen.Add(1) + var err error + if s.failEvery > 0 && n%s.failEvery == 0 { + err = errors.New("injected send failure") + } + if tx.OnComplete != nil { + tx.OnComplete(err) + } + } + } +} + +func (s *flakyAsyncSender) Send(ctx context.Context, tx *types.LoadTx) error { + select { + case s.ch <- tx: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// TestOpenLoopSchedule_Conservation checks the accounting invariant under +// injected send failures: every issued tx reaches exactly one terminal state — +// dropped (not admitted), succeeded (admitted, nil err), or failed (admitted, +// non-nil err). A failed send must be counted as failed, never silently lost, +// so issued == dropped + succeeded + failed holds. No permit leaks, no double +// count. The dispatcher's onSent is the accountant under test (via a stand-in +// here that mirrors its succeeded/failed split). func TestOpenLoopSchedule_Conservation(t *testing.T) { gen := newFakeGenerator(300) // Generous capacity so most txs complete; a few may drop on brief bursts. 
limiter := rate.NewLimiter(rate.Limit(1000), 1) - var completed atomic.Uint64 + var succeeded, failed atomic.Uint64 onSent := func(_ *types.LoadTx, err error) { - require.NoError(t, err) - completed.Add(1) + if err != nil { + failed.Add(1) + return + } + succeeded.Add(1) } ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) defer cancel() - snd := newAsyncFakeSender(ctx, 256, 16, 0) + // Fail every 5th send so failed > 0 and the invariant must absorb it. + snd := newFlakyAsyncSender(ctx, 256, 16, 5) sched := newOpenLoopScheduler(gen, snd, limiter, 256, onSent) runScheduler(ctx, sched) issued := uint64(len(gen.issuedTxs())) require.Positive(t, issued) - // Allow the in-flight sends spawned just before deadline to settle. + // Allow the in-flight sends spawned just before deadline to settle, then + // assert the full conservation invariant including the failed bucket. require.Eventually(t, func() bool { - return completed.Load()+sched.Dropped() == issued + return succeeded.Load()+failed.Load()+sched.Dropped() == issued }, time.Second, 5*time.Millisecond, - "every issued tx must be completed or dropped exactly once (issued=%d sent=%d dropped=%d)", - issued, completed.Load(), sched.Dropped()) + "every issued tx must be succeeded, failed, or dropped exactly once "+ + "(issued=%d succeeded=%d failed=%d dropped=%d)", + issued, succeeded.Load(), failed.Load(), sched.Dropped()) + require.Positive(t, failed.Load(), + "injected failures must be counted as failed, not lost or dropped") +} + +// TestOpenLoopSchedule_DroppedSlotsConsumeNoDraws is the determinism guard for +// the PLT-456 × PLT-458 interaction: under saturation the scheduler must admit a +// deterministic PREFIX of the seeded stream — a dropped tick consumes zero +// generator draws (and zero signing CPU), so the same seed yields the same +// admitted multiset no matter how many ticks the SUT speed forced to drop. 
+// +// The falsification: if Generate() were called before TryAcquire (the original +// bug), every dropped tick would still advance the seeded stream, so the admitted +// txs' draw indices would have gaps — admitted draw k+1 would jump past the draws +// the dropped ticks consumed. Here we drive a saturated sender (maxInFlight=1, +// completion gated) so the great majority of ticks drop, then assert the admitted +// txs' draw indices are exactly 0,1,2,… contiguous with no gaps. +func TestOpenLoopSchedule_DroppedSlotsConsumeNoDraws(t *testing.T) { + gen := newSeededGenerator(2000) + snd := &gatedSender{} + limiter := rate.NewLimiter(rate.Limit(5000), 1) // 0.2ms gap → many ticks + // maxInFlight=1 with a sender that never auto-completes: the first admitted + // tx holds the only permit, so every later tick drops until we release. + var admitted []uint64 + var mu sync.Mutex + onSent := func(tx *types.LoadTx, err error) { + require.NoError(t, err) + mu.Lock() + admitted = append(admitted, gen.draw(tx)) + mu.Unlock() + } + sched := newOpenLoopScheduler(gen, snd, limiter, 1, onSent) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runScheduler(ctx, sched) + }() + + // Periodically release the single held permit so flow inches forward one + // admitted tx at a time while the fast clock drops the rest. This guarantees + // many drops interleaved with a handful of admits — the regime that would + // expose any draw consumed by a dropped tick. + relCtx, relCancel := context.WithCancel(ctx) + defer relCancel() + go func() { + for relCtx.Err() == nil { + snd.completeAll() + select { + case <-relCtx.Done(): + case <-time.After(2 * time.Millisecond): + } + } + }() + + wg.Wait() + relCancel() + snd.completeAll() // drain any final held permit's OnComplete + + mu.Lock() + got := append([]uint64(nil), admitted...) 
+ mu.Unlock() + + require.Positive(t, sched.Dropped(), "the saturated sender must force drops") + require.Positive(t, len(got), "some txs must have been admitted") + require.Greater(t, sched.Dropped(), uint64(len(got)), + "saturation must drop far more ticks than it admits") + + // The admitted draws must be the contiguous prefix 0,1,2,…,N-1: each admitted + // tx consumed exactly the next stream draw, and no dropped tick consumed any. + // Sort first: with the permit released just before onSent fires, two adjacent + // completions can be recorded out of order, but the SET must still be gapless. + slices.Sort(got) + for k, draw := range got { + require.Equal(t, uint64(k), draw, + "admitted draw set must be the gapless prefix; slot %d is draw %d (a dropped tick must consume no draw)", + k, draw) + } } // TestOpenLoopSchedule_HonorsRampedLambda verifies the schedule responds to a diff --git a/stats/metrics.go b/stats/metrics.go index b7f3460..c66b879 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -46,6 +46,11 @@ var ( "run_txs_dropped_total", metric.WithDescription("Total open-loop transactions dropped because in-flight was saturated at their scheduled instant (emitted once at run end)"), metric.WithUnit("{transactions}"))) + + runTxsFailedTotal = must(meter.Int64Gauge( + "run_txs_failed_total", + metric.WithDescription("Total open-loop transactions admitted and enqueued but whose send completed with an error (emitted once at run end)"), + metric.WithUnit("{transactions}"))) ) func must[V any](v V, err error) V { diff --git a/stats/run_summary.go b/stats/run_summary.go index 5572628..8f45164 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -18,8 +18,13 @@ type RunSummary struct { // dropped tx never reaches the inclusion tracker and carries no // InclusionTime, so it must stay out of inclusion-rate denominators: the // denominator is sent (txs that reached a sender), never issued (sent + - // dropped). + // failed + dropped). 
Dropped uint64 + // Failed is the count of open-loop txs that were admitted and enqueued but + // whose send completed with an error. Like Dropped, a failed tx reached no + // inclusion tracker; it is reported so the conservation invariant + // issued == Dropped + Failed + sent is auditable from the run summary. + Failed uint64 } // EmitRunSummary records the run-summary gauges. Call once at shutdown. @@ -35,4 +40,6 @@ func (c *Collector) EmitRunSummary(ctx context.Context, summary RunSummary) { runTxsAcceptedTotal.Record(ctx, int64(totalTxs)) runTxsDroppedTotal.Record(ctx, int64(summary.Dropped), metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) + runTxsFailedTotal.Record(ctx, int64(summary.Failed), + metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) } From 30c25780a2c6b41c81677b558d9d4124931cc530 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 14:26:16 -0700 Subject: [PATCH 8/9] docs(sender): trim duplicated invariant narrative; unify conservation vocabulary Trim the dropped/admitted field-comment block and DispatcherStats/RunSummary comments to at-site facts plus a pointer to doc.go's Conservation section, which owns the invariant. Unify prose to scheduled = dropped + admitted; admitted = succeeded + failed. Make Admitted()'s doc honest: test/audit only, no production consumer. Comment-only; no identifier or metric renames. Co-Authored-By: Claude Opus 4.8 (1M context) --- sender/dispatcher.go | 16 ++++++---------- sender/scheduler.go | 22 +++++++++------------- stats/run_summary.go | 12 ++++++------ 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index 239d702..7b95ff3 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -41,13 +41,10 @@ type Dispatcher struct { limiter *rate.Limiter maxInFlight int - // Statistics. 
The open-loop conservation invariant is - // issued (scheduled ticks) == dropped (not admitted) + admitted, and - // admitted == totalSent (succeeded) + failed. - // A tx that is admitted and enqueues but errors in the worker's send is - // counted as failed, not lost: it is neither sent nor dropped. + // Statistics for the open-loop conservation invariant; see the package doc + // (Conservation): scheduled = dropped + admitted, admitted = succeeded + failed. totalSent uint64 // admitted sends that completed with a nil error (succeeded) - failed uint64 // admitted sends that completed with a non-nil error + failed uint64 // admitted sends that completed with a non-nil error (failed) dropped uint64 mu sync.RWMutex collector *stats.Collector @@ -255,10 +252,9 @@ type DispatcherStats struct { // TotalSent is the number of admitted sends that completed with a nil error // (succeeded). TotalSent uint64 - // Failed is the number of open-loop sends that were admitted (took a permit) - // and enqueued but completed with a non-nil error. Counted, not lost: with - // Dropped and TotalSent it closes the conservation invariant - // issued == Dropped + TotalSent + Failed. Always 0 in closed-loop mode. + // Failed is the number of admitted open-loop sends that completed with a + // non-nil error: counted, not lost (see package doc, Conservation: + // admitted = succeeded + failed). Always 0 in closed-loop mode. Failed uint64 // Dropped is the number of open-loop txs shed because in-flight was // saturated at their scheduled instant. Always 0 in closed-loop mode. diff --git a/sender/scheduler.go b/sender/scheduler.go index 0ee73ad..79ea751 100644 --- a/sender/scheduler.go +++ b/sender/scheduler.go @@ -28,16 +28,11 @@ type openLoopScheduler struct { onSent func(tx *types.LoadTx, err error) maxInFlight int - // dropped counts ticks shed because in-flight was saturated at their - // scheduled instant (no permit, no Generate, no draw). 
admitted counts ticks - // that took a permit AND drew a real tx from the generator (the work actually - // handed to a sender). Their sum is the number of scheduled arrivals that - // represented work — the conservation anchor: - // admitted + dropped == scheduled arrivals, and - // admitted == succeeded + failed (the send outcome, accounted by onSent). - // A tick whose Generate returns no work (generator exhausted) is neither: - // the permit it briefly held is released and no arrival is counted. Read - // after Run returns, or concurrently via Dropped/Admitted. + // dropped and admitted partition scheduled ticks; see the package doc + // (Conservation) for the invariant scheduled = dropped + admitted. A tick + // whose Generate returns no work (generator exhausted under a held permit) is + // neither: the permit is released and no arrival is counted. Written by the + // Run goroutine; read after Run returns or concurrently via Dropped/Admitted. dropped atomic.Uint64 admitted atomic.Uint64 } @@ -68,9 +63,10 @@ func newOpenLoopScheduler( // Dropped returns the number of ticks shed so far because in-flight was saturated. func (s *openLoopScheduler) Dropped() uint64 { return s.dropped.Load() } -// Admitted returns the number of ticks that took a permit and drew a real tx -// (the work handed to a sender). Admitted + Dropped is the scheduled-arrival -// count; Admitted equals succeeded + failed once all sends complete. +// Admitted returns the admitted-tick count (ticks that took a permit and drew a +// real tx). It exposes the conservation invariant for tests/audit, mirroring +// Dropped; no production path consumes it (only Dropped folds into the run +// summary). 
func (s *openLoopScheduler) Admitted() uint64 { return s.admitted.Load() } // Run drives the arrival clock until ctx is canceled or the generator is diff --git a/stats/run_summary.go b/stats/run_summary.go index 8f45164..f3401d8 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -17,13 +17,13 @@ type RunSummary struct { // Dropped is the count of open-loop txs shed on in-flight saturation. A // dropped tx never reaches the inclusion tracker and carries no // InclusionTime, so it must stay out of inclusion-rate denominators: the - // denominator is sent (txs that reached a sender), never issued (sent + - // failed + dropped). + // denominator is succeeded txs (those that reached a sender), never the full + // scheduled count. Dropped uint64 - // Failed is the count of open-loop txs that were admitted and enqueued but - // whose send completed with an error. Like Dropped, a failed tx reached no - // inclusion tracker; it is reported so the conservation invariant - // issued == Dropped + Failed + sent is auditable from the run summary. + // Failed is the count of admitted open-loop txs whose send completed with an + // error. Like Dropped, a failed tx reached no inclusion tracker; it is + // reported so the conservation invariant (see sender package doc: + // scheduled = dropped + succeeded + failed) is auditable from the run summary. Failed uint64 } From 5399f90b19480d077275da4a80b62b2766c917a5 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Fri, 12 Jun 2026 14:45:01 -0700 Subject: [PATCH 9/9] PLT-458: rate-limit prewarm in open-loop; fix conservation test In open-loop the workers are constructed RateLimited=false because the scheduler owns the arrival clock, but the scheduler paces only the main load. Prewarm runs first over those same ungated workers, so it flooded the SUT unthrottled. 
Pace prewarm off the shared limiter in the dispatcher: closed-loop leaves d.limiter nil and the worker still gates, so neither phase is double-throttled and the main loop stays scheduler-paced. Correct the conservation test to the two-level invariant from doc.go: scheduled == dropped + admitted (using the scheduler counters, since a dropped tick no longer draws Generate) and admitted == succeeded + failed. Add a real-worker prewarm-rate-limit guard. Trim worker.go/dispatcher.go inline narration that doc.go owns, keeping the load-bearing notes (OnComplete CRITICAL, single-writer stamp, per-iteration context cancel, conservation counters). Co-Authored-By: Claude Opus 4.8 (1M context) --- sender/dispatcher.go | 31 ++++++++++------ sender/scheduler.go | 52 ++++++++++----------------- sender/scheduler_realworker_test.go | 56 +++++++++++++++++++++++++++++ sender/scheduler_test.go | 37 +++++++++++-------- sender/worker.go | 30 ++++++---------- 5 files changed, 128 insertions(+), 78 deletions(-) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index 7b95ff3..1efde90 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -41,10 +41,10 @@ type Dispatcher struct { limiter *rate.Limiter maxInFlight int - // Statistics for the open-loop conservation invariant; see the package doc - // (Conservation): scheduled = dropped + admitted, admitted = succeeded + failed. - totalSent uint64 // admitted sends that completed with a nil error (succeeded) - failed uint64 // admitted sends that completed with a non-nil error (failed) + // Conservation counters (doc.go): scheduled = dropped + admitted, + // admitted = succeeded + failed. 
+ totalSent uint64 // admitted, nil send error (succeeded) + failed uint64 // admitted, non-nil send error dropped uint64 mu sync.RWMutex collector *stats.Collector @@ -99,6 +99,12 @@ func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator) { func (d *Dispatcher) Prewarm(ctx context.Context) error { d.mu.RLock() prewarmGen := d.prewarmGen + // In open-loop the workers are not rate-limited (the scheduler is the rate + // authority), but the scheduler only paces the MAIN load. Prewarm runs first, + // over those same ungated workers, so it must pace itself off the shared + // limiter or it floods the SUT. In closed-loop limiter is nil here and the + // worker gates the send instead, so there is never a double-throttle. + limiter := d.limiter d.mu.RUnlock() gen, ok := prewarmGen.Get() @@ -112,6 +118,12 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error { // Run prewarm generator until completion for ctx.Err() == nil { + if limiter != nil { + if err := limiter.Wait(ctx); err != nil { + return err + } + } + tx, ok := gen.Generate() if !ok { break // Prewarming is complete @@ -182,19 +194,16 @@ func (d *Dispatcher) runOpenLoop(ctx context.Context) error { err := service.Run(ctx, func(ctx context.Context, s service.Scope) error { return sched.Run(ctx, s) }) - // Fold the scheduler's drop count into the dispatcher's accounting so the - // final summary can report it. + // Fold the scheduler's drop count into the summary accounting. d.mu.Lock() d.dropped = sched.Dropped() d.mu.Unlock() return err } -// onSent records a completed open-loop send (an admitted tx whose send attempt -// finished). A nil error advances totalSent (succeeded); a non-nil error -// advances failed. An admitted tx is therefore always counted exactly once as -// succeeded or failed — never silently lost — preserving -// admitted == succeeded + failed under RPC failures. 
+// onSent records a completed open-loop send: nil err advances totalSent +// (succeeded), non-nil advances failed. Each admitted tx is counted exactly +// once, never lost (doc.go: admitted == succeeded + failed). func (d *Dispatcher) onSent(tx *types.LoadTx, err error) { if err != nil { log.Printf("Scheduler: send failed (seq %d): %v", tx.SequenceIndex, err) diff --git a/sender/scheduler.go b/sender/scheduler.go index 79ea751..8f2f761 100644 --- a/sender/scheduler.go +++ b/sender/scheduler.go @@ -28,19 +28,15 @@ type openLoopScheduler struct { onSent func(tx *types.LoadTx, err error) maxInFlight int - // dropped and admitted partition scheduled ticks; see the package doc - // (Conservation) for the invariant scheduled = dropped + admitted. A tick - // whose Generate returns no work (generator exhausted under a held permit) is - // neither: the permit is released and no arrival is counted. Written by the - // Run goroutine; read after Run returns or concurrently via Dropped/Admitted. + // Written by Run; read after Run returns or concurrently via Dropped/Admitted. + // See package doc (Conservation) for scheduled = dropped + admitted. dropped atomic.Uint64 admitted atomic.Uint64 } -// minScheduleRate floors λ in the inter-arrival gap so a zero/negative limit -// cannot divide-by-zero or yield a +Inf gap. It does not cap the sleep: a small -// finite λ still yields a long gap. The degenerate λ=Inf / TPS=0 case is -// rejected up front (see config.Settings.Validate). +// minScheduleRate floors λ so a zero/negative limit can't divide-by-zero or +// yield a +Inf gap; the degenerate λ=Inf/TPS=0 case is rejected up front +// (config.Settings.Validate). const minScheduleRate = 1e-9 func newOpenLoopScheduler( @@ -63,10 +59,8 @@ func newOpenLoopScheduler( // Dropped returns the number of ticks shed so far because in-flight was saturated. 
func (s *openLoopScheduler) Dropped() uint64 { return s.dropped.Load() } -// Admitted returns the admitted-tick count (ticks that took a permit and drew a -// real tx). It exposes the conservation invariant for tests/audit, mirroring -// Dropped; no production path consumes it (only Dropped folds into the run -// summary). +// Admitted returns the admitted-tick count (took a permit and drew a tx), for +// the conservation invariant in tests/audit; mirrors Dropped. func (s *openLoopScheduler) Admitted() uint64 { return s.admitted.Load() } // Run drives the arrival clock until ctx is canceled or the generator is @@ -90,44 +84,36 @@ func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error return err } - // Advance the arrival clock for this scheduled tick before admission, so - // the schedule walks at λ whether or not the tx is admitted. + // Advance the clock per scheduled tick before admission (walks at λ + // regardless of drops). intendedSendTime := nextSend seqIndex := i nextSend = nextSend.Add(gap) i++ - // Admit BEFORE generating: a non-blocking TryAcquire that never throttles - // the arrival clock (see package doc: coordinated omission). On a drop the - // tick is counted but the generator is NOT advanced — a dropped slot - // consumes zero seeded-stream draws and no signing CPU, so admitted txs are - // a deterministic prefix of the seeded sequence regardless of how many - // ticks were shed (the per-stream reproducibility contract, PLT-456). + // Admit before generating: a dropped tick must not advance the seeded + // generator (determinism). TryAcquire is non-blocking — see package doc. release, ok := s.inflight.TryAcquire() if !ok { s.dropped.Add(1) continue } - // Permit held: now draw the next tx from the (seeded) generator. tx, ok := s.generator.Generate() if !ok { - // No work left: release the permit we will not use and stop. - release() + release() // generator exhausted: release the unused permit and stop. 
log.Print("Scheduler: generator returned no more transactions") return nil } - // Stamp the scheduled instant and arrival index while sole owner (see - // LoadTx concurrency contract). + // Stamp while sole owner (see LoadTx concurrency contract). tx.IntendedSendTime = intendedSendTime tx.SequenceIndex = seqIndex s.admitted.Add(1) - // complete fires once on send completion: releases the permit and reports - // the result. The worker invokes it via tx.OnComplete after the real send - // (see package doc: permit lifecycle). The Once guards against the - // enqueue-failure fallback below racing the worker. + // complete releases the permit and reports the result, exactly once: the + // worker invokes it via tx.OnComplete after the real send; the Once guards + // the enqueue-failure fallback below from racing the worker. var once sync.Once complete := func(err error) { once.Do(func() { @@ -140,13 +126,11 @@ func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error tx.OnComplete = complete scope.Spawn(func() error { - // Send returns at enqueue; the worker releases the permit on real - // completion. On enqueue failure the tx never reaches a worker, so - // complete here to avoid leaking the permit. + // On enqueue failure the tx never reaches a worker; complete here so the + // permit isn't leaked. A send error must not tear down the campaign. if err := s.sender.Send(ctx, tx); err != nil { complete(err) } - // A send error must not tear down the campaign; surfaced via counters. 
return nil }) } diff --git a/sender/scheduler_realworker_test.go b/sender/scheduler_realworker_test.go index 32ec741..364f07f 100644 --- a/sender/scheduler_realworker_test.go +++ b/sender/scheduler_realworker_test.go @@ -455,3 +455,59 @@ func TestRealWorker_PermitReleasedByWorker(t *testing.T) { "at most one admitted tx may be unaccounted at cancel (admitted=%d completed=%d dropped=%d)", admitted, completed.Load(), sched.Dropped()) } + +// TestDispatcher_PrewarmRateLimitedInOpenLoop guards the prewarm-flood +// regression: in open-loop the workers are constructed RateLimited=false, but +// the scheduler paces only the MAIN load. Prewarm runs first over those same +// ungated workers, so it must pace itself off the shared limiter or it floods +// the SUT. With workers wired exactly as in open-loop, a low limit, and many +// more prewarm txs than the worker pool could absorb instantly, an unpaced +// prewarm would drain in well under the limiter's minimum span. We assert the +// run took at least the paced floor — i.e. the limiter actually gated prewarm — +// and that every prewarm tx still reached the RPC server (no drops on prewarm). 
+func TestDispatcher_PrewarmRateLimitedInOpenLoop(t *testing.T) { + srv := newRPCServer(t) + const prewarmTxs = 40 + const rps = 200.0 // limiter: 200 tx/s → unpaced 40 txs is near-instant + + worker := newRealWorker(srv.url(), 8, 256) // RateLimited=false, open-loop shape + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = service.Run(ctx, func(ctx context.Context, scope service.Scope) error { + scope.SpawnBg(func() error { return worker.Run(ctx) }) + <-ctx.Done() + return nil + }) + }() + + limiter := rate.NewLimiter(rate.Limit(rps), 1) + d := NewDispatcher(newSignedTxGenerator(t, 0), worker) + d.SetOpenLoop(limiter, 256) // sets d.limiter so Prewarm self-paces + d.SetPrewarmGenerator(newSignedTxGenerator(t, prewarmTxs)) + + start := time.Now() + require.NoError(t, d.Prewarm(ctx)) + elapsed := time.Since(start) + + // Paced floor: (N-1) gaps at the limiter rate (burst=1 lets the first through + // immediately). Use half as a generous lower bound to absorb scheduling slop + // while still excluding the unpaced (near-zero) case decisively. 
+ pacedFloor := time.Duration(float64(prewarmTxs-1) / rps * float64(time.Second)) + require.Greater(t, elapsed, pacedFloor/2, + "prewarm must be limiter-paced in open-loop, not flooded (elapsed=%s floor=%s)", + elapsed, pacedFloor) + + require.Eventually(t, func() bool { + return srv.handled.Load() == uint64(prewarmTxs) + }, 2*time.Second, 2*time.Millisecond, + "every prewarm tx must reach the SUT (handled=%d want=%d)", + srv.handled.Load(), prewarmTxs) + + cancel() + wg.Wait() +} diff --git a/sender/scheduler_test.go b/sender/scheduler_test.go index 9dcd7bb..a40d910 100644 --- a/sender/scheduler_test.go +++ b/sender/scheduler_test.go @@ -367,13 +367,17 @@ func (s *flakyAsyncSender) Send(ctx context.Context, tx *types.LoadTx) error { } } -// TestOpenLoopSchedule_Conservation checks the accounting invariant under -// injected send failures: every issued tx reaches exactly one terminal state — -// dropped (not admitted), succeeded (admitted, nil err), or failed (admitted, -// non-nil err). A failed send must be counted as failed, never silently lost, -// so issued == dropped + succeeded + failed holds. No permit leaks, no double -// count. The dispatcher's onSent is the accountant under test (via a stand-in -// here that mirrors its succeeded/failed split). +// TestOpenLoopSchedule_Conservation checks the two-level accounting invariant +// (see package doc, Conservation) under injected send failures: +// +// scheduled == dropped + admitted (every tick has one arrival outcome) +// admitted == succeeded + failed (every admitted tx has one send outcome) +// +// scheduled is the scheduler's own arrival count (Admitted + Dropped), NOT the +// generator's issued count: after admit-before-generate a dropped tick never +// calls Generate, so len(issuedTxs) tracks admitted draws, not scheduled ticks. +// A failed send must land in failed, never be silently lost. The dispatcher's +// onSent is the accountant under test (via a stand-in mirroring its split). 
func TestOpenLoopSchedule_Conservation(t *testing.T) { gen := newFakeGenerator(300) // Generous capacity so most txs complete; a few may drop on brief bursts. @@ -395,16 +399,21 @@ func TestOpenLoopSchedule_Conservation(t *testing.T) { sched := newOpenLoopScheduler(gen, snd, limiter, 256, onSent) runScheduler(ctx, sched) - issued := uint64(len(gen.issuedTxs())) - require.Positive(t, issued) + admitted := sched.Admitted() + require.Positive(t, admitted) + // scheduled == dropped + admitted: a dropped tick consumes no Generate draw, + // so the generator's issued count tracks admitted ticks, not scheduled ticks. + require.Equal(t, admitted, uint64(len(gen.issuedTxs())), + "each admitted tick must draw exactly one tx (admit-before-generate)") + // Allow the in-flight sends spawned just before deadline to settle, then - // assert the full conservation invariant including the failed bucket. + // assert admitted == succeeded + failed (the send-outcome partition). require.Eventually(t, func() bool { - return succeeded.Load()+failed.Load()+sched.Dropped() == issued + return succeeded.Load()+failed.Load() == sched.Admitted() }, time.Second, 5*time.Millisecond, - "every issued tx must be succeeded, failed, or dropped exactly once "+ - "(issued=%d succeeded=%d failed=%d dropped=%d)", - issued, succeeded.Load(), failed.Load(), sched.Dropped()) + "every admitted tx must be succeeded or failed exactly once "+ + "(admitted=%d succeeded=%d failed=%d)", + sched.Admitted(), succeeded.Load(), failed.Load()) require.Positive(t, failed.Load(), "injected failures must be counted as failed, not lost or dropped") } diff --git a/sender/worker.go b/sender/worker.go index 293c754..7e7b60c 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -38,11 +38,9 @@ type WorkerConfig struct { Debug bool TrackReceipts bool Collector *stats.Collector - Limiter *rate.Limiter // Shared rate limiter for transaction sending - // RateLimited gates worker-side rate limiting. 
True for the legacy - // closed-loop model, where the worker is the rate authority. False in the - // open-loop model, where the scheduler owns the arrival clock and gating - // here too would double-throttle the rate. + Limiter *rate.Limiter // Shared rate authority; nil disables gating. + // RateLimited enables worker-side rate limiting: true in closed-loop (worker is the + // rate authority), false in open-loop (scheduler owns the clock; see doc.go). RateLimited bool } @@ -224,9 +222,8 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * // runTxSender is the main worker loop that processes transactions func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) error { for ctx.Err() == nil { - // Closed-loop rate limiting: block until the limiter releases a permit - // before getting the next transaction. Skipped when an open-loop - // scheduler is the rate authority (it would otherwise double-throttle). + // Closed-loop gating: block on the limiter before dequeuing. Skipped in + // open-loop where the scheduler is the rate authority (see doc.go). if w.cfg.RateLimited && w.cfg.Limiter != nil { if err := w.cfg.Limiter.Wait(ctx); err != nil { return err @@ -239,16 +236,15 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro } startTime := time.Now() - // Sole owner between dequeue and hand-off: stamping here is race-free (see LoadTx). + // Sole owner between dequeue and hand-off: stamp is race-free (see LoadTx). tx.AttemptedSendTime = startTime err = w.sendTransaction(ctx, client, tx) - // CRITICAL: invoke OnComplete only after the real send returns — this is - // what makes the open-loop semaphore bound true unacked sends, not enqueue - // backlog (see package doc: permit lifecycle). Nil on closed-loop/batch. + // CRITICAL: OnComplete must fire only after the real send returns — that is + // what makes the open-loop semaphore bound true unacked sends (doc.go: + // permit lifecycle).
Nil on closed-loop/batch. if tx.OnComplete != nil { tx.OnComplete(err) } - // Record statistics if collector is available w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) if err != nil { log.Printf("%v", err) @@ -280,12 +276,9 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, ) }(time.Now()) if w.cfg.DryRun { - // In dry-run mode, simulate processing time and mark as successful - // Use very minimal delay to avoid channel overflow - return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation + return utils.Sleep(ctx, 10*time.Microsecond) // minimal delay, no RPC } - // Send through go-ethereum so the same code path supports both HTTP(S) and WS(S) RPC. if err := client.SendTransaction(ctx, tx.EthTx); err != nil { txsRejected.Add(ctx, 1, metric.WithAttributes( attribute.String("endpoint", w.cfg.Endpoint), @@ -300,8 +293,7 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, attribute.String("scenario", tx.Scenario.Name), )) - // Write to sentTxs channel without blocking - utils.SendOrDrop(w.sentTxs, tx) + utils.SendOrDrop(w.sentTxs, tx) // non-blocking handoff to receipt poller return nil }