Add timeout to standard pilot fetch#1255
Conversation
brandur
left a comment
There was a problem hiding this comment.
Thanks!
@bgentry Any strong opinions on how you want to handle this one? Another option is to just put the timeout in producer.go in dispatchWork:
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally removes any deadlines or cancellation from the parent
// context because we don't want it to get cancelled if the producer is asked
// to shut down. In that situation, we want to finish fetching any jobs we are
// in the midst of fetching, work them, and then stop. Otherwise we'd have a
// risk of shutting down when we had already fetched jobs in the database,
// leaving those jobs stranded. We'd then potentially have to release them
// back to the queue.
ctx := context.WithoutCancel(workCtx)
// Maximum size of the `attempted_by` array on each job row. This maximum is
// rarely hit, but exists to protect against degenerate cases.
const maxAttemptedBy = 100
jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
ClientID: p.config.ClientID,
MaxAttemptedBy: maxAttemptedBy,
MaxToLock: count,
Now: p.Time.NowOrNil(),
Queue: p.config.Queue,
ProducerID: p.id.Load(),
Schema: p.config.Schema,
})
if err != nil {
fetchResultCh <- producerFetchResult{err: err}
return
}
fetchResultCh <- producerFetchResult{jobs: jobs}
}That might be better in the way that not every pilot needs to remember to bring its own context cancellations. That said, maybe in this case we might want a longer cancellation for the pro pilot so it'd make sense to break up the two.
|
Another more robust option is to do fetches within a transaction that sets a |
Summary
Fix
StandardPilot.JobGetAvailableso a stalled fetch does not hang a producer indefinitely.Problem
producer.dispatchWorkintentionally strips cancellation from the work context before fetching jobs so an in-flight fetch is allowed to complete during shutdown:producer.go:744-766That is reasonable, but
StandardPilot.JobGetAvailableforwarded directly toexec.JobGetAvailablewith no timeout at all:rivershared/riverpilot/standard_pilot.go:18-22This meant a stalled driver call could block a standard-pilot producer forever. The pro pilot already applies per-attempt fetch timeouts, so the standard pilot was the outlier.
Change
Add a 10-second timeout inside
StandardPilot.JobGetAvailablebefore calling the driver.This keeps the existing shutdown semantics intact:
dispatchWorkThe timeout is local to the standard pilot so there is no driver SQL change and no producer state-machine change.
Testing
rivershared/riverpilot/standard_pilot_test.goMaxToLock <= 0no-op behaviorJobGetAvailablecall timing out withcontext.DeadlineExceededVerification
Locally verified with:
GOPROXY=https://goproxy.cn,direct GOSUMDB=off go test ./rivershared/riverpilot -count=1Closes #1026.