diff --git a/config/settings.go b/config/settings.go index 2c4e2bb..c679cf0 100644 --- a/config/settings.go +++ b/config/settings.go @@ -28,6 +28,44 @@ type Settings struct { TargetGas uint64 `json:"targetGas,omitempty"` NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"` PostSummaryFlushDelay Duration `json:"postSummaryFlushDelay,omitempty"` + // ArrivalModel selects the transaction arrival model: "open_loop" schedules + // tx i at t₀ + i/λ independent of sender availability (the + // coordinated-omission fix), "closed_loop" (default) keeps the legacy + // generate-then-send lockstep as the regression baseline. + ArrivalModel string `json:"arrivalModel,omitempty"` + // MaxInFlight bounds concurrent in-flight sends in the open-loop model; + // txs that would exceed it at their scheduled instant are dropped and + // counted rather than throttling the arrival clock. Ignored in closed-loop. + MaxInFlight int `json:"maxInFlight,omitempty"` +} + +// Arrival model identifiers for the ArrivalModel setting. +const ( + ArrivalModelClosedLoop = "closed_loop" + ArrivalModelOpenLoop = "open_loop" +) + +// Validate checks resolved settings for self-consistent run configuration, +// failing fast on combinations that would otherwise produce a silently +// degenerate run. Call once after ResolveSettings. +func (s Settings) Validate() error { + switch s.ArrivalModel { + case ArrivalModelClosedLoop, ArrivalModelOpenLoop: + default: + return fmt.Errorf("invalid arrival-model %q: must be %q or %q", + s.ArrivalModel, ArrivalModelOpenLoop, ArrivalModelClosedLoop) + } + + // Open-loop derives the inter-arrival gap as 1/λ. With no finite positive + // arrival rate, λ is rate.Inf, the gap collapses to 0, IntendedSendTime + // never advances past t₀, and the scheduler spins and drops everything — + // the latency anchor degenerates to "time since campaign start". A finite λ + // comes from either a configured TPS>0 or a ramp curve (RampUp), which the + // ramper drives to finite limits. Reject the degenerate case up front. + if s.ArrivalModel == ArrivalModelOpenLoop && s.TPS <= 0 && !s.RampUp { + return fmt.Errorf("arrival-model %q requires a finite positive arrival rate: set --tps>0 or --ramp-up", ArrivalModelOpenLoop) + } + return nil } // DefaultSettings returns the default configuration values @@ -49,6 +87,8 @@ func DefaultSettings() Settings { TargetGas: 10_000_000, NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), + ArrivalModel: ArrivalModelClosedLoop, + MaxInFlight: 10_000, } } @@ -72,6 +112,8 @@ func InitializeViper(cmd *cobra.Command) error { "targetGas": "target-gas", "numBlocksToWrite": "num-blocks-to-write", "postSummaryFlushDelay": "post-summary-flush-delay", + "arrivalModel": "arrival-model", + "maxInFlight": "max-in-flight", } for viperKey, flagName := range flagBindings { @@ -98,6 +140,8 @@ func InitializeViper(cmd *cobra.Command) error { viper.SetDefault("targetGas", defaults.TargetGas) viper.SetDefault("numBlocksToWrite", defaults.NumBlocksToWrite) viper.SetDefault("postSummaryFlushDelay", defaults.PostSummaryFlushDelay.ToDuration()) + viper.SetDefault("arrivalModel", defaults.ArrivalModel) + viper.SetDefault("maxInFlight", defaults.MaxInFlight) return nil } @@ -140,5 +184,7 @@ func ResolveSettings() *Settings { TargetGas: viper.GetUint64("targetGas"), NumBlocksToWrite: viper.GetInt("numBlocksToWrite"), PostSummaryFlushDelay: Duration(viper.GetDuration("postSummaryFlushDelay")), + ArrivalModel: viper.GetString("arrivalModel"), + MaxInFlight: viper.GetInt("maxInFlight"), } } diff --git a/config/settings_test.go b/config/settings_test.go index ae4d797..09762fa 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -97,6 +97,8 @@ func TestArgumentPrecedence(t *testing.T) { cmd.Flags().Uint64("target-gas", 0, "Target gas") cmd.Flags().Int("num-blocks-to-write", 0, "Number of blocks to write") cmd.Flags().Duration("post-summary-flush-delay", 0, "Post-summary flush delay") + cmd.Flags().String("arrival-model", "", "Arrival model") + cmd.Flags().Int("max-in-flight", 0, "Max in-flight") // Parse CLI args if len(tt.cliArgs) > 0 { @@ -141,9 +143,65 @@ func TestDefaultSettings(t *testing.T) { TargetGas: 10_000_000, NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), + ArrivalModel: ArrivalModelClosedLoop, + MaxInFlight: 10_000, } if defaults != expected { t.Errorf("DefaultSettings mismatch.\nExpected: %+v\nGot: %+v", expected, defaults) } } + +func TestSettingsValidate(t *testing.T) { + tests := []struct { + name string + settings Settings + wantErr string + }{ + { + name: "closed-loop with no rate is fine", + settings: Settings{ArrivalModel: ArrivalModelClosedLoop, TPS: 0}, + }, + { + name: "open-loop with finite TPS is fine", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 100}, + }, + { + name: "open-loop with ramp-up is fine (finite ramp curve λ)", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 0, RampUp: true}, + }, + { + // B1: open-loop with TPS=0 and no ramp ⇒ λ=Inf ⇒ degenerate anchor. + name: "open-loop with zero TPS and no ramp is rejected", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 0}, + wantErr: "finite positive arrival rate", + }, + { + name: "open-loop with negative TPS is rejected", + settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: -1}, + wantErr: "finite positive arrival rate", + }, + { + name: "unrecognized arrival-model is rejected", + settings: Settings{ArrivalModel: "burst", TPS: 100}, + wantErr: "invalid arrival-model", + }, + { + name: "empty arrival-model is rejected", + settings: Settings{ArrivalModel: "", TPS: 100}, + wantErr: "invalid arrival-model", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.settings.Validate() + if tt.wantErr == "" { + require.NoError(t, err) + return + } + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + }) + } +} diff --git a/main.go b/main.go index 8502aea..9e9b18f 100644 --- a/main.go +++ b/main.go @@ -71,6 +71,8 @@ func init() { rootCmd.Flags().Int("num-blocks-to-write", 100, "Number of blocks to write") rootCmd.Flags().Duration("post-summary-flush-delay", 25*time.Second, "In-process delay after run-summary metrics are recorded, allowing Prometheus to scrape them before exit") rootCmd.Flags().Duration("duration", 0, "Run duration (0 = until SIGTERM/SIGINT)") + rootCmd.Flags().String("arrival-model", config.ArrivalModelClosedLoop, "Transaction arrival model: open_loop (schedule t0+i/lambda, drop on overrun) or closed_loop (legacy generate-then-send)") + rootCmd.Flags().Int("max-in-flight", 10_000, "Open-loop only: max concurrent in-flight sends before overdue txs are dropped") // Initialize Viper with proper error handling if err := config.InitializeViper(rootCmd); err != nil { @@ -106,6 +108,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { // Get resolved settings from the config package cfg.Settings = config.ResolveSettings() + if err := cfg.Settings.Validate(); err != nil { + return fmt.Errorf("invalid settings: %w", err) + } // Handle --nodes flag to limit number of endpoints nodes, _ := cmd.Flags().GetInt("nodes") @@ -202,6 +207,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { collector := stats.NewCollector() logger := stats.NewLogger(collector, cfg.Settings.StatsInterval.ToDuration(), cfg.Settings.ReportPath, cfg.Settings.Debug) var ramper *sender.Ramper + var dispatcher *sender.Dispatcher err = service.Run(ctx, func(ctx context.Context, s service.Scope) error { // Create the generator from the config struct @@ -267,7 +273,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { } // Create dispatcher - var dispatcher *sender.Dispatcher if cfg.Settings.TxsDir != "" { // get latest height ethclient, err := ethclient.Dial(cfg.Endpoints[0]) @@ -290,6 +295,19 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { // Set statistics collector for dispatcher dispatcher.SetStatsCollector(collector) + // Open-loop arrival: the scheduler owns the rate (via the shared + // limiter, which the ramper still drives) and drops on overrun. The + // workers' own rate gating is disabled at construction for this model + // (see NewShardedSender) so they don't double-throttle. Only applies to + // the live-send path; the txs writer path has no arrival clock. + openLoop := cfg.Settings.ArrivalModel == config.ArrivalModelOpenLoop + if openLoop && cfg.Settings.TxsDir == "" { + dispatcher.SetOpenLoop(sharedLimiter, cfg.Settings.MaxInFlight) + log.Printf("📤 Arrival model: open_loop (max in-flight: %d)", cfg.Settings.MaxInFlight) + } else { + log.Printf("📤 Arrival model: closed_loop") + } + // Set up prewarming if enabled if cfg.Settings.Prewarm { log.Printf("🔥 Creating prewarm generator...") @@ -353,7 +371,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { if cfg.Settings.RampUp && ramper != nil { ramper.LogFinalStats() } - collector.EmitRunSummary(ctx) + summary := stats.RunSummary{ArrivalModel: config.ArrivalModelClosedLoop} + if dispatcher != nil { + summary.ArrivalModel = string(dispatcher.ArrivalModel()) + dstats := dispatcher.GetStats() + summary.Dropped = dstats.Dropped + summary.Failed = dstats.Failed + if summary.Dropped > 0 { + log.Printf("⚠️ Open-loop dropped %d txs (in-flight saturated; not throttled)", summary.Dropped) + } + if summary.Failed > 0 { + log.Printf("⚠️ Open-loop %d txs failed to send (admitted but errored; not lost)", summary.Failed) + } + } + collector.EmitRunSummary(ctx, summary) if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 { log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d) time.Sleep(d) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index b3614c9..1efde90 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -7,9 +7,26 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/sei-protocol/sei-load/generator" "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/types" "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" +) + +// ArrivalModel selects how the dispatcher times transaction arrival. +type ArrivalModel string + +const ( + // ArrivalClosedLoop is the legacy model: a tx is generated and sent only + // when a sender is free, so a slow SUT slows the generator (coordinated + // omission). Kept reachable as the regression baseline. + ArrivalClosedLoop ArrivalModel = "closed_loop" + // ArrivalOpenLoop schedules tx i at t₀ + i/λ independent of sender + // availability; overdue txs are dropped and counted. See scheduler.go. + ArrivalOpenLoop ArrivalModel = "open_loop" ) // Dispatcher continuously generates transactions and dispatches them to the sender @@ -18,18 +35,50 @@ type Dispatcher struct { prewarmGen utils.Option[generator.Generator] // Optional prewarm generator sender TxSender - // Statistics - totalSent uint64 + // Open-loop arrival configuration. arrivalModel defaults to closed-loop; + // limiter and maxInFlight are only consulted in open-loop mode. + arrivalModel ArrivalModel + limiter *rate.Limiter + maxInFlight int + + // Conservation counters (doc.go): scheduled = dropped + admitted, + // admitted = succeeded + failed. + totalSent uint64 // admitted, nil send error (succeeded) + failed uint64 // admitted, non-nil send error + dropped uint64 mu sync.RWMutex collector *stats.Collector } -// NewDispatcher creates a new dispatcher +// NewDispatcher creates a new dispatcher in the legacy closed-loop arrival model. func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { return &Dispatcher{ - generator: gen, - sender: sender, + generator: gen, + sender: sender, + arrivalModel: ArrivalClosedLoop, + } +} + +// SetOpenLoop switches the dispatcher to the open-loop arrival model, driven by +// the shared limiter (the one rate authority, also driven by the ramper) and +// bounded by maxInFlight concurrent sends. A non-positive maxInFlight is treated +// as 1 so admission control is always live. +func (d *Dispatcher) SetOpenLoop(limiter *rate.Limiter, maxInFlight int) { + if maxInFlight < 1 { + maxInFlight = 1 } + d.mu.Lock() + defer d.mu.Unlock() + d.arrivalModel = ArrivalOpenLoop + d.limiter = limiter + d.maxInFlight = maxInFlight +} + +// ArrivalModel reports the configured arrival model (for recording/reporting). +func (d *Dispatcher) ArrivalModel() ArrivalModel { + d.mu.RLock() + defer d.mu.RUnlock() + return d.arrivalModel } // SetStatsCollector sets the statistics collector for this dispatcher @@ -50,6 +99,12 @@ func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator) { func (d *Dispatcher) Prewarm(ctx context.Context) error { d.mu.RLock() prewarmGen := d.prewarmGen + // In open-loop the workers are not rate-limited (the scheduler is the rate + // authority), but the scheduler only paces the MAIN load. Prewarm runs first, + // over those same ungated workers, so it must pace itself off the shared + // limiter or it floods the SUT. In closed-loop limiter is nil here and the + // worker gates the send instead, so there is never a double-throttle. + limiter := d.limiter d.mu.RUnlock() gen, ok := prewarmGen.Get() @@ -63,6 +118,12 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error { // Run prewarm generator until completion for ctx.Err() == nil { + if limiter != nil { + if err := limiter.Wait(ctx); err != nil { + return err + } + } + tx, ok := gen.Generate() if !ok { break // Prewarming is complete @@ -86,8 +147,18 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error { return nil } -// Start begins the dispatcher's transaction generation and sending loop +// Run begins the dispatcher's transaction generation and sending loop, using +// the configured arrival model. func (d *Dispatcher) Run(ctx context.Context) error { + if d.ArrivalModel() == ArrivalOpenLoop { + return d.runOpenLoop(ctx) + } + return d.runClosedLoop(ctx) +} + +// runClosedLoop is the legacy model: generate-then-send in lockstep, so a slow +// SUT back-pressures the generator. Kept as the regression baseline. +func (d *Dispatcher) runClosedLoop(ctx context.Context) error { for ctx.Err() == nil { // Generate a transaction from main generator tx, ok := d.generator.Generate() @@ -96,8 +167,8 @@ func (d *Dispatcher) Run(ctx context.Context) error { return nil } - // Stamp before hand-off: the dispatcher is sole owner here (tx just - // returned by the generator, not yet enqueued), so this write is race-free. + // Stamp before hand-off while sole owner: race-free (see LoadTx). This is + // the back-pressured enqueue time, not a true schedule instant. tx.IntendedSendTime = time.Now() // Send the transaction @@ -111,6 +182,41 @@ func (d *Dispatcher) Run(ctx context.Context) error { return ctx.Err() } +// runOpenLoop drives the open-loop scheduler (see scheduler.go), which owns the +// arrival clock (t₀, sequence index i) and the in-flight bound. Send tasks are +// spawned into a scope so they all complete on shutdown. +func (d *Dispatcher) runOpenLoop(ctx context.Context) error { + d.mu.RLock() + limiter, maxInFlight := d.limiter, d.maxInFlight + d.mu.RUnlock() + + sched := newOpenLoopScheduler(d.generator, d.sender, limiter, maxInFlight, d.onSent) + err := service.Run(ctx, func(ctx context.Context, s service.Scope) error { + return sched.Run(ctx, s) + }) + // Fold the scheduler's drop count into the summary accounting. + d.mu.Lock() + d.dropped = sched.Dropped() + d.mu.Unlock() + return err +} + +// onSent records a completed open-loop send: nil err advances totalSent +// (succeeded), non-nil advances failed. Each admitted tx is counted exactly +// once, never lost (doc.go: admitted == succeeded + failed). +func (d *Dispatcher) onSent(tx *types.LoadTx, err error) { + if err != nil { + log.Printf("Scheduler: send failed (seq %d): %v", tx.SequenceIndex, err) + d.mu.Lock() + d.failed++ + d.mu.Unlock() + return + } + d.mu.Lock() + d.totalSent++ + d.mu.Unlock() +} + // StartBatch generates and sends a specific number of transactions then stops func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { if count <= 0 { @@ -145,10 +251,21 @@ func (d *Dispatcher) GetStats() DispatcherStats { return DispatcherStats{ TotalSent: d.totalSent, + Failed: d.failed, + Dropped: d.dropped, } } // DispatcherStats contains statistics for the dispatcher type DispatcherStats struct { + // TotalSent is the number of admitted sends that completed with a nil error + // (succeeded). TotalSent uint64 + // Failed is the number of admitted open-loop sends that completed with a + // non-nil error: counted, not lost (see package doc, Conservation: + // admitted = succeeded + failed). Always 0 in closed-loop mode. + Failed uint64 + // Dropped is the number of open-loop txs shed because in-flight was + // saturated at their scheduled instant. Always 0 in closed-loop mode. + Dropped uint64 } diff --git a/sender/doc.go b/sender/doc.go new file mode 100644 index 0000000..ca080fb --- /dev/null +++ b/sender/doc.go @@ -0,0 +1,85 @@ +// Package sender drives transactions from generation to the chain. +// +// The send path is a pipeline: a [generator.Generator] produces transactions; +// the [Dispatcher] times their arrival and hands each off to a [TxSender]; the +// [ShardedSender] routes each tx to one of N per-endpoint [Worker]s by shard; +// the worker's send loop stamps the attempt and calls the go-ethereum client +// (eth_sendRawTransaction). Receipts, when tracked, are +// polled on a separate worker loop. A shared [golang.org/x/time/rate.Limiter] is +// the single rate authority for the whole pipeline; the [Ramper] drives its +// limit up or down via SetLimit. +// +// # Open-loop arrival model +// +// The dispatcher supports two arrival models (see [ArrivalModel]). The open-loop +// model exists to eliminate coordinated omission from the latency measurement. +// +// Coordinated omission. In the legacy closed-loop model the dispatcher generates +// the next tx only once a sender is free, so the dequeue clock is the SUT's +// clock: when the system under test slows, the generator slows with it and +// simply stops issuing the requests that would have observed the slowdown. The +// latency histogram then under-reports, because the worst-affected requests were +// never sent. This is coordinated omission. The closed-loop model lies about +// latency precisely when the answer matters most. +// +// Open-loop fixes this by decoupling the arrival clock from sender availability. +// Transaction i is scheduled at a fixed instant t₀ + i/λ, where t₀ is the run +// start and λ is the target rate, regardless of whether any sender is free. The +// scheduler sleeps until each absolute instant rather than for a relative gap +// ("sleep 1/λ from now"), so per-tx scheduling slop cannot accumulate into clock +// drift over a long run. λ is sampled from the shared limiter on each step, so a +// ramping rate is honored; at a fixed λ the running sum telescopes to exactly +// t₀ + i/λ. The limiter is read here as a clock source, not as a permit gate — +// the schedule advances whether or not the SUT keeps up. +// +// Bounded in-flight and drop-and-count. The arrival clock is never throttled by +// backpressure; throttling it would reintroduce coordinated omission. Instead a +// counting semaphore bounds true in-flight sends to maxInFlight. At each +// scheduled instant the scheduler tries to acquire a permit without blocking: if +// the senders are saturated the tick is dropped and counted, and the clock moves +// on. The permit is not released at enqueue. The scheduler installs a release +// callback on tx.OnComplete, and the worker invokes it only after the +// synchronous send returns — note the two phases of the worker path: the enqueue +// into the worker's channel ([TxSender.Send]) is asynchronous and returns at +// once, but the RPC send itself is synchronous. So the permit is held for the +// full unacked-in-flight window (enqueue plus RPC round-trip), and maxInFlight +// bounds real in-flight work while the drop count measures genuine load shed, +// not buffer geometry. +// +// Admit before generate (determinism). The permit is acquired BEFORE the +// generator is drawn, not after. A dropped tick therefore calls neither +// Generate (no seeded-stream draw, no per-tx state advance) nor the signer (no +// wasted CPU on shed load). This makes the admitted transactions a deterministic +// PREFIX of the seeded generator sequence: the same seed yields the same +// admitted multiset regardless of how many ticks the SUT speed forced to drop — +// the per-stream reproducibility contract holds under saturation, where it +// otherwise would not. SequenceIndex is the arrival-tick index i (so +// IntendedSendTime = t₀ + i/λ holds); under drops it is monotonic but +// non-contiguous across admitted txs, because dropped ticks still advance i and +// the clock while consuming no draw. +// +// Conservation. Every scheduled tick reaches exactly one terminal state: +// dropped (not admitted) or admitted. Every admitted tx in turn completes as +// succeeded (nil send error) or failed (non-nil send error) — a failed send is +// counted, never lost. So scheduled == dropped + admitted and +// admitted == succeeded + failed. The dispatcher folds these into the run +// summary (dropped and failed gauges) for goodput accounting. +// +// LoadTx lifecycle and scheduling. The scheduling-relevant fields of [types.LoadTx] +// follow its single-writer concurrency contract: each is written once by the +// goroutine that solely owns the tx at that stage, then is immutable as ownership +// transfers with the pointer across channels. The scheduler stamps IntendedSendTime +// (the true scheduled instant t₀ + i/λ) and SequenceIndex (the arrival index i) +// before hand-off; the worker stamps AttemptedSendTime at the real send. A tx +// cannot self-describe which model produced it — an open-loop and a closed-loop +// tx are byte-identical — so coordinated-omission safety is a property of the +// run's arrival model, not of any per-tx field. Latency and schedule-lag consumers +// must gate on the run-level arrival model. +// +// Detection and baseline. schedule_lag (AttemptedSendTime minus IntendedSendTime) +// is the primary coordinated-omission gate: it shows when sends fall behind the +// arrival schedule even before any tx is shed. The drop count measures only +// genuine shedding once in-flight saturates. A send error does not tear down the +// campaign — errors and drops are surfaced through counters, not by aborting the +// run. The closed-loop model is retained as the regression baseline. +package sender diff --git a/sender/scheduler.go b/sender/scheduler.go new file mode 100644 index 0000000..8f2f761 --- /dev/null +++ b/sender/scheduler.go @@ -0,0 +1,138 @@ +package sender + +import ( + "context" + "log" + "sync" + "sync/atomic" + "time" + + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" +) + +// openLoopScheduler issues tx i at t₀ + i/λ on an arrival clock decoupled from +// sender availability, bounding true in-flight with a semaphore and dropping +// (counting) overdue txs rather than throttling the clock. λ comes from the +// shared limiter (one rate authority). See the package doc for the open-loop +// arrival model: coordinated omission, drop-and-count, and the permit lifecycle. +type openLoopScheduler struct { + generator generator.Generator + sender TxSender + limiter *rate.Limiter + inflight *utils.Semaphore + onSent func(tx *types.LoadTx, err error) + maxInFlight int + + // Written by Run; read after Run returns or concurrently via Dropped/Admitted. + // See package doc (Conservation) for scheduled = dropped + admitted. + dropped atomic.Uint64 + admitted atomic.Uint64 +} + +// minScheduleRate floors λ so a zero/negative limit can't divide-by-zero or +// yield a +Inf gap; the degenerate λ=Inf/TPS=0 case is rejected up front +// (config.Settings.Validate). +const minScheduleRate = 1e-9 + +func newOpenLoopScheduler( + gen generator.Generator, + snd TxSender, + limiter *rate.Limiter, + maxInFlight int, + onSent func(tx *types.LoadTx, err error), +) *openLoopScheduler { + return &openLoopScheduler{ + generator: gen, + sender: snd, + limiter: limiter, + inflight: utils.NewSemaphore(maxInFlight), + onSent: onSent, + maxInFlight: maxInFlight, + } +} + +// Dropped returns the number of ticks shed so far because in-flight was saturated. +func (s *openLoopScheduler) Dropped() uint64 { return s.dropped.Load() } + +// Admitted returns the admitted-tick count (took a permit and drew a tx), for +// the conservation invariant in tests/audit; mirrors Dropped. +func (s *openLoopScheduler) Admitted() uint64 { return s.admitted.Load() } + +// Run drives the arrival clock until ctx is canceled or the generator is +// exhausted, spawning each accepted tx as a send task bounded by the in-flight +// semaphore. See the package doc for the arrival model. +func (s *openLoopScheduler) Run(ctx context.Context, scope service.Scope) error { + t0 := time.Now() + nextSend := t0 + var i uint64 + + for ctx.Err() == nil { + // Sample λ per step (honors a ramping limit; at fixed λ sums to t₀ + i/λ). + lambda := float64(s.limiter.Limit()) + if lambda < minScheduleRate { + lambda = minScheduleRate + } + gap := time.Duration(float64(time.Second) / lambda) + + // Sleep to the absolute instant (not "gap from now") to avoid drift. + if err := utils.SleepUntil(ctx, nextSend); err != nil { + return err + } + + // Advance the clock per scheduled tick before admission (walks at λ + // regardless of drops). + intendedSendTime := nextSend + seqIndex := i + nextSend = nextSend.Add(gap) + i++ + + // Admit before generating: a dropped tick must not advance the seeded + // generator (determinism). TryAcquire is non-blocking — see package doc. + release, ok := s.inflight.TryAcquire() + if !ok { + s.dropped.Add(1) + continue + } + + tx, ok := s.generator.Generate() + if !ok { + release() // generator exhausted: release the unused permit and stop. + log.Print("Scheduler: generator returned no more transactions") + return nil + } + + // Stamp while sole owner (see LoadTx concurrency contract). + tx.IntendedSendTime = intendedSendTime + tx.SequenceIndex = seqIndex + s.admitted.Add(1) + + // complete releases the permit and reports the result, exactly once: the + // worker invokes it via tx.OnComplete after the real send; the Once guards + // the enqueue-failure fallback below from racing the worker. + var once sync.Once + complete := func(err error) { + once.Do(func() { + release() + if s.onSent != nil { + s.onSent(tx, err) + } + }) + } + tx.OnComplete = complete + + scope.Spawn(func() error { + // On enqueue failure the tx never reaches a worker; complete here so the + // permit isn't leaked. A send error must not tear down the campaign. + if err := s.sender.Send(ctx, tx); err != nil { + complete(err) + } + return nil + }) + } + return ctx.Err() +} diff --git a/sender/scheduler_realworker_test.go b/sender/scheduler_realworker_test.go new file mode 100644 index 0000000..364f07f --- /dev/null +++ b/sender/scheduler_realworker_test.go @@ -0,0 +1,513 @@ +package sender + +import ( + "context" + "encoding/json" + "io" + "math/big" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils/service" +) + +// This file is the production-path safety net for the open-loop in-flight bound. +// +// Every other scheduler test drives a FAKE TxSender that invokes tx.OnComplete +// itself, so the suite would stay green even if the real Worker forgot the +// `if tx.OnComplete != nil { tx.OnComplete(err) }` line in runTxSender — the one +// load-bearing line that makes the maxInFlight semaphore bound true unacked +// sends rather than nothing (permits would never be released → leak/meaningless +// bound). The tests here wire the REAL Worker (runTxSender → sendTransaction → +// the real ethclient → OnComplete) behind the scheduler and assert the permit +// is genuinely released by the worker on send completion. +// +// Harness: an httptest.Server speaking the minimal JSON-RPC the ethclient send +// path touches. SendTransaction issues exactly one eth_sendRawTransaction call +// per tx (verified against go-ethereum v1.16.1: HTTP dial makes no RPC call, and +// SendTransaction marshals the tx and calls eth_sendRawTransaction; no +// eth_chainId round-trip). This keeps the harness loopback-only and lets us +// exercise the real worker send path end to end with a controllable response — +// including a "block until released" mode for the maxInFlight=1 assertion. + +// jsonRPCReq is the subset of a JSON-RPC request we parse from the ethclient. +type jsonRPCReq struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Method string `json:"method"` +} + +// rpcServer is an httptest-backed JSON-RPC endpoint that answers +// eth_sendRawTransaction. It counts handled sends and can be put into a +// "block" mode where each send parks until explicitly released, so a test can +// hold a send in flight and observe the in-flight bound. +type rpcServer struct { + srv *httptest.Server + + entered atomic.Uint64 // eth_sendRawTransaction calls that entered the handler + handled atomic.Uint64 // eth_sendRawTransaction calls that returned a result + + // When blocking, every send waits on a fresh gate handed out via started so + // the test can release them one at a time. arrived is signaled when a send + // has entered the handler (so the test knows a send is genuinely in flight). + mu sync.Mutex + blocking bool + gates []chan struct{} // one per blocked send, in arrival order + arrived chan struct{} // buffered; one token per send that entered the handler +} + +func newRPCServer(t *testing.T) *rpcServer { + t.Helper() + s := &rpcServer{arrived: make(chan struct{}, 1024)} + s.srv = httptest.NewServer(http.HandlerFunc(s.handle)) + t.Cleanup(s.srv.Close) + return s +} + +func (s *rpcServer) url() string { return s.srv.URL } + +// setBlocking toggles the block-until-released mode. +func (s *rpcServer) setBlocking(b bool) { + s.mu.Lock() + s.blocking = b + s.mu.Unlock() +} + +// releaseOne unblocks the oldest parked send. Returns false if none is parked +// yet (caller should retry after observing an arrival). +func (s *rpcServer) releaseOne() bool { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.gates) == 0 { + return false + } + gate := s.gates[0] + s.gates = s.gates[1:] + close(gate) + return true +} + +func (s *rpcServer) handle(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var req jsonRPCReq + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if req.Method == "eth_sendRawTransaction" { + s.entered.Add(1) + s.mu.Lock() + blocking := s.blocking + var gate chan struct{} + if blocking { + gate = make(chan struct{}) + s.gates = append(s.gates, gate) + } + s.mu.Unlock() + + if blocking { + // Announce arrival, then park until released or the request ctx is + // canceled (campaign teardown). Parking here holds the real worker + // inside sendTransaction, so the scheduler's permit stays held. + s.arrived <- struct{}{} + select { + case <-gate: + case <-r.Context().Done(): + } + } + } + + id := req.ID + if len(id) == 0 { + id = json.RawMessage("0") + } + // A non-error result is enough; ethclient discards the value (nil out arg). + resp := struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Result string `json:"result"` + }{ + JSONRPC: "2.0", + ID: id, + Result: "0x0000000000000000000000000000000000000000000000000000000000000000", + } + if req.Method == "eth_sendRawTransaction" { + s.handled.Add(1) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(&resp) +} + +// signedTxGenerator yields real, signed DynamicFee transactions (the production +// EVMTransfer shape) so the real ethclient marshals and ships a valid raw tx to +// the JSON-RPC server. It is the generator.Generator the scheduler drives. +type signedTxGenerator struct { + mu sync.Mutex + remaining int + acct *types.Account + signer ethtypes.Signer + chainID *big.Int + issued int +} + +func newSignedTxGenerator(t *testing.T, n int) *signedTxGenerator { + t.Helper() + acct, err := types.NewAccount() + require.NoError(t, err) + chainID := big.NewInt(1) + return &signedTxGenerator{ + remaining: n, + acct: acct, + signer: ethtypes.NewCancunSigner(chainID), + chainID: chainID, + } +} + +func (g *signedTxGenerator) Generate() (*types.LoadTx, bool) { + g.mu.Lock() + defer g.mu.Unlock() + if g.remaining == 0 { + return nil, false + } + g.remaining-- + g.issued++ + + to := g.acct.Address + inner := ðtypes.DynamicFeeTx{ + ChainID: g.chainID, + Nonce: g.acct.GetAndIncrementNonce(), + To: &to, + Value: big.NewInt(1), + Gas: 21000, + GasTipCap: big.NewInt(2_000_000_000), + GasFeeCap: big.NewInt(200_000_000_000), + } + signed, err := ethtypes.SignTx(ethtypes.NewTx(inner), g.signer, g.acct.PrivKey) + if err != nil { + // Generators have no error channel; a signing failure here is a test bug. + panic(err) + } + scenario := &types.TxScenario{Name: "realworker", Sender: g.acct, Receiver: to} + return types.CreateTxFromEthTx(signed, scenario), true +} + +func (g *signedTxGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *signedTxGenerator) GetAccountPools() []types.AccountPool { return nil } + +func (g *signedTxGenerator) issuedCount() int { + g.mu.Lock() + defer g.mu.Unlock() + return g.issued +} + +// newRealWorker builds the production Worker against the given endpoint, in the +// open-loop configuration (RateLimited=false so the scheduler owns the clock, +// TrackReceipts=false so watchTransactions returns immediately and we exercise +// only the send path). It is the real TxSender the scheduler drives. +func newRealWorker(endpoint string, tasks, buffer int) *Worker { + return NewWorker(&WorkerConfig{ + ID: 0, + SeiChainID: "test", + Endpoint: endpoint, + BufferSize: buffer, + Tasks: tasks, + DryRun: false, + Debug: false, + Collector: stats.NewCollector(), + RateLimited: false, + }) +} + +// TestRealWorker_Conservation_OnRealSendPath asserts conservation +// (issued == completed + dropped) where `completed` is driven exclusively by the +// REAL worker invoking tx.OnComplete after sendTransaction returns — not by a +// fake. If runTxSender stopped calling OnComplete, completed would stall and +// this would fail. +func TestRealWorker_Conservation_OnRealSendPath(t *testing.T) { + const txCount = 200 + srv := newRPCServer(t) + gen := newSignedTxGenerator(t, txCount) + worker := newRealWorker(srv.url(), 8, 256) + + var completed, succeeded atomic.Uint64 + onSent := func(_ *types.LoadTx, err error) { + completed.Add(1) + if err == nil { + succeeded.Add(1) + } + } + + limiter := rate.NewLimiter(rate.Limit(2000), 1) + sched := newOpenLoopScheduler(gen, worker, limiter, 256, onSent) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + // Run the worker and scheduler in a scope whose teardown WE control via + // runCancel — not the scheduler's return. + // + // service.Run cancels the scope's context as soon as every MAIN task returns. + // If the scheduler were a main task, the instant it exhausts the generator and + // returns, service.Run would cancel the worker's context — aborting any send + // still in flight. A send whose 200 OK the server already counted (handled++) + // but whose client.SendTransaction had not yet returned would then fail with + // context-canceled: OnComplete fires with err != nil, so completed++ but NOT + // succeeded++. That is exactly the observed flake (handled=200, succeeded=199): + // not a sampling artifact but a teardown that races the last in-flight send. + // + // So the scheduler and worker are BACKGROUND tasks, and the lone MAIN task is a + // gate that blocks until the test calls runCancel(). The scope therefore stays + // alive — the worker keeps draining txChan and firing OnComplete — until the + // test has observed quiescence and torn down deliberately. + runCtx, runCancel := context.WithCancel(ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = service.Run(runCtx, func(ctx context.Context, scope service.Scope) error { + scope.SpawnBg(func() error { return worker.Run(ctx) }) + scope.SpawnBg(func() error { return sched.Run(ctx, scope) }) + // Main task: hold the scope open until the test signals teardown. + <-ctx.Done() + return nil + }) + }() + + // Assert ONLY at quiescence. All invariants are sampled together in one + // predicate so we never read them mid-flight. Post-reorder, the generator is + // drawn only on admitted ticks, so the conservation anchor is the scheduler's + // OWN counters, not the generator draw count: + // + // admitted: Admitted() == txCount (every draw was admitted — the + // generator is drained and dropped ticks consumed no draw; + // the precondition that makes the fixpoint below stable) + // conservation: completed == Admitted() (every admitted tx reached a + // terminal state via the real worker's OnComplete) + // equality: succeeded == handled (every server-handled send + // produced exactly one successful worker-driven completion) + // + // conservation and equality are transiently off WHILE a send is in flight: the + // server bumps `handled` when it RECEIVES eth_sendRawTransaction, but the worker + // bumps `succeeded` only AFTER SendTransaction returns and OnComplete fires — the + // instants differ by the server→worker-return window. Sampling any of them alone, + // or at different instants, can catch that window. Requiring all three together, + // only once they hold, observes the system after that window has drained. The + // counters are monotonic; once the generator is exhausted no new work is admitted, + // so once ALL THREE hold they stay held — that stable fixpoint is the quiescent + // point. (The deeper hazard the gate above fixes is teardown racing that same + // window; here we additionally refuse to read until the window is empty.) + // + // Driven by the real worker's OnComplete — a missing invoke leaves completed + // (and succeeded) short forever, so convergence never happens and the test + // fails on the Eventually deadline. CI is slow, so the window is generous; + // correctness depends on convergence, not on the deadline firing. + const total = uint64(txCount) + require.Eventually(t, func() bool { + admittedAll := sched.Admitted() == total + conserved := completed.Load() == sched.Admitted() + balanced := succeeded.Load() == srv.handled.Load() + return admittedAll && conserved && balanced + }, 10*time.Second, 2*time.Millisecond, + "never reached quiescence (want admitted=completed=%d, succeeded=handled)", total) + + // System is quiescent: the generator is drained, no send is in flight, every + // admitted tx is terminal, and every handled send has its OnComplete recorded. + // Only now tear down — the counters cannot move under us, so the assertions + // below re-read a frozen state, not a sampled one. + runCancel() + wg.Wait() + + require.Equal(t, total, sched.Admitted(), "every generator draw must be an admitted tx") + require.Equal(t, total, uint64(gen.issuedCount()), "the generator must be fully drained") + require.Positive(t, srv.handled.Load(), "the real RPC server must have handled sends") + require.Equal(t, sched.Admitted(), completed.Load(), + "every admitted tx must reach a terminal state via the worker's OnComplete") + require.Equal(t, succeeded.Load(), srv.handled.Load(), + "each successful completion must correspond to one eth_sendRawTransaction") +} + +// TestRealWorker_PermitReleasedByWorker is the teeth: with maxInFlight=1 and a +// single worker task, the RPC server blocks the first send. The real worker is +// parked inside sendTransaction, so it has NOT yet called tx.OnComplete and the +// single permit stays held — every subsequent arrival must drop. Releasing the +// blocked send lets the worker return from sendTransaction, fire OnComplete, and +// free the permit, so flow resumes. +// +// If someone deletes the `if tx.OnComplete != nil { tx.OnComplete(err) }` invoke +// in runTxSender, the permit is never released even after the send completes: +// the worker would never accept a second tx, so handled stays at 1 and the +// resume assertion fails. That is the falsification this test exists for. +func TestRealWorker_PermitReleasedByWorker(t *testing.T) { + srv := newRPCServer(t) + srv.setBlocking(true) + + // Plenty of arrivals so the scheduler keeps offering txs while the first is + // parked; the surplus must drop because the lone permit is held. + gen := newSignedTxGenerator(t, 1000) + // One task: a single runTxSender owns the only permit's lifecycle, so the + // permit can only be freed by that worker calling OnComplete. + worker := newRealWorker(srv.url(), 1, 1) + + var completed atomic.Uint64 + onSent := func(_ *types.LoadTx, _ error) { completed.Add(1) } + + // Fast arrival clock so many txs are offered during the blocked window. + limiter := rate.NewLimiter(rate.Limit(5000), 1) // 0.2ms gap + sched := newOpenLoopScheduler(gen, worker, limiter, 1, onSent) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = service.Run(ctx, func(ctx context.Context, scope service.Scope) error { + scope.SpawnBg(func() error { return worker.Run(ctx) }) + return sched.Run(ctx, scope) + }) + }() + + // Wait until exactly one send is genuinely in flight (parked in the handler). + <-srv.arrived + + // While that send is parked, the worker has not fired OnComplete, so the lone + // permit is held. Give the fast scheduler time to offer (and drop) a slew of + // arrivals, then assert the bound held: exactly one send in flight, none + // completed yet, and the rest dropped. + require.Eventually(t, func() bool { + return sched.Dropped() > 10 + }, 2*time.Second, 2*time.Millisecond, + "arrivals past the single held permit must drop while the send is in flight") + + require.Equal(t, uint64(0), completed.Load(), + "no completion may be reported while the only send is still parked") + require.Equal(t, uint64(1), srv.entered.Load(), + "exactly one send may be in flight under maxInFlight=1") + require.Equal(t, uint64(0), srv.handled.Load(), + "the parked send has not returned a result yet, so the permit is still held") + + // Release the blocked send. The real worker now returns from sendTransaction + // and MUST invoke tx.OnComplete to free the permit. If it does not (the bug), + // no further send is ever admitted and handled stays at 1 forever. + require.True(t, srv.releaseOne(), "one send must be parked and releasable") + + // Switch off blocking so resumed sends complete immediately, and prove flow + // resumes: more than one send is now handled, which is only possible if the + // permit from the first send was released by the worker's OnComplete. + srv.setBlocking(false) + // Drain any further parked sends that arrived between release and unblocking. + go func() { + for ctx.Err() == nil { + if !srv.releaseOne() { + select { + case <-ctx.Done(): + return + case <-time.After(time.Millisecond): + } + } + } + }() + + require.Eventually(t, func() bool { + return srv.handled.Load() > 1 && completed.Load() > 1 + }, 3*time.Second, 2*time.Millisecond, + "flow must resume after the worker releases the permit via OnComplete "+ + "(handled=%d completed=%d)", srv.handled.Load(), completed.Load()) + + // Flow resumed: at least one further send completed after the release, so the + // worker really did free the permit. Record the post-resume state before tear + // down (strict end-to-end conservation is covered by the conservation test; + // here cancel may leave a single tx mid-flight, so we only bound leaks). + require.Greater(t, completed.Load(), uint64(1), "resumed sends must complete") + + cancel() + wg.Wait() + + // No leak past the one tx that may be mid-flight at cancel. Post-reorder, the + // generator is drawn only on admitted ticks, so every draw is an admitted tx: + // generator-draw count must equal Admitted() exactly (dropped ticks consumed + // no draw — the determinism property, on the real worker path). + admitted := sched.Admitted() + require.Equal(t, admitted, uint64(gen.issuedCount()), + "every generator draw must be an admitted tx; dropped ticks consume no draw") + + // Each admitted tx completes exactly once via the worker's OnComplete, so + // completed must equal Admitted() — minus at most the single tx left mid-flight + // when cancel raced its in-flight send. + require.LessOrEqual(t, completed.Load(), admitted, "no admitted tx may complete more than once") + require.GreaterOrEqual(t, completed.Load()+1, admitted, + "at most one admitted tx may be unaccounted at cancel (admitted=%d completed=%d dropped=%d)", + admitted, completed.Load(), sched.Dropped()) +} + +// TestDispatcher_PrewarmRateLimitedInOpenLoop guards the prewarm-flood +// regression: in open-loop the workers are constructed RateLimited=false, but +// the scheduler paces only the MAIN load. Prewarm runs first over those same +// ungated workers, so it must pace itself off the shared limiter or it floods +// the SUT. With workers wired exactly as in open-loop, a low limit, and many +// more prewarm txs than the worker pool could absorb instantly, an unpaced +// prewarm would drain in well under the limiter's minimum span. We assert the +// run took at least the paced floor — i.e. the limiter actually gated prewarm — +// and that every prewarm tx still reached the RPC server (no drops on prewarm). +func TestDispatcher_PrewarmRateLimitedInOpenLoop(t *testing.T) { + srv := newRPCServer(t) + const prewarmTxs = 40 + const rps = 200.0 // limiter: 200 tx/s → unpaced 40 txs is near-instant + + worker := newRealWorker(srv.url(), 8, 256) // RateLimited=false, open-loop shape + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _ = service.Run(ctx, func(ctx context.Context, scope service.Scope) error { + scope.SpawnBg(func() error { return worker.Run(ctx) }) + <-ctx.Done() + return nil + }) + }() + + limiter := rate.NewLimiter(rate.Limit(rps), 1) + d := NewDispatcher(newSignedTxGenerator(t, 0), worker) + d.SetOpenLoop(limiter, 256) // sets d.limiter so Prewarm self-paces + d.SetPrewarmGenerator(newSignedTxGenerator(t, prewarmTxs)) + + start := time.Now() + require.NoError(t, d.Prewarm(ctx)) + elapsed := time.Since(start) + + // Paced floor: (N-1) gaps at the limiter rate (burst=1 lets the first through + // immediately). Use half as a generous lower bound to absorb scheduling slop + // while still excluding the unpaced (near-zero) case decisively. + pacedFloor := time.Duration(float64(prewarmTxs-1) / rps * float64(time.Second)) + require.Greater(t, elapsed, pacedFloor/2, + "prewarm must be limiter-paced in open-loop, not flooded (elapsed=%s floor=%s)", + elapsed, pacedFloor) + + require.Eventually(t, func() bool { + return srv.handled.Load() == uint64(prewarmTxs) + }, 2*time.Second, 2*time.Millisecond, + "every prewarm tx must reach the SUT (handled=%d want=%d)", + srv.handled.Load(), prewarmTxs) + + cancel() + wg.Wait() +} diff --git a/sender/scheduler_test.go b/sender/scheduler_test.go new file mode 100644 index 0000000..a40d910 --- /dev/null +++ b/sender/scheduler_test.go @@ -0,0 +1,561 @@ +package sender + +import ( + "context" + "errors" + "slices" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils/service" +) + +// fakeGenerator hands out blank LoadTx values until count is exhausted. It +// records the IntendedSendTime/SequenceIndex the scheduler stamped, since those +// are the open-loop schedule under test. +type fakeGenerator struct { + mu sync.Mutex + remaining int + issued []*types.LoadTx +} + +func newFakeGenerator(n int) *fakeGenerator { return &fakeGenerator{remaining: n} } + +func (g *fakeGenerator) Generate() (*types.LoadTx, bool) { + g.mu.Lock() + defer g.mu.Unlock() + if g.remaining == 0 { + return nil, false + } + g.remaining-- + tx := &types.LoadTx{Scenario: &types.TxScenario{Name: "fake"}} + g.issued = append(g.issued, tx) + return tx, true +} + +func (g *fakeGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *fakeGenerator) GetAccountPools() []types.AccountPool { + return nil +} + +func (g *fakeGenerator) issuedTxs() []*types.LoadTx { + g.mu.Lock() + defer g.mu.Unlock() + out := make([]*types.LoadTx, len(g.issued)) + copy(out, g.issued) + return out +} + +// seededGenerator stands in for a PLT-456 seeded generator: each Generate() +// draw is the next element of a deterministic stream, recorded here in draw +// order via a strictly increasing draw index stamped on the LoadTx. The point +// the determinism guard pins: a draw is consumed only when Generate() is +// called, so if the scheduler advances the stream on a dropped tick the draw +// indices admitted under saturation would be non-contiguous (gaps where a +// dropped tick stole a draw). Admitted draws forming the prefix 0..N-1 with no +// gaps proves dropped slots consume zero seeded draws. +type seededGenerator struct { + mu sync.Mutex + remaining int + drawIndex map[*types.LoadTx]uint64 // tx → its draw position in the stream + nextDraw uint64 +} + +func newSeededGenerator(n int) *seededGenerator { + return &seededGenerator{remaining: n, drawIndex: map[*types.LoadTx]uint64{}} +} + +func (g *seededGenerator) Generate() (*types.LoadTx, bool) { + g.mu.Lock() + defer g.mu.Unlock() + if g.remaining == 0 { + return nil, false + } + g.remaining-- + tx := &types.LoadTx{Scenario: &types.TxScenario{Name: "seeded"}} + g.drawIndex[tx] = g.nextDraw + g.nextDraw++ + return tx, true +} + +// draw returns the stream position at which tx was produced by Generate(). +func (g *seededGenerator) draw(tx *types.LoadTx) uint64 { + g.mu.Lock() + defer g.mu.Unlock() + return g.drawIndex[tx] +} + +func (g *seededGenerator) GenerateN(int) []*types.LoadTx { panic("unused") } +func (g *seededGenerator) GetAccountPools() []types.AccountPool { return nil } + +// asyncFakeSender models the production ShardedSender's send semantics: Send +// returns when the tx lands in a buffered channel (enqueue-and-return), NOT when +// the network send completes. Background workers dequeue and, after an optional +// per-send delay (a slow SUT), invoke tx.OnComplete to release the scheduler's +// in-flight permit. This is what exercises the HONEST in-flight bound (B2): a +// synchronous sender that blocks in Send would hide that the permit must be tied +// to real completion, not to enqueue. +type asyncFakeSender struct { + ch chan *types.LoadTx + delay time.Duration + sent atomic.Uint64 // incremented when a send actually completes +} + +// newAsyncFakeSender starts `workers` background senders draining a buffer of +// `buffer` slots. Mirrors a worker pool behind a bounded channel. +func newAsyncFakeSender(ctx context.Context, buffer, workers int, delay time.Duration) *asyncFakeSender { + s := &asyncFakeSender{ch: make(chan *types.LoadTx, buffer), delay: delay} + if workers < 1 { + workers = 1 + } + for range workers { + go s.drain(ctx) + } + return s +} + +func (s *asyncFakeSender) drain(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case tx := <-s.ch: + if s.delay > 0 { + t := time.NewTimer(s.delay) + select { + case <-t.C: + case <-ctx.Done(): + t.Stop() + } + } + s.sent.Add(1) + if tx.OnComplete != nil { + tx.OnComplete(nil) + } + } + } +} + +// Send enqueues without blocking on completion, returning at enqueue. If the +// buffer is full it blocks on the channel until a slot frees or ctx is done — +// like utils.Send in the real worker. The scheduler must never see this block +// throttle its clock because admission is gated by the in-flight permit upstream. +func (s *asyncFakeSender) Send(ctx context.Context, tx *types.LoadTx) error { + select { + case s.ch <- tx: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// runScheduler drives the scheduler in its own scope until the context expires, +// returning the scheduler so the caller can read Dropped(). +func runScheduler(ctx context.Context, sched *openLoopScheduler) { + _ = service.Run(ctx, func(ctx context.Context, s service.Scope) error { + return sched.Run(ctx, s) + }) +} + +// TestOpenLoopSchedule_TracksT0PlusIOverLambda is the core Done-criterion test: +// at a fixed λ against a fast sender, the IntendedSendTime stamped on tx i must +// track t₀ + i/λ within tolerance, independent of completion. +func TestOpenLoopSchedule_TracksT0PlusIOverLambda(t *testing.T) { + const lambda = 200.0 // tx/s → 5ms gap + gen := newFakeGenerator(40) + + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + snd := newAsyncFakeSender(ctx, 1024, 8, 0) + limiter := rate.NewLimiter(rate.Limit(lambda), 1) + sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) + + start := time.Now() + runScheduler(ctx, sched) + + issued := gen.issuedTxs() + require.GreaterOrEqual(t, len(issued), 30, "scheduler should issue most txs within the window") + + gap := time.Second / time.Duration(lambda) + // t₀ is the scheduler's internal start; bound it to [start, start+gap]. + t0 := issued[0].IntendedSendTime + require.WithinDuration(t, start, t0, gap, "t₀ must be the campaign start") + + const tol = 2 * time.Millisecond + for i, tx := range issued { + require.Equal(t, uint64(i), tx.SequenceIndex, "sequence index must be monotonic from 0") + want := t0.Add(time.Duration(i) * gap) + require.WithinDuration(t, want, tx.IntendedSendTime, tol, + "tx %d IntendedSendTime must track t₀ + i/λ", i) + } +} + +// TestOpenLoopSchedule_NotThrottledBySlowSender proves the arrival clock is not +// dragged by a slow SUT: with a sender far slower than the in-flight bound can +// absorb, the schedule must still advance at λ and the overrun must be dropped, +// not absorbed by blocking. +// +// It uses an ASYNC sender (Send returns at enqueue) so the drop count reflects +// the HONEST in-flight bound (B2): the permit is held until each send actually +// completes, so a slow SUT saturates maxInFlight and forces genuine load-shed — +// not buffer geometry. A synchronous sender would have masked this. +func TestOpenLoopSchedule_NotThrottledBySlowSender(t *testing.T) { + const lambda = 500.0 // 2ms gap + gen := newFakeGenerator(200) + + ctx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond) + defer cancel() + + // Each send completes after 100ms; with maxInFlight=4 and only 4 draining + // workers the senders can sustain only ~40 tx/s, an order of magnitude under + // λ → most txs must be dropped. The buffer is deliberately small: with the + // honest bound the in-flight permit (not the buffer) is the gate. + const maxInFlight = 4 + snd := newAsyncFakeSender(ctx, maxInFlight, maxInFlight, 100*time.Millisecond) + limiter := rate.NewLimiter(rate.Limit(lambda), 1) + sched := newOpenLoopScheduler(gen, snd, limiter, maxInFlight, nil) + + start := time.Now() + runScheduler(ctx, sched) + + admittedTxs := gen.issuedTxs() + gap := time.Second / time.Duration(lambda) + + // The clock must have kept advancing at λ despite the slow sender: the + // number of SCHEDULED ARRIVALS (admitted + dropped) — not generator draws — + // must have walked far past what the senders could absorb. Generate() now + // runs only on admitted ticks (post-reorder), so generated count is small; + // the never-throttled-clock property lives in the scheduled-arrival count. + scheduled := sched.Admitted() + sched.Dropped() + require.GreaterOrEqual(t, scheduled, uint64(100), + "arrival clock must not be throttled by the slow sender (scheduled=%d)", scheduled) + + // Schedule accuracy still holds for each admitted tx, keyed on its + // SequenceIndex (the arrival-tick index i): IntendedSendTime ≈ t₀ + i/λ. + // Admitted txs have NON-CONTIGUOUS SequenceIndex under drops, so the schedule + // must be checked against tx.SequenceIndex, never the slice position. + require.Positive(t, len(admittedTxs), "some txs must have been admitted") + // Recover the scheduler's internal t₀ from the first admitted tx and its + // arrival index: t₀ = IntendedSendTime − SequenceIndex·gap. Bound it to the + // test's observed start window, then assert every admitted tx sits on the + // t₀ + i/λ grid at its own SequenceIndex. + first := admittedTxs[0] + t0 := first.IntendedSendTime.Add(-time.Duration(first.SequenceIndex) * gap) + require.WithinDuration(t, start, t0, gap, "recovered t₀ must be the campaign start") + const tol = 3 * time.Millisecond + for _, tx := range admittedTxs { + want := t0.Add(time.Duration(tx.SequenceIndex) * gap) + require.WithinDuration(t, want, tx.IntendedSendTime, tol, + "admitted tx (seq %d) schedule must hold under a slow sender", tx.SequenceIndex) + } + + // Overrun is dropped-and-counted, not blocked on. With ~150 scheduled arrivals + // in 300ms and senders able to complete only ~a dozen, the vast majority drop. + require.Positive(t, sched.Dropped(), "overrun must be counted as dropped") + require.Greater(t, sched.Dropped(), sched.Admitted(), + "a slow SUT must shed most of the load through the in-flight bound") +} + +// gatedSender enqueues instantly but holds completion until release is called, +// letting a test observe the window where a tx is enqueued-but-not-completed. +type gatedSender struct { + enqueued atomic.Uint64 + mu sync.Mutex + pending []*types.LoadTx +} + +func (s *gatedSender) Send(_ context.Context, tx *types.LoadTx) error { + s.mu.Lock() + s.pending = append(s.pending, tx) + s.mu.Unlock() + s.enqueued.Add(1) + return nil // returns at enqueue, like the production worker +} + +// completeAll fires OnComplete for every tx enqueued so far, releasing permits. +func (s *gatedSender) completeAll() { + s.mu.Lock() + pending := s.pending + s.pending = nil + s.mu.Unlock() + for _, tx := range pending { + if tx.OnComplete != nil { + tx.OnComplete(nil) + } + } +} + +// TestOpenLoopSchedule_PermitHeldUntilCompletion is the B2 guard: the in-flight +// permit must be tied to real send completion, not to enqueue. With maxInFlight=1 +// and a sender that enqueues instantly but never completes, the first tx takes +// the only permit and holds it; every subsequent tx must drop. If the permit +// released at enqueue (the masked bug), the sender would have enqueued many. +func TestOpenLoopSchedule_PermitHeldUntilCompletion(t *testing.T) { + gen := newFakeGenerator(100) + snd := &gatedSender{} + limiter := rate.NewLimiter(rate.Limit(1000), 1) // 1ms gap → many arrivals + sched := newOpenLoopScheduler(gen, snd, limiter, 1, nil) + + ctx, cancel := context.WithTimeout(t.Context(), 120*time.Millisecond) + defer cancel() + runScheduler(ctx, sched) + + // Exactly one tx held the single permit through the whole run (never + // completed), so the sender saw exactly one enqueue and everything else + // dropped. Enqueue-time release would have let many through. + require.Equal(t, uint64(1), snd.enqueued.Load(), + "permit must be held until completion: only one tx may be in flight") + require.Positive(t, sched.Dropped(), "arrivals past the held permit must drop") + + // Release the held permit; conservation still holds (issued == sent+dropped + // is checked elsewhere). Drain to avoid a leaked OnComplete at teardown. + snd.completeAll() +} + +// flakyAsyncSender drains like asyncFakeSender but completes every failEvery-th +// send (1-indexed) with an error, so a tx can be ADMITTED, enqueued, and then +// fail in the send — the path the failed-send accounting must capture. A failed +// send must be counted as failed (not lost and not dropped), so the conservation +// invariant becomes issued == dropped + succeeded + failed. +type flakyAsyncSender struct { + ch chan *types.LoadTx + failEvery uint64 + seen atomic.Uint64 +} + +func newFlakyAsyncSender(ctx context.Context, buffer, workers int, failEvery uint64) *flakyAsyncSender { + s := &flakyAsyncSender{ch: make(chan *types.LoadTx, buffer), failEvery: failEvery} + if workers < 1 { + workers = 1 + } + for range workers { + go s.drain(ctx) + } + return s +} + +func (s *flakyAsyncSender) drain(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case tx := <-s.ch: + n := s.seen.Add(1) + var err error + if s.failEvery > 0 && n%s.failEvery == 0 { + err = errors.New("injected send failure") + } + if tx.OnComplete != nil { + tx.OnComplete(err) + } + } + } +} + +func (s *flakyAsyncSender) Send(ctx context.Context, tx *types.LoadTx) error { + select { + case s.ch <- tx: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// TestOpenLoopSchedule_Conservation checks the two-level accounting invariant +// (see package doc, Conservation) under injected send failures: +// +// scheduled == dropped + admitted (every tick has one arrival outcome) +// admitted == succeeded + failed (every admitted tx has one send outcome) +// +// scheduled is the scheduler's own arrival count (Admitted + Dropped), NOT the +// generator's issued count: after admit-before-generate a dropped tick never +// calls Generate, so len(issuedTxs) tracks admitted draws, not scheduled ticks. +// A failed send must land in failed, never be silently lost. The dispatcher's +// onSent is the accountant under test (via a stand-in mirroring its split). +func TestOpenLoopSchedule_Conservation(t *testing.T) { + gen := newFakeGenerator(300) + // Generous capacity so most txs complete; a few may drop on brief bursts. + limiter := rate.NewLimiter(rate.Limit(1000), 1) + + var succeeded, failed atomic.Uint64 + onSent := func(_ *types.LoadTx, err error) { + if err != nil { + failed.Add(1) + return + } + succeeded.Add(1) + } + + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + // Fail every 5th send so failed > 0 and the invariant must absorb it. + snd := newFlakyAsyncSender(ctx, 256, 16, 5) + sched := newOpenLoopScheduler(gen, snd, limiter, 256, onSent) + runScheduler(ctx, sched) + + admitted := sched.Admitted() + require.Positive(t, admitted) + // scheduled == dropped + admitted: a dropped tick consumes no Generate draw, + // so the generator's issued count tracks admitted ticks, not scheduled ticks. + require.Equal(t, admitted, uint64(len(gen.issuedTxs())), + "each admitted tick must draw exactly one tx (admit-before-generate)") + + // Allow the in-flight sends spawned just before deadline to settle, then + // assert admitted == succeeded + failed (the send-outcome partition). + require.Eventually(t, func() bool { + return succeeded.Load()+failed.Load() == sched.Admitted() + }, time.Second, 5*time.Millisecond, + "every admitted tx must be succeeded or failed exactly once "+ + "(admitted=%d succeeded=%d failed=%d)", + sched.Admitted(), succeeded.Load(), failed.Load()) + require.Positive(t, failed.Load(), + "injected failures must be counted as failed, not lost or dropped") +} + +// TestOpenLoopSchedule_DroppedSlotsConsumeNoDraws is the determinism guard for +// the PLT-456 × PLT-458 interaction: under saturation the scheduler must admit a +// deterministic PREFIX of the seeded stream — a dropped tick consumes zero +// generator draws (and zero signing CPU), so the same seed yields the same +// admitted multiset no matter how many ticks the SUT speed forced to drop. +// +// The falsification: if Generate() were called before TryAcquire (the original +// bug), every dropped tick would still advance the seeded stream, so the admitted +// txs' draw indices would have gaps — admitted draw k+1 would jump past the draws +// the dropped ticks consumed. Here we drive a saturated sender (maxInFlight=1, +// completion gated) so the great majority of ticks drop, then assert the admitted +// txs' draw indices are exactly 0,1,2,… contiguous with no gaps. +func TestOpenLoopSchedule_DroppedSlotsConsumeNoDraws(t *testing.T) { + gen := newSeededGenerator(2000) + snd := &gatedSender{} + limiter := rate.NewLimiter(rate.Limit(5000), 1) // 0.2ms gap → many ticks + // maxInFlight=1 with a sender that never auto-completes: the first admitted + // tx holds the only permit, so every later tick drops until we release. + var admitted []uint64 + var mu sync.Mutex + onSent := func(tx *types.LoadTx, err error) { + require.NoError(t, err) + mu.Lock() + admitted = append(admitted, gen.draw(tx)) + mu.Unlock() + } + sched := newOpenLoopScheduler(gen, snd, limiter, 1, onSent) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runScheduler(ctx, sched) + }() + + // Periodically release the single held permit so flow inches forward one + // admitted tx at a time while the fast clock drops the rest. This guarantees + // many drops interleaved with a handful of admits — the regime that would + // expose any draw consumed by a dropped tick. + relCtx, relCancel := context.WithCancel(ctx) + defer relCancel() + go func() { + for relCtx.Err() == nil { + snd.completeAll() + select { + case <-relCtx.Done(): + case <-time.After(2 * time.Millisecond): + } + } + }() + + wg.Wait() + relCancel() + snd.completeAll() // drain any final held permit's OnComplete + + mu.Lock() + got := append([]uint64(nil), admitted...) + mu.Unlock() + + require.Positive(t, sched.Dropped(), "the saturated sender must force drops") + require.Positive(t, len(got), "some txs must have been admitted") + require.Greater(t, sched.Dropped(), uint64(len(got)), + "saturation must drop far more ticks than it admits") + + // The admitted draws must be the contiguous prefix 0,1,2,…,N-1: each admitted + // tx consumed exactly the next stream draw, and no dropped tick consumed any. + // Sort first: with the permit released just before onSent fires, two adjacent + // completions can be recorded out of order, but the SET must still be gapless. + slices.Sort(got) + for k, draw := range got { + require.Equal(t, uint64(k), draw, + "admitted draw set must be the gapless prefix; slot %d is draw %d (a dropped tick must consume no draw)", + k, draw) + } +} + +// TestOpenLoopSchedule_HonorsRampedLambda verifies the schedule responds to a +// λ change applied via the shared limiter (the ramper's rate authority): after +// SetLimit, the inter-arrival gap tracks the new λ. +func TestOpenLoopSchedule_HonorsRampedLambda(t *testing.T) { + gen := newFakeGenerator(1000) + ctx, cancel := context.WithTimeout(t.Context(), 600*time.Millisecond) + defer cancel() + snd := newAsyncFakeSender(ctx, 1024, 8, 0) + // Start slow so the first gaps are large and easy to distinguish. + limiter := rate.NewLimiter(rate.Limit(50), 1) // 20ms gap + sched := newOpenLoopScheduler(gen, snd, limiter, 1024, nil) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runScheduler(ctx, sched) + }() + + // Let it run at 50 tps, then ramp to 500 tps and let it run more. + time.Sleep(200 * time.Millisecond) + limiter.SetLimit(rate.Limit(500)) // 2ms gap + wg.Wait() + + issued := gen.issuedTxs() + require.GreaterOrEqual(t, len(issued), 2, "scheduler must issue txs") + + // The min gap observed in the back half must reflect the faster λ: with a + // 2ms target the later gaps are far under the initial 20ms gap. + minGap := time.Hour + for i := 1; i < len(issued); i++ { + g := issued[i].IntendedSendTime.Sub(issued[i-1].IntendedSendTime) + if g < minGap { + minGap = g + } + } + require.Less(t, minGap, 10*time.Millisecond, + "ramped-up λ must shrink the inter-arrival gap below the initial 20ms") +} + +// TestOpenLoopSchedule_StampsBeforeHandoff guards the LoadTx concurrency +// contract: the scheduler stamps IntendedSendTime and SequenceIndex before the +// send task can touch the tx. Run under -race to catch a regression. +func TestOpenLoopSchedule_StampsBeforeHandoff(t *testing.T) { + gen := newFakeGenerator(50) + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + snd := newAsyncFakeSender(ctx, 64, 8, 0) + limiter := rate.NewLimiter(rate.Limit(1000), 1) + + var checked atomic.Uint64 + onSent := func(tx *types.LoadTx, err error) { + require.NoError(t, err) + require.False(t, tx.IntendedSendTime.IsZero(), "schedule must be stamped before send") + checked.Add(1) + } + sched := newOpenLoopScheduler(gen, snd, limiter, 64, onSent) + + runScheduler(ctx, sched) + + require.Positive(t, checked.Load(), "onSent must observe stamped txs") +} diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index 31b0505..f82eb7f 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -23,6 +23,11 @@ func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector * return nil, fmt.Errorf("no endpoints configured") } + // The worker is the rate authority only in the closed-loop model. In + // open-loop the scheduler owns the arrival clock (driving the same shared + // limiter), so worker-side gating would double-throttle the rate. + rateLimited := cfg.Settings.ArrivalModel != config.ArrivalModelOpenLoop + workers := make([]*Worker, len(cfg.Endpoints)) for i, endpoint := range cfg.Endpoints { workers[i] = NewWorker(&WorkerConfig{ @@ -36,6 +41,7 @@ func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector * Debug: cfg.Settings.Debug, Collector: collector, Limiter: limiter, + RateLimited: rateLimited, }) } @@ -80,5 +86,5 @@ type WorkerStats struct { ChannelLength int } -// GetNumShards returns the number of shards (workers) +// NumShards returns the number of shards (workers) func (s *ShardedSender) NumShards() int { return len(s.workers) } diff --git a/sender/worker.go b/sender/worker.go index aff47fb..7e7b60c 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -38,7 +38,10 @@ type WorkerConfig struct { Debug bool TrackReceipts bool Collector *stats.Collector - Limiter *rate.Limiter // Shared rate limiter for transaction sending + Limiter *rate.Limiter // Shared rate authority; nil disables gating. + // RateLimited gates worker-side gating: true in closed-loop (worker is the + // rate authority), false in open-loop (scheduler owns the clock; see doc.go). + RateLimited bool } // Worker handles sending transactions to a specific endpoint @@ -219,9 +222,12 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * // runTxSender is the main worker loop that processes transactions func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) error { for ctx.Err() == nil { - // Apply rate limiting before getting the next transaction - if err := w.cfg.Limiter.Wait(ctx); err != nil { - return err + // Closed-loop gating: block on the limiter before dequeuing. Skipped in + // open-loop where the scheduler is the rate authority (see doc.go). + if w.cfg.RateLimited && w.cfg.Limiter != nil { + if err := w.cfg.Limiter.Wait(ctx); err != nil { + return err + } } tx, err := utils.Recv(ctx, w.txChan) @@ -230,11 +236,15 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro } startTime := time.Now() - // This goroutine solely owns tx between dequeue and the sentTxs hand-off, - // so stamping the actual send-attempt time here is race-free (see LoadTx). + // Sole owner between dequeue and hand-off: stamp is race-free (see LoadTx). tx.AttemptedSendTime = startTime err = w.sendTransaction(ctx, client, tx) - // Record statistics if collector is available + // CRITICAL: OnComplete must fire only after the real send returns — that is + // what makes the open-loop semaphore bound true unacked sends (doc.go: + // permit lifecycle). Nil on closed-loop/batch. + if tx.OnComplete != nil { + tx.OnComplete(err) + } w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) if err != nil { log.Printf("%v", err) @@ -266,12 +276,9 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, ) }(time.Now()) if w.cfg.DryRun { - // In dry-run mode, simulate processing time and mark as successful - // Use very minimal delay to avoid channel overflow - return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation + return utils.Sleep(ctx, 10*time.Microsecond) // minimal delay, no RPC } - // Send through go-ethereum so the same code path supports both HTTP(S) and WS(S) RPC. if err := client.SendTransaction(ctx, tx.EthTx); err != nil { txsRejected.Add(ctx, 1, metric.WithAttributes( attribute.String("endpoint", w.cfg.Endpoint), @@ -286,8 +293,7 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, attribute.String("scenario", tx.Scenario.Name), )) - // Write to sentTxs channel without blocking - utils.SendOrDrop(w.sentTxs, tx) + utils.SendOrDrop(w.sentTxs, tx) // non-blocking handoff to receipt poller return nil } diff --git a/stats/metrics.go b/stats/metrics.go index 75bce3d..c66b879 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -41,6 +41,16 @@ var ( "run_txs_accepted_total", metric.WithDescription("Total transactions accepted by endpoints over this run (emitted once at run end)"), metric.WithUnit("{transactions}"))) + + runTxsDroppedTotal = must(meter.Int64Gauge( + "run_txs_dropped_total", + metric.WithDescription("Total open-loop transactions dropped because in-flight was saturated at their scheduled instant (emitted once at run end)"), + metric.WithUnit("{transactions}"))) + + runTxsFailedTotal = must(meter.Int64Gauge( + "run_txs_failed_total", + metric.WithDescription("Total open-loop transactions admitted and enqueued but whose send completed with an error (emitted once at run end)"), + metric.WithUnit("{transactions}"))) ) func must[V any](v V, err error) V { diff --git a/stats/run_summary.go b/stats/run_summary.go index c8480df..f3401d8 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -3,10 +3,32 @@ package stats import ( "context" "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) +// RunSummary carries the arrival-model accounting the collector does not track +// itself (the dispatcher owns it) into the run-summary gauges. +type RunSummary struct { + // ArrivalModel is "open_loop" or "closed_loop"; tags the dropped gauge so a + // nonzero drop count is attributable to the model that produced it. + ArrivalModel string + // Dropped is the count of open-loop txs shed on in-flight saturation. A + // dropped tx never reaches the inclusion tracker and carries no + // InclusionTime, so it must stay out of inclusion-rate denominators: the + // denominator is succeeded txs (those that reached a sender), never the full + // scheduled count. + Dropped uint64 + // Failed is the count of admitted open-loop txs whose send completed with an + // error. Like Dropped, a failed tx reached no inclusion tracker; it is + // reported so the conservation invariant (see sender package doc: + // scheduled = dropped + succeeded + failed) is auditable from the run summary. + Failed uint64 +} + // EmitRunSummary records the run-summary gauges. Call once at shutdown. -func (c *Collector) EmitRunSummary(ctx context.Context) { +func (c *Collector) EmitRunSummary(ctx context.Context, summary RunSummary) { c.mu.RLock() duration := time.Since(c.startTime) totalTxs := c.totalTxs @@ -16,4 +38,8 @@ func (c *Collector) EmitRunSummary(ctx context.Context) { runDurationSeconds.Record(ctx, duration.Seconds()) runTPSFinal.Record(ctx, finalTPS) runTxsAcceptedTotal.Record(ctx, int64(totalTxs)) + runTxsDroppedTotal.Record(ctx, int64(summary.Dropped), + metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) + runTxsFailedTotal.Record(ctx, int64(summary.Failed), + metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) } diff --git a/types/scenario.go b/types/scenario.go index db5e6a7..8d02d8e 100644 --- a/types/scenario.go +++ b/types/scenario.go @@ -12,27 +12,52 @@ import ( // LoadTx is a wrapper that has pre-encoded json rpc payload and eth transaction. // -// Lifecycle timestamp concurrency contract: a *LoadTx is passed by pointer -// through buffered channels (txChan, sentTxs). Each lifecycle timestamp is -// written at most once, by whichever goroutine owns the tx at that stage, and -// is immutable thereafter; ownership transfers with the pointer across the -// channels, so the writes need no locking. A zero timestamp means "not -// recorded" (e.g. prewarm txs, or a stage not yet reached) — consumers must -// treat it as untracked, never as the zero epoch. +// Lifecycle field concurrency contract: a *LoadTx is passed by pointer through +// buffered channels (txChan, sentTxs). Each lifecycle field (the timestamps and +// SequenceIndex) is written at most once, by whichever goroutine owns the tx at +// that stage, and is immutable thereafter; ownership transfers with the pointer +// across the channels, so the writes need no locking. The open-loop scheduler +// writes IntendedSendTime and SequenceIndex while it solely owns the tx (before +// the worker hand-off); the worker writes AttemptedSendTime; the inclusion +// tracker writes InclusionTime. A zero timestamp means "not recorded" (e.g. +// prewarm txs, or a stage not yet reached) — consumers must treat it as +// untracked, never as the zero epoch. type LoadTx struct { EthTx *ethtypes.Transaction JSONRPCPayload []byte Payload []byte Scenario *TxScenario - // IntendedSendTime is when the tx was scheduled to be sent, written by the - // dispatcher before the tx is enqueued. It currently holds the enqueue time, - // which is back-pressured under load; until an open-loop scheduler sets it to - // the intended schedule instant, it must not be used to derive latency. + // IntendedSendTime is when the tx was scheduled to be sent. In the open-loop + // arrival model the scheduler writes the true scheduled instant t₀ + i/λ + // here (independent of when a sender is free), which is the basis for + // coordinated-omission-free latency. In the legacy closed-loop model it + // instead holds the back-pressured enqueue time and must not be used to + // derive latency. A LoadTx cannot self-describe which model wrote it — an + // open-loop tx and a closed-loop tx are byte-identical (both can have + // SequenceIndex 0). Latency and schedule-lag consumers must gate on the + // run-level arrival model (RunSummary.ArrivalModel), not on any field here. IntendedSendTime time.Time + // SequenceIndex is the monotonic per-campaign arrival index i assigned by + // the open-loop scheduler, which schedules tx i at t₀ + i/λ. It attributes + // per-tx schedule lag (IntendedSendTime vs AttemptedSendTime) back to a + // position in the arrival sequence. Zero in the legacy closed-loop model, + // where no scheduler assigns it — so the value alone does not identify the + // model (see IntendedSendTime); the run's arrival model is authoritative. + SequenceIndex uint64 // AttemptedSendTime is when the send was actually attempted, written by the // worker goroutine that owns the tx between dequeue and the sentTxs hand-off. AttemptedSendTime time.Time + // OnComplete, if set, is invoked exactly once when the network send attempt + // for this tx finishes (after sendTransaction returns), with the send error + // or nil. The open-loop scheduler sets it to release the in-flight permit so + // the bound covers true unacked sends (enqueue + send), not just queue depth; + // see the open-loop scheduler. The worker invokes it after sendTransaction + // and is the sole invoker on the happy path. Nil in the closed-loop and batch + // paths, where the worker simply skips it. The callback must be cheap and + // non-blocking — the worker holds the tx and calls it inline. Written by the + // owning goroutine before hand-off, per the lifecycle concurrency contract. + OnComplete func(err error) // InclusionTime is when the tx was observed included on-chain, written only // by the inclusion tracker. InclusionTime time.Time diff --git a/utils/semaphore.go b/utils/semaphore.go index 728c12a..941c670 100644 --- a/utils/semaphore.go +++ b/utils/semaphore.go @@ -16,9 +16,22 @@ func NewSemaphore(n int) *Semaphore { // Acquire acquires a permit from the semaphore. // Blocks until a permit is available. -func (s *Semaphore) Acquire(ctx context.Context) (relase func(), err error) { +func (s *Semaphore) Acquire(ctx context.Context) (release func(), err error) { if err := Send(ctx, s.ch, struct{}{}); err != nil { return nil, err } return func() { <-s.ch }, nil } + +// TryAcquire acquires a permit without blocking. It returns the release func +// and true if a permit was available, or nil and false if all permits are held. +// Used by callers that must never block waiting for capacity (e.g. an open-loop +// scheduler that drops rather than throttling its clock). +func (s *Semaphore) TryAcquire() (release func(), ok bool) { + select { + case s.ch <- struct{}{}: + return func() { <-s.ch }, true + default: + return nil, false + } +}