diff --git a/.server-changes/mollifier-trigger.md b/.server-changes/mollifier-trigger.md new file mode 100644 index 00000000000..a289972ef87 --- /dev/null +++ b/.server-changes/mollifier-trigger.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier trigger-time decisions: gate `engine.trigger`, mollify bursts into the buffer, claim idempotency keys, and read-fallback for buffered runs. diff --git a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts index a6fe5babe2c..e744d20ee9c 100644 --- a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts +++ b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts @@ -2,13 +2,42 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic"; import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; import type { RunEngine } from "~/v3/runEngine.server"; import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server"; import type { TraceEventConcern, TriggerTaskRequest } from "../types"; +// Claim ownership context returned to the caller when the +// IdempotencyKeyConcern won a pre-gate claim. Caller MUST publish the +// winning runId on pipeline success (`publishClaim`) or release the +// claim on failure (`releaseClaim`). +export type ClaimedIdempotency = { + envId: string; + taskIdentifier: string; + idempotencyKey: string; + // Ownership token from `claimOrAwait`. The caller's trigger pipeline + // MUST thread this into publishClaim/releaseClaim so the buffer's + // compare-and-act protects the slot against a stale predecessor. + token: string; +}; + export type IdempotencyKeyConcernResult = | { isCached: true; run: TaskRun } - | { isCached: false; idempotencyKey?: string; idempotencyKeyExpiresAt?: Date }; + | { + isCached: false; + idempotencyKey?: string; + idempotencyKeyExpiresAt?: Date; + // Set when this trigger holds a pre-gate claim. The caller's + // trigger pipeline MUST resolve the claim by either publishing + // the runId on success or releasing on failure. Undefined when + // the request has no idempotency key, when the buffer is + // unavailable, or when the request is a triggerAndWait (claim + // path skipped per plan doc). + claim?: ClaimedIdempotency; + }; export class IdempotencyKeyConcern { constructor( @@ -17,6 +46,47 @@ export class IdempotencyKeyConcern { private readonly traceEventConcern: TraceEventConcern ) {} + // Buffer-side idempotency dedup. Resolves an idempotency key against the + // mollifier buffer when PG missed. Returns a SyntheticRun cast to + // TaskRun so the route handler (which only reads run.id / run.friendlyId) + // can echo the buffered run's friendlyId as a cached hit. Returns null + // for any failure or miss — buffer outages must not 500 the trigger + // hot path; we fail open to "no cache hit" and let the request through. + private async findBufferedRunWithIdempotency( + environmentId: string, + organizationId: string, + taskIdentifier: string, + idempotencyKey: string, + ): Promise { + const buffer = getMollifierBuffer(); + if (!buffer) return null; + + let bufferedRunId: string | null; + try { + bufferedRunId = await buffer.lookupIdempotency({ + envId: environmentId, + taskIdentifier, + idempotencyKey, + }); + } catch (err) { + logger.error("IdempotencyKeyConcern: buffer lookupIdempotency failed", { + environmentId, + taskIdentifier, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + if (!bufferedRunId) return null; + + const synthetic = await findRunByIdWithMollifierFallback({ + runId: bufferedRunId, + environmentId, + organizationId, + }); + if (!synthetic) return null; + return synthetic as unknown as TaskRun; + } + async handleTriggerRequest( request: TriggerTaskRequest, parentStore: string | undefined @@ -44,6 +114,25 @@ export class IdempotencyKeyConcern { }) : undefined; + // Buffer fallback per the mollifier-idempotency design. PG missed — + // the same key may belong to a buffered run that hasn't materialised + // yet. Skipped when `resumeParentOnCompletion` is set: blocking a + // parent on a buffered child via waitpoint requires a PG row that + // doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger + // still dedupes the trigger itself; the waitpoint just doesn't fire + // for this rare race window. + if (!existingRun && idempotencyKey && !request.body.options?.resumeParentOnCompletion) { + const buffered = await this.findBufferedRunWithIdempotency( + request.environment.id, + request.environment.organizationId, + request.taskId, + idempotencyKey, + ); + if (buffered) { + return { isCached: true, run: buffered }; + } + } + if (existingRun) { // The idempotency key has expired if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) { @@ -133,6 +222,100 @@ export class IdempotencyKeyConcern { return { isCached: true, run: existingRun }; } + // Pre-gate claim — closes the PG+buffer race during gate transition. + // All same-key triggers serialise here before evaluateGate decides + // PG-pass-through vs mollify. Skipped for triggerAndWait + // (resumeParentOnCompletion) — that path bypasses the gate via F4 + // and its existing PG-side dedup is sufficient. + if (!request.body.options?.resumeParentOnCompletion) { + const ttlSeconds = Math.max( + 1, + Math.min( + 30, + Math.ceil((idempotencyKeyExpiresAt.getTime() - Date.now()) / 1000), + ), + ); + const outcome = await claimOrAwait({ + envId: request.environment.id, + taskIdentifier: request.taskId, + idempotencyKey, + ttlSeconds, + }); + if (outcome.kind === "resolved") { + // Another concurrent trigger committed first. Re-resolve via the + // existing checks: writer-side PG findFirst first (defeats + // replica lag), then buffer fallback for the buffered case. + const writerRun = await this.prisma.taskRun.findFirst({ + where: { + runtimeEnvironmentId: request.environment.id, + idempotencyKey, + taskIdentifier: request.taskId, + }, + include: { associatedWaitpoint: true }, + }); + if (writerRun) { + return { isCached: true, run: writerRun }; + } + const buffered = await this.findBufferedRunWithIdempotency( + request.environment.id, + request.environment.organizationId, + request.taskId, + idempotencyKey, + ); + if (buffered) { + return { isCached: true, run: buffered }; + } + // Claim resolved to a runId nothing can find — the run was + // genuinely lost (claimant errored after publish, drain failed, + // or both the PG row and buffer entry TTL'd out). This is + // terminal, not transient: `lookupIdempotency` self-heals a + // dangling pointer, and `ack` keeps the entry hash as a + // read-fallback past the PG write, so re-polling cannot conjure + // a run that is gone. Falling through to a fresh trigger is the + // correct recovery. + // + // Why falling through claimless is safe (no duplicate runs): + // concurrent triggers that also fall through here converge on a + // single run via the same dedup backstops the claim layer relies + // on — the PG unique constraint on the idempotency key + // (RunDuplicateIdempotencyKeyError → retry resolves to the + // winner) for the pass-through path, and `accept`'s idempotency + // SETNX (`duplicate_idempotency`) for the mollify path. Once the + // first fall-through commits a run, later callers find it via the + // writer-PG / buffer lookups above despite the stale `resolved:` + // slot, which the slot's TTL clears within ~30s. The residual + // cost is a few redundant (deduped) trigger attempts in that + // window, not duplicate runs. + logger.warn("idempotency claim resolved but runId not findable", { + envId: request.environment.id, + taskIdentifier: request.taskId, + claimedRunId: outcome.runId, + }); + } + if (outcome.kind === "timed_out") { + throw new ServiceValidationError( + "Idempotency claim resolution timed out", + 503, + ); + } + if (outcome.kind === "claimed") { + // Caller MUST publish/release. Signalled via the result's + // `claim` field, including the ownership token so the buffer + // can compare-and-act on the slot we now own. + return { + isCached: false, + idempotencyKey, + idempotencyKeyExpiresAt, + claim: { + envId: request.environment.id, + taskIdentifier: request.taskId, + idempotencyKey, + token: outcome.token, + }, + }; + } + } + return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; } } diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 111ab17f4c1..306c3880ae2 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -30,7 +30,14 @@ import type { TriggerTaskServiceResult, } from "../../v3/services/triggerTask.server"; import { clampMaxDuration } from "../../v3/utils/maxDuration"; -import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server"; +import { + IdempotencyKeyConcern, + type ClaimedIdempotency, +} from "../concerns/idempotencyKeys.server"; +import { + publishClaim as publishMollifierClaim, + releaseClaim as releaseMollifierClaim, +} from "~/v3/mollifier/idempotencyClaim.server"; import type { PayloadProcessor, QueueManager, @@ -50,8 +57,8 @@ import { getMollifierBuffer as defaultGetMollifierBuffer, type MollifierGetBuffer, } from "~/v3/mollifier/mollifierBuffer.server"; -import { buildBufferedTriggerPayload } from "~/v3/mollifier/bufferedTriggerPayload.server"; -import { serialiseSnapshot } from "@trigger.dev/redis-worker"; +import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server"; +import { type MollifierBuffer } from "@trigger.dev/redis-worker"; import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server"; class NoopTriggerRacepointSystem implements TriggerRacepointSystem { @@ -124,7 +131,15 @@ export class RunEngineTriggerTaskService { options?: TriggerTaskServiceOptions; attempt?: number; }): Promise { - return await startSpan(this.tracer, "RunEngineTriggerTaskService.call()", async (span) => { + // Pre-gate idempotency-claim ownership. Set inside the span when + // `IdempotencyKeyConcern.handleTriggerRequest` returns `claim: + // {...}`. The try/catch below resolves it once the span finishes. + let idempotencyClaim: ClaimedIdempotency | undefined; + try { + const result = await startSpan( + this.tracer, + "RunEngineTriggerTaskService.call()", + async (span) => { span.setAttribute("taskId", taskId); span.setAttribute("attempt", attempt); @@ -247,7 +262,15 @@ export class RunEngineTriggerTaskService { return idempotencyKeyConcernResult; } - const { idempotencyKey, idempotencyKeyExpiresAt } = idempotencyKeyConcernResult; + const { idempotencyKey, idempotencyKeyExpiresAt, claim: claimResult } = + idempotencyKeyConcernResult; + + // If we own an idempotency claim, the trigger pipeline below MUST + // resolve it — publish on success so waiters see our runId, + // release on error so the next claimant can retry. Stored in an + // outer scope so the try/catch at the bottom of `callV2` can act + // on whichever return path or throw the pipeline takes. + idempotencyClaim = claimResult; if (idempotencyKey) { await this.triggerRacepointSystem.waitForRacepoint({ @@ -343,25 +366,6 @@ export class RunEngineTriggerTaskService { taskKind: taskKind ?? "STANDARD", }; - // Short-circuit before the gate when mollifier is globally off (the - // default for every deployment that hasn't opted in). Avoids the - // GateInputs allocation, the deps spread inside `evaluateGate`, and - // the `mollifier.decisions{outcome=pass_through}` OTel increment on - // every trigger — `triggerTask` is the highest-throughput code path - // in the system. The check goes through a DI'd predicate so unit - // tests that inject a custom `evaluateGate` can also override the - // gate-on check (the default reads `env.TRIGGER_MOLLIFIER_ENABLED`, - // which is "0" in CI where no .env file is present). - const mollifierOutcome: GateOutcome | null = this.isMollifierGloballyEnabled() - ? await this.evaluateGate({ - envId: environment.id, - orgId: environment.organizationId, - taskId, - orgFeatureFlags: - (environment.organization.featureFlags as Record | null) ?? null, - }) - : null; - try { return await this.traceEventConcern.traceRun( triggerRequest, @@ -372,148 +376,170 @@ export class RunEngineTriggerTaskService { event.setAttribute("runId", runFriendlyId); span.setAttribute("runId", runFriendlyId); - const payloadPacket = await this.payloadProcessor.process(triggerRequest); - - // Dual-write: if the org has the mollifier feature flag - // enabled and the per-env trip evaluator says divert, write the - // canonical replay payload to the buffer AND continue through - // engine.trigger as normal. The buffer entry is an audit/preview - // copy; the drainer's no-op handler consumes it to prove the - // dequeue mechanism works. A later change replaces engine.trigger - // (below) with a synthesised 200 response and relies on the - // drainer to perform the Postgres write via replay. + // Short-circuit when mollifier is globally off (the default + // for every deployment that hasn't opted in). Avoids the + // GateInputs allocation, the deps spread inside `evaluateGate`, + // and the `mollifier.decisions{outcome=pass_through}` OTel + // increment on every trigger — `triggerTask` is the + // highest-throughput code path in the system. The check goes + // through a DI'd predicate so unit tests that inject a custom + // `evaluateGate` can also override the gate-on check (the + // default reads `env.TRIGGER_MOLLIFIER_ENABLED`, which is "0" + // in CI where no .env file is present). + const mollifierOutcome: GateOutcome | null = this.isMollifierGloballyEnabled() + ? await this.evaluateGate({ + envId: environment.id, + orgId: environment.organizationId, + taskId, + orgFeatureFlags: + (environment.organization.featureFlags as Record | null) ?? + null, + options: { + debounce: body.options?.debounce, + oneTimeUseToken: options.oneTimeUseToken, + parentTaskRunId: body.options?.parentRunId, + resumeParentOnCompletion: body.options?.resumeParentOnCompletion, + }, + }) + : null; + + // When the gate says mollify, write the engine.trigger input + // snapshot into the Redis buffer and return a synthesised + // TriggerTaskServiceResult. The customer never waits on + // Postgres; the drainer materialises the run later by replaying + // engine.trigger against the snapshot. The run span has already + // been opened by traceRun above (PARTIAL event in ClickHouse), + // so its traceId/spanId live in the snapshot and the drainer's + // `mollifier.drained` span parents on the same trace — buffered + // runs become visible in the dashboard's trace view immediately, + // not only after the drainer fires. if (mollifierOutcome?.action === "mollify") { - const buffer = this.getMollifierBuffer(); - if (buffer) { - const canonicalPayload = buildBufferedTriggerPayload({ + const mollifierBuffer = this.getMollifierBuffer(); + if (mollifierBuffer && !body.options?.debounce) { + event.setAttribute("mollifier.reason", mollifierOutcome.decision.reason); + event.setAttribute("mollifier.count", String(mollifierOutcome.decision.count)); + event.setAttribute( + "mollifier.threshold", + String(mollifierOutcome.decision.threshold) + ); + event.setAttribute("taskRunId", runFriendlyId); + + const payloadPacket = await this.payloadProcessor.process(triggerRequest); + + const engineTriggerInput = this.#buildEngineTriggerInput({ runFriendlyId, + environment, + idempotencyKey, + idempotencyKeyExpiresAt, + body, + options, + queueName, + lockedQueueId, + workerQueue, + enableFastPath, + lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined, + delayUntil, + ttl, + metadataPacket, + tags, + depth, + parentRun: parentRun ?? undefined, + annotations, + planType, taskId, + payloadPacket, + traceContext: this.#propagateExternalTraceContext( + event.traceContext, + parentRun?.traceContext, + event.traceparent?.spanId + ), + traceId: event.traceId, + spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" + ? undefined + : event.traceparent?.spanId, + taskEventStore: store, + }); + + const result = await mollifyTrigger({ + runFriendlyId, + environmentId: environment.id, + organizationId: environment.organizationId, + engineTriggerInput, + decision: mollifierOutcome.decision, + buffer: mollifierBuffer, + // Idempotency-key triple wires the buffer's SETNX into + // the trigger-time dedup symmetric with PG. + idempotencyKey, + taskIdentifier: taskId, + }); + + logger.debug("mollifier.buffered", { + runId: runFriendlyId, envId: environment.id, - envType: environment.type, - envSlug: environment.slug, orgId: environment.organizationId, - orgSlug: environment.organization.slug, - projectId: environment.projectId, - projectRef: environment.project.externalRef, - body, - idempotencyKey: idempotencyKey ?? null, - idempotencyKeyExpiresAt: idempotencyKey - ? idempotencyKeyExpiresAt ?? null - : null, - tags, - parentRunFriendlyId: parentRun?.friendlyId ?? null, - traceContext: event.traceContext, - triggerSource, - triggerAction, - serviceOptions: options, - createdAt: new Date(), + taskId, + reason: mollifierOutcome.decision.reason, }); - try { - const serialisedPayload = serialiseSnapshot(canonicalPayload); - await buffer.accept({ - runId: runFriendlyId, - envId: environment.id, - orgId: environment.organizationId, - payload: serialisedPayload, - }); - // Light log on the hot path — keep this synchronous work - // O(1) per trigger. The drainer computes the payload hash - // off-path; operators correlate `mollifier.buffered` → - // `mollifier.drained` by runId. - logger.debug("mollifier.buffered", { - runId: runFriendlyId, - envId: environment.id, - orgId: environment.organizationId, - taskId, - payloadBytes: serialisedPayload.length, - }); - } catch (err) { - // Fail-open: buffer write must never block the customer's - // trigger. engine.trigger below is still the primary write - // path here — the customer still gets a valid run. - logger.error("mollifier.buffer_accept_failed", { - runId: runFriendlyId, - envId: environment.id, - taskId, - err: err instanceof Error ? err.message : String(err), - }); - } + // Synthetic result is structurally narrower than the full + // TaskRun; the route handler only reads + // `result.run.friendlyId`. traceRun flushes the PARTIAL + // run-span event to ClickHouse on callback return. + return result as unknown as TriggerTaskServiceResult; + } + if (!mollifierBuffer) { + logger.warn( + "mollifier gate said mollify but buffer is null — falling through to pass-through" + ); } } + const payloadPacket = await this.payloadProcessor.process(triggerRequest); + + const baseEngineInput = this.#buildEngineTriggerInput({ + runFriendlyId, + environment, + idempotencyKey, + idempotencyKeyExpiresAt, + body, + options, + queueName, + lockedQueueId, + workerQueue, + enableFastPath, + lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined, + delayUntil, + ttl, + metadataPacket, + tags, + depth, + parentRun: parentRun ?? undefined, + annotations, + planType, + taskId, + payloadPacket, + traceContext: this.#propagateExternalTraceContext( + event.traceContext, + parentRun?.traceContext, + event.traceparent?.spanId + ), + traceId: event.traceId, + spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId, + taskEventStore: store, + }); + const taskRun = await this.engine.trigger( { - friendlyId: runFriendlyId, - environment: environment, - idempotencyKey, - idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, - idempotencyKeyOptions: body.options?.idempotencyKeyOptions, - taskIdentifier: taskId, - payload: payloadPacket.data ?? "", - payloadType: payloadPacket.dataType, - context: body.context, - traceContext: this.#propagateExternalTraceContext( - event.traceContext, - parentRun?.traceContext, - event.traceparent?.spanId - ), - traceId: event.traceId, - spanId: event.spanId, - parentSpanId: - options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId, - replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId, - lockedToVersionId: lockedToBackgroundWorker?.id, - taskVersion: lockedToBackgroundWorker?.version, - sdkVersion: lockedToBackgroundWorker?.sdkVersion, - cliVersion: lockedToBackgroundWorker?.cliVersion, - concurrencyKey: body.options?.concurrencyKey, - queue: queueName, - lockedQueueId, - workerQueue, - enableFastPath, - isTest: body.options?.test ?? false, - delayUntil, - queuedAt: delayUntil ? undefined : new Date(), - maxAttempts: body.options?.maxAttempts, - taskEventStore: store, - ttl, - tags, - oneTimeUseToken: options.oneTimeUseToken, - parentTaskRunId: parentRun?.id, - rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, - batch: options?.batchId - ? { - id: options.batchId, - index: options.batchIndex ?? 0, - } - : undefined, - resumeParentOnCompletion: body.options?.resumeParentOnCompletion, - depth, - metadata: metadataPacket?.data, - metadataType: metadataPacket?.dataType, - seedMetadata: metadataPacket?.data, - seedMetadataType: metadataPacket?.dataType, - maxDurationInSeconds: body.options?.maxDuration - ? clampMaxDuration(body.options.maxDuration) - : undefined, - machine: body.options?.machine, - priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, - queueTimestamp: - options.queueTimestamp ?? - (parentRun && body.options?.resumeParentOnCompletion - ? parentRun.queueTimestamp ?? undefined - : undefined), - scheduleId: options.scheduleId, - scheduleInstanceId: options.scheduleInstanceId, - createdAt: options.overrideCreatedAt, - bulkActionId: body.options?.bulkActionId, - planType, - realtimeStreamsVersion: options.realtimeStreamsVersion, - streamBasinName: environment.organization.streamBasinName, - debounce: body.options?.debounce, - annotations, - // When debouncing with triggerAndWait, create a span for the debounced trigger + ...baseEngineInput, + // onDebounced is a closure over webapp state (triggerRequest + + // traceEventConcern) and can't be serialised into the mollifier + // snapshot. The pass-through path attaches it here; the drainer + // path replays without it. The debounce and triggerAndWait gate + // bypasses ensure neither reaches the mollify branch. onDebounced: body.options?.debounce && body.options?.resumeParentOnCompletion ? async ({ existingRun, waitpoint, debounceKey }) => { @@ -591,7 +617,131 @@ export class RunEngineTriggerTaskService { throw error; } - }); + }, + ); + // Pipeline returned successfully — publish the claim if we held + // one. Waiters polling for our key resolve to this runId. + if (idempotencyClaim && result?.run?.friendlyId) { + await publishMollifierClaim({ + envId: idempotencyClaim.envId, + taskIdentifier: idempotencyClaim.taskIdentifier, + idempotencyKey: idempotencyClaim.idempotencyKey, + token: idempotencyClaim.token, + runId: result.run.friendlyId, + }); + } + return result; + } catch (err) { + // Pipeline threw — release the claim so the next claimant can + // retry. Re-throw so the caller sees the original error. + if (idempotencyClaim) { + await releaseMollifierClaim(idempotencyClaim); + } + throw err; + } + } + + // Build the engine.trigger() input object from the values gathered during + // this.call(). Extracted so the mollify path can construct the + // same input shape without re-entering the trace-run span. The pass-through + // path spreads this result and attaches `onDebounced` inline; the mollify + // path serialises it into the buffer for drainer replay. + #buildEngineTriggerInput(args: { + runFriendlyId: string; + environment: AuthenticatedEnvironment; + idempotencyKey?: string; + idempotencyKeyExpiresAt?: Date; + body: TriggerTaskRequest["body"]; + options: TriggerTaskServiceOptions; + queueName: string; + lockedQueueId?: string; + workerQueue?: string; + enableFastPath: boolean; + lockedToBackgroundWorker?: { id: string; version: string; sdkVersion: string; cliVersion: string }; + delayUntil?: Date; + ttl?: string; + metadataPacket?: { data?: string; dataType: string }; + tags: string[]; + depth: number; + parentRun?: { id: string; rootTaskRunId?: string | null; queueTimestamp?: Date | null; taskEventStore?: string }; + annotations: { + triggerSource: string; + triggerAction: string; + rootTriggerSource: string; + rootScheduleId?: string | undefined; + }; + planType?: string; + taskId: string; + payloadPacket: { data?: string; dataType: string }; + traceContext: TriggerTraceContext; + traceId: string; + spanId: string; + parentSpanId: string | undefined; + taskEventStore: string; + }) { + return { + friendlyId: args.runFriendlyId, + environment: args.environment, + idempotencyKey: args.idempotencyKey, + idempotencyKeyExpiresAt: args.idempotencyKey ? args.idempotencyKeyExpiresAt : undefined, + idempotencyKeyOptions: args.body.options?.idempotencyKeyOptions, + taskIdentifier: args.taskId, + payload: args.payloadPacket.data ?? "", + payloadType: args.payloadPacket.dataType, + context: args.body.context, + traceContext: args.traceContext, + traceId: args.traceId, + spanId: args.spanId, + parentSpanId: args.parentSpanId, + replayedFromTaskRunFriendlyId: args.options.replayedFromTaskRunFriendlyId, + lockedToVersionId: args.lockedToBackgroundWorker?.id, + taskVersion: args.lockedToBackgroundWorker?.version, + sdkVersion: args.lockedToBackgroundWorker?.sdkVersion, + cliVersion: args.lockedToBackgroundWorker?.cliVersion, + concurrencyKey: args.body.options?.concurrencyKey, + queue: args.queueName, + lockedQueueId: args.lockedQueueId, + workerQueue: args.workerQueue, + enableFastPath: args.enableFastPath, + isTest: args.body.options?.test ?? false, + delayUntil: args.delayUntil, + queuedAt: args.delayUntil ? undefined : new Date(), + maxAttempts: args.body.options?.maxAttempts, + taskEventStore: args.taskEventStore, + ttl: args.ttl, + tags: args.tags, + oneTimeUseToken: args.options.oneTimeUseToken, + parentTaskRunId: args.parentRun?.id, + rootTaskRunId: args.parentRun?.rootTaskRunId ?? args.parentRun?.id, + batch: args.options?.batchId + ? { id: args.options.batchId, index: args.options.batchIndex ?? 0 } + : undefined, + resumeParentOnCompletion: args.body.options?.resumeParentOnCompletion, + depth: args.depth, + metadata: args.metadataPacket?.data, + metadataType: args.metadataPacket?.dataType, + seedMetadata: args.metadataPacket?.data, + seedMetadataType: args.metadataPacket?.dataType, + maxDurationInSeconds: args.body.options?.maxDuration + ? clampMaxDuration(args.body.options.maxDuration) + : undefined, + machine: args.body.options?.machine, + priorityMs: args.body.options?.priority ? args.body.options.priority * 1_000 : undefined, + queueTimestamp: + args.options.queueTimestamp ?? + (args.parentRun && args.body.options?.resumeParentOnCompletion + ? args.parentRun.queueTimestamp ?? undefined + : undefined), + scheduleId: args.options.scheduleId, + scheduleInstanceId: args.options.scheduleInstanceId, + createdAt: args.options.overrideCreatedAt, + bulkActionId: args.body.options?.bulkActionId, + planType: args.planType, + realtimeStreamsVersion: args.options.realtimeStreamsVersion, + streamBasinName: args.environment.organization.streamBasinName, + debounce: args.body.options?.debounce, + annotations: args.annotations, + }; } #propagateExternalTraceContext( diff --git a/apps/webapp/app/v3/mollifier/idempotencyClaim.server.ts b/apps/webapp/app/v3/mollifier/idempotencyClaim.server.ts new file mode 100644 index 00000000000..47c9733c927 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/idempotencyClaim.server.ts @@ -0,0 +1,218 @@ +import { randomUUID } from "node:crypto"; +import type { + IdempotencyClaimResult, + IdempotencyLookupInput, + MollifierBuffer, +} from "@trigger.dev/redis-worker"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +// Tunables. The TTL on the claim key is bounded by typical trigger-pipeline +// dwell; long enough that a slow PG insert doesn't expire mid-flight, +// short enough that a crashed claimant unblocks waiters quickly. +export const DEFAULT_CLAIM_TTL_SECONDS = 30; +// safetyNetMs caps how long a waiter blocks before returning timed_out. +// Matches the mutateWithFallback safety net so SDK retry policies don't +// have to special-case this path. +export const DEFAULT_CLAIM_WAIT_MS = 5_000; +export const DEFAULT_CLAIM_POLL_MS = 25; + +export type ClaimOrAwaitOutcome = + // We own the claim. `token` MUST be passed to publishClaim/releaseClaim + // so the buffer can compare-and-act against our ownership marker — a + // late release from a previous claimant whose TTL expired cannot + // erase our slot. + | { kind: "claimed"; token: string } + | { kind: "resolved"; runId: string } // someone else's runId; caller returns isCached:true + | { kind: "timed_out" }; + +export type ClaimOrAwaitInput = IdempotencyLookupInput & { + ttlSeconds?: number; + safetyNetMs?: number; + pollStepMs?: number; + abortSignal?: AbortSignal; + // Test injection. + buffer?: MollifierBuffer | null; + now?: () => number; + sleep?: (ms: number) => Promise; + // Test override for the ownership-token generator. Defaults to + // `crypto.randomUUID()`. Tests pass a deterministic value so they + // can assert publish/release pass-through. + generateToken?: () => string; +}; + +// Pre-gate Redis claim. All same-key triggers serialise through here +// before the trigger pipeline runs. Returning `resolved` short-circuits +// the trigger entirely — the caller responds with the cached runId. +// Returning `claimed` means we own the claim and MUST publish the +// winning runId on success (`publishClaim`) or release the claim on +// failure (`releaseClaim`). +// +// Failure modes: +// - Redis down at claim time: returns `claimed` (fail open, no +// coordination). Customer is no worse than today's race; the +// PG unique constraint is the eventual arbiter. +// - Claimant crashes mid-pipeline: claim TTL expires, waiters +// eventually time out, SDK retries. +// - PG/buffer publish failure: waiters time out and SDK retries; next +// attempt sees the eventual PG/buffer state via existing +// IdempotencyKeyConcern PG-first lookup. +export async function claimOrAwait(input: ClaimOrAwaitInput): Promise { + const buffer = input.buffer === undefined ? getMollifierBuffer() : input.buffer; + if (!buffer) { + // Mollifier disabled / buffer construction failed. Fall open — + // caller proceeds with the trigger pipeline (PG unique constraint + // backstop). The token is never read in this case (publish/release + // are buffer-null no-ops downstream), so we skip the default + // `randomUUID()` to keep the mollifier-OFF hot path allocation-free + // for idempotency-keyed triggers — `triggerTask` is the + // highest-throughput code path in the system. A test-injected + // generator is still honoured for deterministic assertions. + return { kind: "claimed", token: input.generateToken ? input.generateToken() : "" }; + } + const generateToken = input.generateToken ?? randomUUID; + // Generate the ownership token up front so the retry loop reuses it + // — we're the same logical claimant across attempts; only the slot + // owner changes between releases. + const token = generateToken(); + const ttlSeconds = input.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS; + const safetyNetMs = input.safetyNetMs ?? DEFAULT_CLAIM_WAIT_MS; + const pollStepMs = input.pollStepMs ?? DEFAULT_CLAIM_POLL_MS; + const now = input.now ?? Date.now; + const sleep = input.sleep ?? defaultSleep; + + const lookupInput: IdempotencyLookupInput = { + envId: input.envId, + taskIdentifier: input.taskIdentifier, + idempotencyKey: input.idempotencyKey, + }; + + // Initial claim attempt. Most production-path calls resolve here on + // the first call (either we win, or the key is already resolved from + // a prior burst). + let result: IdempotencyClaimResult; + try { + result = await buffer.claimIdempotency({ ...lookupInput, token, ttlSeconds }); + } catch (err) { + logger.warn("idempotency claim failed (fail-open)", { + envId: input.envId, + taskIdentifier: input.taskIdentifier, + err: err instanceof Error ? err.message : String(err), + }); + return { kind: "claimed", token }; + } + + if (result.kind === "claimed") return { kind: "claimed", token }; + if (result.kind === "resolved") return result; + + // result.kind === "pending" — wait/poll loop. May see the value flip + // to "resolved" (winner published), the key vanish (winner released + // on error → retry claim), or stay "pending" until the safety net. + const deadline = now() + safetyNetMs; + while (now() < deadline) { + if (input.abortSignal?.aborted) return { kind: "timed_out" }; + await sleep(pollStepMs); + + let current: IdempotencyClaimResult | null; + try { + current = await buffer.readClaim(lookupInput); + } catch (err) { + // Transient read failure — keep polling until deadline. + logger.warn("idempotency claim read failed mid-poll", { + err: err instanceof Error ? err.message : String(err), + }); + continue; + } + + if (current === null) { + // Claimant released on error. Re-attempt the claim — one of the + // waiters will win, the rest see "pending" again. Reuse our token: + // we're still the same logical claimant, just contending for a + // freshly empty slot. + try { + const retry = await buffer.claimIdempotency({ ...lookupInput, token, ttlSeconds }); + if (retry.kind === "claimed") return { kind: "claimed", token }; + if (retry.kind === "resolved") return retry; + // "pending" again → keep polling. + } catch (err) { + logger.warn("idempotency claim retry failed", { + err: err instanceof Error ? err.message : String(err), + }); + return { kind: "claimed", token }; + } + continue; + } + if (current.kind === "resolved") return current; + // current.kind === "pending" → keep polling. + } + return { kind: "timed_out" }; +} + +// Publish the winning runId so waiters resolve. Best-effort: failure +// here means waiters will time out and the SDK will retry, which will +// then find the row via the existing IdempotencyKeyConcern PG-first +// check. +export async function publishClaim(input: { + envId: string; + taskIdentifier: string; + idempotencyKey: string; + // Ownership token from the `claimed` outcome. Buffer compare-and-sets + // on this so a publish from a stale claimant (TTL expired, another + // claimant moved in) is a no-op rather than overwriting their claim. + token: string; + runId: string; + ttlSeconds?: number; + buffer?: MollifierBuffer | null; +}): Promise { + const buffer = input.buffer === undefined ? getMollifierBuffer() : input.buffer; + if (!buffer) return; + const ttlSeconds = input.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS; + try { + await buffer.publishClaim({ + envId: input.envId, + taskIdentifier: input.taskIdentifier, + idempotencyKey: input.idempotencyKey, + token: input.token, + runId: input.runId, + ttlSeconds, + }); + } catch (err) { + logger.warn("idempotency claim publish failed", { + envId: input.envId, + taskIdentifier: input.taskIdentifier, + err: err instanceof Error ? err.message : String(err), + }); + } +} + +// Release on pipeline failure. Best-effort. If the DEL fails, the claim +// TTL is the safety net — waiters time out, SDK retries. +export async function releaseClaim(input: { + envId: string; + taskIdentifier: string; + idempotencyKey: string; + // Ownership token from the `claimed` outcome. Buffer compare-and- + // deletes on this so a release from a stale claimant whose TTL + // expired can't wipe a new owner's claim. + token: string; + buffer?: MollifierBuffer | null; +}): Promise { + const buffer = input.buffer === undefined ? getMollifierBuffer() : input.buffer; + if (!buffer) return; + try { + await buffer.releaseClaim({ + envId: input.envId, + taskIdentifier: input.taskIdentifier, + idempotencyKey: input.idempotencyKey, + token: input.token, + }); + } catch (err) { + logger.warn("idempotency claim release failed", { + err: err instanceof Error ? err.message : String(err), + }); + } +} + +function defaultSleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/apps/webapp/app/v3/mollifier/mollifierGate.server.ts b/apps/webapp/app/v3/mollifier/mollifierGate.server.ts index 766d499dd4a..63146b4c323 100644 --- a/apps/webapp/app/v3/mollifier/mollifierGate.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierGate.server.ts @@ -46,6 +46,17 @@ export type GateInputs = { // the pattern used by `canAccessAi`, `canAccessPrivateConnections`, and the // compute-template beta gate. orgFeatureFlags: Record | null; + // Trigger options that drive the debounce / OTU / triggerAndWait + // bypasses. The mollify path can't + // serialise stateful callbacks (debounce), can't safely break OTU's + // synchronous-rejection contract, and shouldn't intercept single + // triggerAndWait (batchTriggerAndWait still funnels through per item). + options?: { + debounce?: unknown; + oneTimeUseToken?: string; + parentTaskRunId?: string; + resumeParentOnCompletion?: boolean; + }; }; export type TripEvaluator = (inputs: GateInputs) => Promise; @@ -141,6 +152,28 @@ export async function evaluateGate( ): Promise { const d = { ...defaultGateDependencies, ...deps }; + // Debounce bypass. onDebounced is a closure over webapp state and + // can't be snapshotted into the buffer for drainer replay. Skip before the + // trip evaluator so debounce traffic is never counted against the rate. + if (inputs.options?.debounce) { + d.recordDecision("pass_through"); + return { action: "pass_through" }; + } + // OneTimeUseToken bypass. OTU is a security feature on the PUBLIC_JWT + // auth path; its synchronous-rejection contract is materially worse to + // break than the idempotency-key contract. + if (inputs.options?.oneTimeUseToken) { + d.recordDecision("pass_through"); + return { action: "pass_through" }; + } + // Single triggerAndWait bypass. batchTriggerAndWait still funnels + // through TriggerTaskService.call per item so the dominant burst pattern + // remains covered. + if (inputs.options?.parentTaskRunId && inputs.options?.resumeParentOnCompletion) { + d.recordDecision("pass_through"); + return { action: "pass_through" }; + } + if (!d.isMollifierEnabled()) { d.recordDecision("pass_through"); return { action: "pass_through" }; diff --git a/apps/webapp/app/v3/mollifier/mollifierMollify.server.ts b/apps/webapp/app/v3/mollifier/mollifierMollify.server.ts new file mode 100644 index 00000000000..bca9a5e129c --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierMollify.server.ts @@ -0,0 +1,98 @@ +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { serialiseMollifierSnapshot, type MollifierSnapshot } from "./mollifierSnapshot.server"; +import type { TripDecision } from "./mollifierGate.server"; + +export type MollifyNotice = { + code: "mollifier.queued"; + message: string; + docs: string; +}; + +export type MollifySyntheticResult = { + // `id` is the canonical TaskRun primary key derived from `friendlyId` + // via `RunId.fromFriendlyId`. Downstream consumers in the trigger + // route — notably `saveRequestIdempotency` — index the request- + // idempotency cache by this id; without it the cache stores + // `undefined` and Prisma's `findFirst({ where: { id: undefined } })` + // on retry strips the predicate and returns an arbitrary TaskRun + // (potential cross-tenant leak). Always populated. + // + // `spanId` is the root-span id allocated at gate-accept time and + // stored in the snapshot. Callers like the dashboard's Test action + // use it to build a `v3RunSpanPath` URL that auto-opens the right + // details panel — without it, the buffered run lands on the + // run-detail page with no span selected (parity gap with PG runs). + run: { id: string; friendlyId: string; spanId: string }; + error: undefined; + // The race-loser path: if accept's SETNX hit an existing + // buffered run with the same (env, task, idempotencyKey), the + // response echoes the winner's runId with isCached=true. The + // mollifier-queued notice is only attached for the happy accept. + isCached: boolean; + notice?: MollifyNotice; +}; + +const NOTICE: MollifyNotice = { + code: "mollifier.queued", + message: + "Trigger accepted into burst buffer. Consider batchTrigger for fan-outs of 100+.", + docs: "https://trigger.dev/docs/triggering#burst-handling", +}; + +export async function mollifyTrigger(args: { + runFriendlyId: string; + environmentId: string; + organizationId: string; + engineTriggerInput: MollifierSnapshot; + decision: Extract; + buffer: MollifierBuffer; + // Optional idempotency context. When both are passed, accept SETNXes + // the lookup so the buffered window participates in trigger-time + // dedup symmetrically with PG. + idempotencyKey?: string; + taskIdentifier?: string; +}): Promise { + const result = await args.buffer.accept({ + runId: args.runFriendlyId, + envId: args.environmentId, + orgId: args.organizationId, + payload: serialiseMollifierSnapshot(args.engineTriggerInput), + idempotencyKey: args.idempotencyKey, + taskIdentifier: args.taskIdentifier, + }); + + if (result.kind === "duplicate_idempotency") { + // Race loser. Echo the winner's runId so the SDK's response shape + // matches PG-side idempotency cache hits. The winner's spanId isn't + // readily available without a second buffer fetch; an empty string + // causes `v3RunSpanPath` to omit the `?span=` param, which matches + // current behaviour for cached PG responses. + return { + run: { + id: RunId.fromFriendlyId(result.existingRunId), + friendlyId: result.existingRunId, + spanId: "", + }, + error: undefined, + isCached: true, + }; + } + + // Both "accepted" and "duplicate_run_id" produce the same customer- + // visible response: a buffered-trigger acknowledgement. The duplicate + // runId case is unreachable in practice (runIds are server-generated + // and unique) but is silently idempotent at the buffer layer either way. + const rawSpanId = args.engineTriggerInput.spanId; + const spanId = typeof rawSpanId === "string" ? rawSpanId : ""; + return { + run: { + id: RunId.fromFriendlyId(args.runFriendlyId), + friendlyId: args.runFriendlyId, + spanId, + }, + error: undefined, + isCached: false, + notice: NOTICE, + }; +} diff --git a/apps/webapp/app/v3/mollifier/readFallback.server.ts b/apps/webapp/app/v3/mollifier/readFallback.server.ts index 34a8b48f970..2afaac737e3 100644 --- a/apps/webapp/app/v3/mollifier/readFallback.server.ts +++ b/apps/webapp/app/v3/mollifier/readFallback.server.ts @@ -1,4 +1,8 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; import { logger } from "~/services/logger.server"; +import { deserialiseMollifierSnapshot } from "./mollifierSnapshot.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; export type ReadFallbackInput = { runId: string; @@ -6,11 +10,208 @@ export type ReadFallbackInput = { organizationId: string; }; +export type SyntheticRun = { + // Snapshot-derived TaskRun primary key. Used by ReplayTaskRunService + // for logging and by callers passing this object where a TaskRun is + // expected (cast). Derived deterministically from `friendlyId`. + id: string; + friendlyId: string; + status: "QUEUED" | "FAILED" | "CANCELED"; + // Set when the customer cancelled the run via the dashboard or API + // while it was buffered. The drainer's cancel bifurcation reads this + // on next pop and writes a CANCELED PG row directly (skipping + // materialisation). Reflected back into the UI by the synthesised + // SpanRun so the run-detail page shows the cancelled state even before + // the drainer materialises it. + cancelledAt: Date | undefined; + cancelReason: string | undefined; + // Reschedule patch (`set_delay`) writes `delayUntil` into the snapshot. + // Surfacing it on SyntheticRun lets the retrieve-run shape reflect the + // pending delay before the drainer materialises the PG row. + delayUntil: Date | undefined; + taskIdentifier: string | undefined; + createdAt: Date; + + payload: unknown; + payloadType: string | undefined; + metadata: unknown; + metadataType: string | undefined; + // Seed-metadata mirrors what `triggerTask.server.ts` writes into the + // snapshot: the original metadataPacket data preserved separately from + // any later customer mutations. ReplayTaskRunService uses these to + // rebuild the replay's metadata. + seedMetadata: string | undefined; + seedMetadataType: string | undefined; + + idempotencyKey: string | undefined; + idempotencyKeyOptions: string[] | undefined; + isTest: boolean; + depth: number; + ttl: string | undefined; + tags: string[]; + // Mirror of `tags` under the PG field name. ReplayTaskRunService reads + // `existingTaskRun.runTags`; both names are kept here so a synthetic + // run can be passed wherever the PG-shape `runTags` is expected. + runTags: string[]; + lockedToVersion: string | undefined; + resumeParentOnCompletion: boolean; + parentTaskRunId: string | undefined; + + // Allocated at gate-accept time and embedded in the snapshot so the run's + // trace is continuous from QUEUED-in-buffer through executing post-drain. + traceId: string | undefined; + spanId: string | undefined; + parentSpanId: string | undefined; + + // Replay-relevant fields populated from the engine-trigger snapshot. + // ReplayTaskRunService reads each of these from the existing TaskRun; + // when the original lives in the buffer we synthesise them here. + runtimeEnvironmentId: string | undefined; + engine: "V2"; + workerQueue: string | undefined; + queue: string | undefined; + concurrencyKey: string | undefined; + machinePreset: string | undefined; + realtimeStreamsVersion: string | undefined; + + // Additional snapshot-sourced fields used when synthesising a SpanRun + // for the dashboard's right-side details panel. All optional because + // older snapshots may not carry them. + maxAttempts: number | undefined; + maxDurationInSeconds: number | undefined; + replayedFromTaskRunFriendlyId: string | undefined; + annotations: unknown; + traceContext: unknown; + scheduleId: string | undefined; + batchId: string | undefined; + parentTaskRunFriendlyId: string | undefined; + rootTaskRunFriendlyId: string | undefined; + + error?: { code: string; message: string }; +}; + +export type ReadFallbackDeps = { + getBuffer?: () => MollifierBuffer | null; +}; + +function asString(value: unknown): string | undefined { + return typeof value === "string" ? value : undefined; +} + +function asStringArray(value: unknown): string[] { + return Array.isArray(value) && value.every((v) => typeof v === "string") ? (value as string[]) : []; +} + +function asDate(value: unknown): Date | undefined { + const raw = asString(value); + if (!raw) return undefined; + const parsed = new Date(raw); + return Number.isNaN(parsed.getTime()) ? undefined : parsed; +} + export async function findRunByIdWithMollifierFallback( input: ReadFallbackInput, -): Promise { - logger.debug("mollifier read-fallback called (phase 1 stub)", { - runId: input.runId, - }); - return null; + deps: ReadFallbackDeps = {}, +): Promise { + const buffer = (deps.getBuffer ?? getMollifierBuffer)(); + if (!buffer) return null; + + try { + const entry = await buffer.getEntry(input.runId); + if (!entry) return null; + + if (entry.envId !== input.environmentId || entry.orgId !== input.organizationId) { + logger.warn("mollifier read-fallback auth mismatch", { + runId: input.runId, + callerEnvId: input.environmentId, + callerOrgId: input.organizationId, + }); + return null; + } + + const snapshot = deserialiseMollifierSnapshot(entry.payload); + const idempotencyKeyOptionsRaw = snapshot.idempotencyKeyOptions; + const idempotencyKeyOptions = Array.isArray(idempotencyKeyOptionsRaw) + ? asStringArray(idempotencyKeyOptionsRaw) + : undefined; + + const tags = asStringArray(snapshot.tags); + const environment = + snapshot.environment && typeof snapshot.environment === "object" + ? (snapshot.environment as Record) + : undefined; + + const cancelledAt = asDate(snapshot.cancelledAt); + const cancelReason = asString(snapshot.cancelReason); + let status: SyntheticRun["status"] = "QUEUED"; + if (cancelledAt) { + status = "CANCELED"; + } else if (entry.status === "FAILED") { + status = "FAILED"; + } + const delayUntil = asDate(snapshot.delayUntil); + + return { + id: RunId.fromFriendlyId(entry.runId), + friendlyId: entry.runId, + status, + cancelledAt, + cancelReason, + delayUntil, + taskIdentifier: asString(snapshot.taskIdentifier), + createdAt: entry.createdAt, + + payload: snapshot.payload, + payloadType: asString(snapshot.payloadType), + metadata: snapshot.metadata, + metadataType: asString(snapshot.metadataType), + seedMetadata: asString(snapshot.seedMetadata), + seedMetadataType: asString(snapshot.seedMetadataType), + + idempotencyKey: asString(snapshot.idempotencyKey), + idempotencyKeyOptions, + isTest: snapshot.isTest === true, + depth: typeof snapshot.depth === "number" ? snapshot.depth : 0, + ttl: asString(snapshot.ttl), + tags, + runTags: tags, + lockedToVersion: asString(snapshot.taskVersion), + resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true, + parentTaskRunId: asString(snapshot.parentTaskRunId), + + traceId: asString(snapshot.traceId), + spanId: asString(snapshot.spanId), + parentSpanId: asString(snapshot.parentSpanId), + + runtimeEnvironmentId: + asString(environment?.id) ?? entry.envId, + engine: "V2", + workerQueue: asString(snapshot.workerQueue), + queue: asString(snapshot.queue), + concurrencyKey: asString(snapshot.concurrencyKey), + machinePreset: asString(snapshot.machine), + realtimeStreamsVersion: asString(snapshot.realtimeStreamsVersion), + + maxAttempts: typeof snapshot.maxAttempts === "number" ? snapshot.maxAttempts : undefined, + maxDurationInSeconds: + typeof snapshot.maxDurationInSeconds === "number" + ? snapshot.maxDurationInSeconds + : undefined, + replayedFromTaskRunFriendlyId: asString(snapshot.replayedFromTaskRunFriendlyId), + annotations: snapshot.annotations, + traceContext: snapshot.traceContext, + scheduleId: asString(snapshot.scheduleId), + batchId: asString(snapshot.batchId), + parentTaskRunFriendlyId: asString(snapshot.parentTaskRunFriendlyId), + rootTaskRunFriendlyId: asString(snapshot.rootTaskRunFriendlyId), + + error: entry.lastError, + }; + } catch (err) { + logger.error("mollifier read-fallback errored — fail-open to null", { + runId: input.runId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } } diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index 9c64036a040..e05d707571c 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -68,17 +68,31 @@ class MockTriggerTaskValidator implements TriggerTaskValidator { } } +// Mirror the production ClickhouseEventRepository.traceEvent shape so +// callers that read `event.traceContext.traceparent` (e.g. the +// mollifier branch seeding the snapshot) get the same W3C-formatted +// value they'd get against a real event repository. +const MOCK_TRACE_ID = "0123456789abcdef0123456789abcdef"; +const MOCK_SPAN_ID = "fedcba9876543210"; +const MOCK_TRACEPARENT = `00-${MOCK_TRACE_ID}-${MOCK_SPAN_ID}-01`; + class MockTraceEventConcern implements TraceEventConcern { + // Records the start time of the most recent traceRun callback entry. + // Used by ordering assertions that verify traceRun fires before + // downstream side effects (e.g. mollifier buffer writes). + public traceRunEnteredAt: number | undefined; + async traceRun( request: TriggerTaskRequest, parentStore: string | undefined, callback: (span: TracedEventSpan, store: string) => Promise ): Promise { + this.traceRunEnteredAt = Date.now(); return await callback( { - traceId: "test", - spanId: "test", - traceContext: {}, + traceId: MOCK_TRACE_ID, + spanId: MOCK_SPAN_ID, + traceContext: { traceparent: MOCK_TRACEPARENT }, traceparent: undefined, setAttribute: () => { }, failWithError: () => { }, @@ -1269,8 +1283,17 @@ describe("RunEngineTriggerTaskService", () => { ); containerTest( - "mollifier · mollify action triggers dual-write (buffer.accept + engine.trigger)", + "mollifier · mollify action writes to buffer and returns synthetic result (no Postgres row)", async ({ prisma, redisOptions }) => { + // When the gate decides mollify, the call site + // invokes `mollifyTrigger` which writes the engine.trigger snapshot + // to the buffer and returns a synthesised `MollifySyntheticResult` + // (run.friendlyId + notice + isCached:false). `engine.trigger` is + // NEVER invoked on this path — the run materialises in Postgres + // later, when the drainer replays the snapshot. The replay is + // covered by `mollifierDrainerHandler.test.ts`; this test pins the + // call-site integration: synthetic result + buffer write + no + // Postgres side effect. const engine = new RunEngine({ prisma, worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, @@ -1288,7 +1311,24 @@ describe("RunEngineTriggerTaskService", () => { const taskIdentifier = "test-task"; await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); - const buffer = new CapturingMollifierBuffer(); + // Buffer override records the time of the accept call so we can + // assert that traceRun fired strictly before the buffer was + // touched. If a future change re-introduces the "skip traceRun on + // mollify" shortcut, traceConcern.traceRunEnteredAt stays + // undefined and the ordering assertion fails. + class TimestampedBuffer extends CapturingMollifierBuffer { + public acceptedAt: number | undefined; + override async accept(input: { + runId: string; + envId: string; + orgId: string; + payload: string; + }) { + this.acceptedAt = Date.now(); + return await super.accept(input); + } + } + const buffer = new TimestampedBuffer(); const trippedDecision = { divert: true as const, reason: "per_env_rate" as const, @@ -1297,6 +1337,7 @@ describe("RunEngineTriggerTaskService", () => { windowMs: 200, holdMs: 500, }; + const traceConcern = new MockTraceEventConcern(); const triggerTaskService = new RunEngineTriggerTaskService({ engine, @@ -1305,7 +1346,7 @@ describe("RunEngineTriggerTaskService", () => { queueConcern: new DefaultQueueManager(prisma, engine), idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), validator: new MockTriggerTaskValidator(), - traceEventConcern: new MockTraceEventConcern(), + traceEventConcern: traceConcern, tracer: trace.getTracer("test", "0.0.0"), metadataMaximumSize: 1024 * 1024, evaluateGate: async () => ({ action: "mollify", decision: trippedDecision }), @@ -1319,25 +1360,81 @@ describe("RunEngineTriggerTaskService", () => { body: { payload: { hello: "world" } }, }); - // engine.trigger ran — Postgres has the run + // Pre-modifier span creation: traceRun must run BEFORE the buffer + // is touched. Customer-visible effect — the run span lands in + // ClickHouse from the moment the trigger returns, even when the + // drainer is offline, so buffered runs are visible in the trace + // view immediately rather than only after drain. + expect(traceConcern.traceRunEnteredAt).toBeDefined(); + expect(buffer.acceptedAt).toBeDefined(); + expect(traceConcern.traceRunEnteredAt!).toBeLessThanOrEqual(buffer.acceptedAt!); + + // Synthetic result is returned with the `mollifier.queued` notice + // (the call-site casts the synthetic shape to `TriggerTaskServiceResult`; + // at runtime the `notice` and `isCached: false` fields are present + // and read by the api.v1.tasks.$taskId.trigger.ts route handler). expect(result).toBeDefined(); expect(result?.run.friendlyId).toBeDefined(); - const pgRun = await prisma.taskRun.findFirst({ where: { id: result!.run.id } }); - expect(pgRun).not.toBeNull(); - expect(pgRun!.friendlyId).toBe(result!.run.friendlyId); - - // buffer.accept ran — Redis has the audit copy under the same friendlyId + const synthetic = result as unknown as { + run: { friendlyId: string }; + isCached: false; + notice: { code: string; message: string; docs: string }; + }; + expect(synthetic.isCached).toBe(false); + expect(synthetic.notice.code).toBe("mollifier.queued"); + expect(synthetic.notice.message).toBeTypeOf("string"); + expect(synthetic.notice.docs).toBeTypeOf("string"); + + // buffer.accept ran — Redis has the canonical engine.trigger snapshot + // under the synthesised friendlyId. The drainer will read this and + // replay it through engine.trigger to materialise the run. expect(buffer.accepted).toHaveLength(1); expect(buffer.accepted[0]!.runId).toBe(result!.run.friendlyId); expect(buffer.accepted[0]!.envId).toBe(authenticatedEnvironment.id); expect(buffer.accepted[0]!.orgId).toBe(authenticatedEnvironment.organizationId); + // Payload is a JSON-serialised MollifierSnapshot (the engine.trigger + // input). Schema is internal to the engine, so we only assert that + // it parses and references the friendlyId — anything more specific + // would couple the mollifier-layer test to engine-layer fields. + const snapshot = JSON.parse(buffer.accepted[0]!.payload) as { + traceId?: string; + spanId?: string; + traceContext?: { traceparent?: string }; + }; - // payload is the canonical replay shape - const payload = JSON.parse(buffer.accepted[0]!.payload); - expect(payload.runFriendlyId).toBe(result!.run.friendlyId); - expect(payload.taskId).toBe(taskIdentifier); - expect(payload.envId).toBe(authenticatedEnvironment.id); - expect(payload.body).toEqual({ payload: { hello: "world" } }); + // Regression guard for the dashboard trace-tree bug: the mollifier + // snapshot MUST carry a W3C `traceparent` in `traceContext`, + // seeded from the same span traceRun opened. Without it, the + // drainer replays through engine.trigger with empty traceContext + // and every downstream `recordRunDebugLog` + // (QUEUED/EXECUTING/FINISHED/run:notify…) gets a fresh traceId + + // null parentId — the run-detail page can only show the root + // span. Both the mollify and pass-through paths now flow through + // `traceEventConcern.traceRun`; this assertion pins the + // seeding-from-the-run-span contract. + expect(snapshot.traceContext?.traceparent).toMatch( + /^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$/ + ); + expect(snapshot.traceContext!.traceparent).toContain(snapshot.traceId); + expect(snapshot.traceContext!.traceparent).toContain(snapshot.spanId); + // The snapshot inherits the *run span's* traceId/spanId (from the + // event handed in by traceRun), not a separately-generated OTel + // span. This is what lets the drainer's `mollifier.drained` span + // and downstream engine.trigger materialisation parent on the + // same ClickHouse trace the customer sees from the moment trigger + // returns. + expect(snapshot.traceId).toBe(MOCK_TRACE_ID); + expect(snapshot.spanId).toBe(MOCK_SPAN_ID); + + // Postgres has NOT been written: engine.trigger was never called on + // the mollify path. The run materialises only when the drainer + // replays the snapshot. Regression intent: if a future change makes + // the mollify branch fall through to engine.trigger (re-introducing + // phase-1 dual-write), this assertion fails loudly. + const pgRun = await prisma.taskRun.findFirst({ + where: { friendlyId: result!.run.friendlyId }, + }); + expect(pgRun).toBeNull(); await engine.quit(); }, @@ -1398,108 +1495,6 @@ describe("RunEngineTriggerTaskService", () => { }, ); - containerTest( - "mollifier · engine.trigger throwing AFTER buffer.accept leaves an orphan entry (documented behaviour)", - async ({ prisma, redisOptions }) => { - // SCENARIO: dual-write where buffer.accept succeeds but engine.trigger - // throws. The throw propagates to the caller (correct: customer sees - // the same 4xx as today), and the buffer entry remains as an "orphan" - // — the no-op drainer will pop+ack it on its next poll, so the - // orphan is bounded (~drainer pollIntervalMs) but observable in the - // audit trail (mollifier.buffered with no matching TaskRun). - // - // Why engine.trigger can throw post-buffer: - // - RunDuplicateIdempotencyKeyError (Prisma P2002 on idempotencyKey): - // a concurrent non-mollified trigger with the same idempotencyKey - // wins the DB UNIQUE constraint between IdempotencyKeyConcern's - // pre-check and engine.trigger's INSERT. - // - RunOneTimeUseTokenError (Prisma P2002 on oneTimeUseToken). - // - Transient Prisma errors (FK constraint, connection drop, etc.). - // - // Why we don't "fix" this race now: - // The customer correctly gets the error. State eventually converges - // (drainer pops the orphan). The audit-trail explicitly surfaces - // "buffered without TaskRun" entries to operators. A real fix is - // a later change's responsibility once the buffer becomes the primary write - // — at that point we add the mollifier-specific idempotency index. - // - // This test pins the current ordering: buffer.accept fires synchronously - // BEFORE engine.trigger, and engine.trigger failure does NOT roll back - // the buffer write. Any future change that reverses the order or adds - // a silent rollback will fail this assertion and force a design - // decision rather than a silent behaviour change. - - const engine = new RunEngine({ - prisma, - worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, - queue: { redis: redisOptions }, - runLock: { redis: redisOptions }, - machines: { - defaultMachine: "small-1x", - machines: { "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 } }, - baseCostInCents: 0.0005, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const taskIdentifier = "test-task"; - await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); - - const buffer = new CapturingMollifierBuffer(); - - // Force engine.trigger to throw on this single call. We spy AFTER - // setupBackgroundWorker so the worker setup still uses the real - // engine.trigger (which has its own engine.trigger-ish calls for - // worker bootstrap — though in practice setupBackgroundWorker doesn't - // call trigger). - const simulatedFailure = new Error("simulated engine.trigger failure post-buffer"); - vi.spyOn(engine, "trigger").mockRejectedValueOnce(simulatedFailure); - - const triggerTaskService = new RunEngineTriggerTaskService({ - engine, - prisma, - payloadProcessor: new MockPayloadProcessor(), - queueConcern: new DefaultQueueManager(prisma, engine), - idempotencyKeyConcern: new IdempotencyKeyConcern(prisma, engine, new MockTraceEventConcern()), - validator: new MockTriggerTaskValidator(), - traceEventConcern: new MockTraceEventConcern(), - tracer: trace.getTracer("test", "0.0.0"), - metadataMaximumSize: 1024 * 1024, - evaluateGate: async () => ({ - action: "mollify", - decision: { - divert: true, - reason: "per_env_rate", - count: 150, - threshold: 100, - windowMs: 200, - holdMs: 500, - }, - }), - getMollifierBuffer: () => buffer as never, - isMollifierGloballyEnabled: () => true, - }); - - await expect( - triggerTaskService.call({ - taskId: taskIdentifier, - environment: authenticatedEnvironment, - body: { payload: { test: "x" } }, - }), - ).rejects.toThrow(/simulated engine.trigger failure post-buffer/); - - // The buffer write happened BEFORE engine.trigger threw. The orphan - // remains; the audit-trail will surface it (mollifier.buffered with - // no matching TaskRun row). The no-op drainer cleans it up. - expect(buffer.accepted).toHaveLength(1); - const orphanPayload = JSON.parse(buffer.accepted[0]!.payload); - expect(orphanPayload.taskId).toBe(taskIdentifier); - - await engine.quit(); - }, - ); - containerTest( "mollifier · idempotency-key match short-circuits BEFORE the gate is consulted", async ({ prisma, redisOptions }) => { @@ -1607,143 +1602,6 @@ describe("RunEngineTriggerTaskService", () => { }, ); - containerTest( - "mollifier · debounce match produces an orphan buffer entry (documented behaviour)", - async ({ prisma, redisOptions }) => { - // SCENARIO: a trigger with a debounce key arrives while a matching - // debounced run already exists. `debounceSystem.handleDebounce` runs - // INSIDE `engine.trigger` (line ~514 of run-engine/src/engine/index.ts), - // AFTER buffer.accept has already written the new friendlyId. The - // service correctly returns the existing run id to the customer, but - // the buffer is left with an orphan entry for the new friendlyId. - // - // Why this is acceptable now: - // - Customer-facing behaviour is unchanged from today: they receive - // the existing run id, same as the non-mollified path. - // - The orphan is bounded — the drainer's no-op-ack handler pops - // and acks it on its next poll. - // - The audit-trail surfaces it: a `mollifier.buffered` log line - // with `runId` that has no matching TaskRun in Postgres. - // - // Why a later change cares: - // - When the buffer becomes the primary write path, debounce can - // no longer be allowed to run AFTER buffer.accept. The drainer's - // engine.trigger replay would observe "existing" and skip the - // persist — the customer's synthesised 200 (with the new - // friendlyId) would never get a TaskRun, and the audit-trail - // divergence becomes a real data-loss bug. - // - A later change must lift `handleDebounce` into the call site BEFORE - // buffer.accept: - // 1. handleDebounce → if existing, return existing run; do NOT - // touch the buffer. - // 2. Otherwise, accept with `claimId` threaded into the - // canonical payload so the drainer's replay can - // `registerDebouncedRun` after persisting. - // - // This test pins the current ordering. A future change that "fixes" - // it by lifting handleDebounce upfront will fail the orphan - // assertion below and force an explicit choice (update the test, - // remove this scenario, or stage the lift behind a flag). - - const engine = new RunEngine({ - prisma, - worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, - queue: { redis: redisOptions }, - runLock: { redis: redisOptions }, - machines: { - defaultMachine: "small-1x", - machines: { "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 } }, - baseCostInCents: 0.0005, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - const taskIdentifier = "test-task"; - await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); - - const idempotencyKeyConcern = new IdempotencyKeyConcern( - prisma, - engine, - new MockTraceEventConcern(), - ); - - // Setup: trigger with debounce — creates the existing run + Redis claim. - const baseline = new RunEngineTriggerTaskService({ - engine, - prisma, - payloadProcessor: new MockPayloadProcessor(), - queueConcern: new DefaultQueueManager(prisma, engine), - idempotencyKeyConcern, - validator: new MockTriggerTaskValidator(), - traceEventConcern: new MockTraceEventConcern(), - tracer: trace.getTracer("test", "0.0.0"), - metadataMaximumSize: 1024 * 1024, - }); - const first = await baseline.call({ - taskId: taskIdentifier, - environment: authenticatedEnvironment, - body: { - payload: { test: "x" }, - options: { debounce: { key: "regression-debounce-6", delay: "30s" } }, - }, - }); - expect(first?.run.friendlyId).toBeDefined(); - - // Action: same debounce key, mollify-stub gate. - const buffer = new CapturingMollifierBuffer(); - const mollifierService = new RunEngineTriggerTaskService({ - engine, - prisma, - payloadProcessor: new MockPayloadProcessor(), - queueConcern: new DefaultQueueManager(prisma, engine), - idempotencyKeyConcern, - validator: new MockTriggerTaskValidator(), - traceEventConcern: new MockTraceEventConcern(), - tracer: trace.getTracer("test", "0.0.0"), - metadataMaximumSize: 1024 * 1024, - evaluateGate: async () => ({ - action: "mollify", - decision: { - divert: true, - reason: "per_env_rate", - count: 150, - threshold: 100, - windowMs: 200, - holdMs: 500, - }, - }), - getMollifierBuffer: () => buffer as never, - isMollifierGloballyEnabled: () => true, - }); - - const debounced = await mollifierService.call({ - taskId: taskIdentifier, - environment: authenticatedEnvironment, - body: { - payload: { test: "x" }, - options: { debounce: { key: "regression-debounce-6", delay: "30s" } }, - }, - }); - - // Customer-facing behaviour: the existing run is returned (correct). - expect(debounced).toBeDefined(); - expect(debounced?.run.friendlyId).toBe(first?.run.friendlyId); - - // Orphan: buffer.accept fired with the new friendlyId we generated - // upfront, and that friendlyId has no matching TaskRun in Postgres - // because engine.trigger returned the existing run via debounce. - expect(buffer.accepted).toHaveLength(1); - expect(buffer.accepted[0]!.runId).not.toBe(first?.run.friendlyId); - const orphanFriendlyId = buffer.accepted[0]!.runId; - const orphanRow = await prisma.taskRun.findFirst({ - where: { friendlyId: orphanFriendlyId }, - }); - expect(orphanRow).toBeNull(); - - await engine.quit(); - }, - ); }); describe("DefaultQueueManager task metadata cache", () => { diff --git a/apps/webapp/test/mollifierClaimResolution.test.ts b/apps/webapp/test/mollifierClaimResolution.test.ts new file mode 100644 index 00000000000..8c2e850414e --- /dev/null +++ b/apps/webapp/test/mollifierClaimResolution.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it, vi } from "vitest"; + +// Stub `~/db.server` before importing the concern — the real module +// eagerly calls `prisma.$connect()` at singleton construction, which +// would fail without a database. The concern under test receives its +// prisma via the constructor, so the stub is never used by the code path. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +// The IdempotencyKeyConcern resolves the pre-gate claim through the +// global mollifier buffer (`getMollifierBuffer`), shared by both +// `claimOrAwait` and `findBufferedRunWithIdempotency`. Control it via a +// hoisted handle so each test can script the claim/lookup responses. +const h = vi.hoisted(() => ({ buffer: null as unknown })); +vi.mock("~/v3/mollifier/mollifierBuffer.server", () => ({ + getMollifierBuffer: () => h.buffer, +})); + +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; +import type { TriggerTaskRequest } from "~/runEngine/types"; + +function makeConcern(prisma: { findFirst: () => Promise }) { + return new IdempotencyKeyConcern( + { taskRun: { findFirst: prisma.findFirst } } as never, + {} as never, // engine — unused on this path + {} as never, // traceEventConcern — unused on this path + ); +} + +function makeRequest(): TriggerTaskRequest { + return { + taskId: "my-task", + environment: { id: "env_a", organizationId: "org_1" }, + options: {}, + body: { options: { idempotencyKey: "k-1" } }, + } as unknown as TriggerTaskRequest; +} + +describe("IdempotencyKeyConcern · claim resolution", () => { + it("resolved-but-unfindable falls through to a fresh trigger (no cached run, no claim held)", async () => { + // The claim slot holds a runId that is gone from both stores: the PG + // findFirst misses and the buffer lookup misses. Regression guard for + // the resolved-but-unfindable terminal case — the concern must fall + // through to a fresh trigger rather than throw, hand back a bogus + // cached run, or claim ownership it doesn't hold. + const lookupIdempotency = vi.fn(async () => null); + h.buffer = { + claimIdempotency: vi.fn(async () => ({ kind: "resolved", runId: "run_gone" })), + lookupIdempotency, + } as unknown as MollifierBuffer; + + const findFirst = vi.fn(async () => null); // PG misses on every call + const concern = makeConcern({ findFirst }); + + const result = await concern.handleTriggerRequest(makeRequest(), undefined); + + expect(result.isCached).toBe(false); + if (result.isCached === false) { + // No claim held — we resolved someone else's (stale) claim, we did + // not win one. The caller must NOT publish/release on our behalf. + expect(result.claim).toBeUndefined(); + expect(result.idempotencyKey).toBe("k-1"); + } + // We attempted the buffer fallback before giving up. + expect(lookupIdempotency).toHaveBeenCalled(); + }); + + it("resolved-and-findable returns the existing run as a cached hit", async () => { + // Guard the happy resolved path: when the claimed runId IS findable + // (writer-side PG), the fall-through change must not swallow it. + h.buffer = { + claimIdempotency: vi.fn(async () => ({ kind: "resolved", runId: "run_winner" })), + lookupIdempotency: vi.fn(async () => null), + } as unknown as MollifierBuffer; + + const winner = { id: "run_winner", friendlyId: "run_winner" }; + // First findFirst (initial existingRun check) misses so we enter the + // claim path; the second (writer-side re-resolve) finds the winner. + let calls = 0; + const findFirst = vi.fn(async () => { + calls += 1; + return calls >= 2 ? winner : null; + }); + const concern = makeConcern({ findFirst }); + + const result = await concern.handleTriggerRequest(makeRequest(), undefined); + + expect(result.isCached).toBe(true); + if (result.isCached === true) { + expect(result.run).toBe(winner); + } + }); +}); diff --git a/apps/webapp/test/mollifierGate.test.ts b/apps/webapp/test/mollifierGate.test.ts index b81df7f0c5b..e40a29b2481 100644 --- a/apps/webapp/test/mollifierGate.test.ts +++ b/apps/webapp/test/mollifierGate.test.ts @@ -432,3 +432,82 @@ describe("evaluateGate — per-org isolation via Organization.featureFlags", () expect(unrelatedDeps.spies.evaluatorCalls).toBe(0); }); }); + +// Bypasses: the three categories of trigger that the mollifier never +// intercepts, regardless of the per-org flag or the trip-evaluator decision. +describe("evaluateGate — debounce / OTU / triggerAndWait bypasses", () => { + it("debounce triggers pass through without invoking the evaluator", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: false, + flag: true, + decision: trippedDecision, + }); + const outcome = await evaluateGate( + { ...inputs, options: { debounce: { key: "k" } } }, + deps, + ); + expect(outcome).toEqual({ action: "pass_through" }); + expect(spies.evaluatorCalls).toBe(0); + }); + + it("oneTimeUseToken triggers pass through without invoking the evaluator", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: false, + flag: true, + decision: trippedDecision, + }); + const outcome = await evaluateGate( + { ...inputs, options: { oneTimeUseToken: "jwt-otu" } }, + deps, + ); + expect(outcome).toEqual({ action: "pass_through" }); + expect(spies.evaluatorCalls).toBe(0); + }); + + it("single triggerAndWait (parentTaskRunId + resumeParentOnCompletion) passes through", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: false, + flag: true, + decision: trippedDecision, + }); + const outcome = await evaluateGate( + { + ...inputs, + options: { parentTaskRunId: "run_parent", resumeParentOnCompletion: true }, + }, + deps, + ); + expect(outcome).toEqual({ action: "pass_through" }); + expect(spies.evaluatorCalls).toBe(0); + }); + + it("parentTaskRunId alone (no resumeParentOnCompletion) does NOT bypass — must be both", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: false, + flag: true, + decision: trippedDecision, + }); + const outcome = await evaluateGate( + { ...inputs, options: { parentTaskRunId: "run_parent" } }, + deps, + ); + expect(outcome.action).toBe("mollify"); + expect(spies.evaluatorCalls).toBe(1); + }); + + it("bypass records pass_through decision (so observability counters stay accurate)", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: false, + flag: true, + decision: trippedDecision, + }); + await evaluateGate({ ...inputs, options: { debounce: { key: "k" } } }, deps); + expect(spies.recordDecisionCalls).toHaveLength(1); + expect(spies.recordDecisionCalls[0].outcome).toBe("pass_through"); + }); +}); diff --git a/apps/webapp/test/mollifierIdempotencyClaim.test.ts b/apps/webapp/test/mollifierIdempotencyClaim.test.ts new file mode 100644 index 00000000000..87c009cb1f7 --- /dev/null +++ b/apps/webapp/test/mollifierIdempotencyClaim.test.ts @@ -0,0 +1,268 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { + claimOrAwait, + publishClaim, + releaseClaim, +} from "~/v3/mollifier/idempotencyClaim.server"; +import type { + IdempotencyClaimResult, + MollifierBuffer, +} from "@trigger.dev/redis-worker"; + +type ClaimState = { + value: string | null; + // Scripted return sequence for claimIdempotency calls. When set, + // overrides the default behaviour of returning based on `value`. + scriptedClaims?: IdempotencyClaimResult[]; +}; + +function makeBuffer(initial: ClaimState = { value: null }): { + buffer: MollifierBuffer; + state: ClaimState; +} { + const state = { ...initial }; + const buffer = { + claimIdempotency: vi.fn(async (): Promise => { + if (state.scriptedClaims && state.scriptedClaims.length > 0) { + return state.scriptedClaims.shift()!; + } + if (state.value === null) { + state.value = "pending"; + return { kind: "claimed" }; + } + if (state.value === "pending") return { kind: "pending" }; + return { kind: "resolved", runId: state.value }; + }), + readClaim: vi.fn(async (): Promise => { + if (state.value === null) return null; + if (state.value === "pending") return { kind: "pending" }; + return { kind: "resolved", runId: state.value }; + }), + publishClaim: vi.fn(async ({ runId }: { runId: string }) => { + state.value = runId; + }), + releaseClaim: vi.fn(async () => { + state.value = null; + }), + } as unknown as MollifierBuffer; + return { buffer, state }; +} + +const baseInput = { + envId: "env_a", + taskIdentifier: "my-task", + idempotencyKey: "k-1", +}; + +describe("claimOrAwait", () => { + it("returns 'claimed' for the first caller — empty key wins SETNX", async () => { + const { buffer } = makeBuffer({ value: null }); + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + generateToken: () => "token-1", + }); + expect(outcome).toEqual({ kind: "claimed", token: "token-1" }); + }); + + it("returns 'resolved' immediately when the key already holds a runId", async () => { + const { buffer } = makeBuffer({ value: "run_X" }); + const outcome = await claimOrAwait({ ...baseInput, buffer }); + expect(outcome).toEqual({ kind: "resolved", runId: "run_X" }); + }); + + it("polls a pending key, then resolves when the runId is published", async () => { + const { buffer, state } = makeBuffer({ value: "pending" }); + let nowValue = 0; + let pollCount = 0; + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + now: () => nowValue, + sleep: async (ms) => { + nowValue += ms; + pollCount += 1; + if (pollCount === 3) state.value = "run_X"; + }, + safetyNetMs: 1000, + pollStepMs: 25, + }); + expect(outcome).toEqual({ kind: "resolved", runId: "run_X" }); + }); + + it("returns 'timed_out' when the key stays pending past safetyNetMs", async () => { + const { buffer } = makeBuffer({ value: "pending" }); + let nowValue = 0; + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + now: () => nowValue, + sleep: async (ms) => { + nowValue += ms; + }, + safetyNetMs: 50, + pollStepMs: 25, + }); + expect(outcome).toEqual({ kind: "timed_out" }); + }); + + it("retries the claim when a polled key vanishes (claimant released)", async () => { + const { buffer, state } = makeBuffer({ value: "pending" }); + let nowValue = 0; + let pollCount = 0; + // Scripted retry: on the second `claimIdempotency` call we win. + state.scriptedClaims = [ + { kind: "pending" }, // first call (initial) + { kind: "claimed" }, // second call (retry after release) + ]; + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + generateToken: () => "token-retry", + now: () => nowValue, + sleep: async (ms) => { + nowValue += ms; + pollCount += 1; + // First poll cycle: key vanishes (release). + if (pollCount === 1) state.value = null; + }, + safetyNetMs: 1000, + pollStepMs: 25, + }); + expect(outcome).toEqual({ kind: "claimed", token: "token-retry" }); + }); + + it("fails open with 'claimed' when buffer is null (mollifier disabled)", async () => { + const outcome = await claimOrAwait({ + ...baseInput, + buffer: null, + generateToken: () => "token-fallopen-null", + }); + expect(outcome).toEqual({ kind: "claimed", token: "token-fallopen-null" }); + }); + + it("fails open with 'claimed' if buffer.claimIdempotency throws (Redis down)", async () => { + const buffer = { + claimIdempotency: vi.fn(async () => { + throw new Error("ECONNREFUSED"); + }), + } as unknown as MollifierBuffer; + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + generateToken: () => "token-fallopen-throw", + }); + expect(outcome).toEqual({ kind: "claimed", token: "token-fallopen-throw" }); + }); + + it("respects an aborted signal during the wait loop", async () => { + const { buffer } = makeBuffer({ value: "pending" }); + const controller = new AbortController(); + let nowValue = 0; + let pollCount = 0; + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + now: () => nowValue, + sleep: async (ms) => { + nowValue += ms; + pollCount += 1; + if (pollCount === 1) controller.abort(); + }, + abortSignal: controller.signal, + safetyNetMs: 5000, + pollStepMs: 25, + }); + expect(outcome).toEqual({ kind: "timed_out" }); + }); +}); + +describe("publishClaim", () => { + it("writes the runId to the claim key", async () => { + const { buffer, state } = makeBuffer({ value: "pending" }); + await publishClaim({ ...baseInput, token: "owner-token", runId: "run_X", buffer }); + expect(state.value).toBe("run_X"); + expect(buffer.publishClaim).toHaveBeenCalledOnce(); + }); + + it("no-op when buffer is null", async () => { + await expect( + publishClaim({ ...baseInput, token: "owner-token", runId: "run_X", buffer: null }), + ).resolves.toBeUndefined(); + }); + + it("swallows errors so trigger pipeline isn't broken by Redis hiccups", async () => { + const buffer = { + publishClaim: vi.fn(async () => { + throw new Error("ECONNREFUSED"); + }), + } as unknown as MollifierBuffer; + await expect( + publishClaim({ ...baseInput, token: "owner-token", runId: "run_X", buffer }), + ).resolves.toBeUndefined(); + }); +}); + +describe("releaseClaim", () => { + it("DELs the claim so waiters can re-acquire", async () => { + const { buffer, state } = makeBuffer({ value: "pending" }); + await releaseClaim({ ...baseInput, token: "owner-token", buffer }); + expect(state.value).toBeNull(); + }); + + it("no-op when buffer is null", async () => { + await expect(releaseClaim({ ...baseInput, token: "owner-token", buffer: null })).resolves.toBeUndefined(); + }); +}); + +// End-to-end: the token from `claimOrAwait`'s `claimed` outcome must +// reach `buffer.claimIdempotency` and round-trip through publishClaim / +// releaseClaim. Without this the compare-and-act ownership protection +// in the buffer is bypassed and the stale-claimant hazard returns. +describe("claim ownership token wiring", () => { + it("threads the token from claimOrAwait into buffer.claimIdempotency", async () => { + const { buffer } = makeBuffer({ value: null }); + const outcome = await claimOrAwait({ + ...baseInput, + buffer, + generateToken: () => "owner-token-xyz", + }); + expect(outcome).toEqual({ kind: "claimed", token: "owner-token-xyz" }); + expect(buffer.claimIdempotency).toHaveBeenCalledWith({ + ...baseInput, + token: "owner-token-xyz", + ttlSeconds: 30, + }); + }); + + it("threads the token from publishClaim into buffer.publishClaim", async () => { + const { buffer } = makeBuffer({ value: "pending" }); + await publishClaim({ + ...baseInput, + token: "owner-token-xyz", + runId: "run_X", + buffer, + }); + expect(buffer.publishClaim).toHaveBeenCalledWith( + expect.objectContaining({ + token: "owner-token-xyz", + runId: "run_X", + }), + ); + }); + + it("threads the token from releaseClaim into buffer.releaseClaim", async () => { + const { buffer } = makeBuffer({ value: "pending" }); + await releaseClaim({ + ...baseInput, + token: "owner-token-xyz", + buffer, + }); + expect(buffer.releaseClaim).toHaveBeenCalledWith( + expect.objectContaining({ token: "owner-token-xyz" }), + ); + }); +}); diff --git a/apps/webapp/test/mollifierMollify.test.ts b/apps/webapp/test/mollifierMollify.test.ts new file mode 100644 index 00000000000..ec7a30b49c2 --- /dev/null +++ b/apps/webapp/test/mollifierMollify.test.ts @@ -0,0 +1,133 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; + +function fakeBuffer( + acceptResult: Awaited> = { kind: "accepted" }, +): { buffer: MollifierBuffer; accept: ReturnType } { + const accept = vi.fn(async () => acceptResult); + return { + buffer: { accept } as unknown as MollifierBuffer, + accept, + }; +} + +describe("mollifyTrigger", () => { + it("writes the snapshot to buffer and returns synthesised result", async () => { + const { buffer, accept } = fakeBuffer(); + const result = await mollifyTrigger({ + runFriendlyId: "run_abc123def456", + environmentId: "env_a", + organizationId: "org_1", + engineTriggerInput: { taskIdentifier: "my-task", payload: '{"x":1}' }, + decision: { + divert: true, + reason: "per_env_rate", + count: 150, + threshold: 100, + }, + buffer, + }); + + expect(accept).toHaveBeenCalledOnce(); + expect(accept).toHaveBeenCalledWith({ + runId: "run_abc123def456", + envId: "env_a", + orgId: "org_1", + payload: expect.any(String), + idempotencyKey: undefined, + taskIdentifier: undefined, + }); + expect(result.run.friendlyId).toBe("run_abc123def456"); + expect(result.error).toBeUndefined(); + expect(result.isCached).toBe(false); + expect(result.notice).toEqual({ + code: "mollifier.queued", + message: expect.stringContaining("burst buffer"), + docs: expect.stringContaining("trigger.dev/docs"), + }); + }); + + it("echoes the winner's runId with isCached=true on duplicate_idempotency", async () => { + const { buffer } = fakeBuffer({ + kind: "duplicate_idempotency", + existingRunId: "run_winner12345", + }); + const result = await mollifyTrigger({ + runFriendlyId: "run_loser56789a", + environmentId: "env_a", + organizationId: "org_1", + engineTriggerInput: { taskIdentifier: "t", payload: "{}" }, + decision: { divert: true, reason: "per_env_rate", count: 1, threshold: 1 }, + buffer, + idempotencyKey: "key", + taskIdentifier: "t", + }); + expect(result.run.friendlyId).toBe("run_winner12345"); + expect(result.isCached).toBe(true); + expect(result.notice).toBeUndefined(); + }); + + // Regression: the synthetic result MUST carry a populated `run.id` + // derived from the friendlyId. Without it, the route handler's + // `saveRequestIdempotency(…, result.run.id)` stores `undefined` as + // the cached entity id, and on SDK retry Prisma's + // `findFirst({ where: { id: undefined } })` silently drops the + // predicate and returns an arbitrary TaskRun — a cross-tenant leak + // path. (See Devin review on PR #3753.) + it("populates run.id from friendlyId on the happy-accept path", async () => { + const { buffer } = fakeBuffer(); + const result = await mollifyTrigger({ + runFriendlyId: "run_pri456789ab", + environmentId: "env_a", + organizationId: "org_1", + engineTriggerInput: { taskIdentifier: "t", payload: "{}" }, + decision: { divert: true, reason: "per_env_rate", count: 1, threshold: 1 }, + buffer, + }); + expect(result.run.id).toBe(RunId.fromFriendlyId("run_pri456789ab")); + expect(result.run.id).toMatch(/^[a-z0-9]+$/); // non-undefined, non-empty + }); + + it("populates run.id from the WINNER's friendlyId on duplicate_idempotency", async () => { + const { buffer } = fakeBuffer({ + kind: "duplicate_idempotency", + existingRunId: "run_winnerdup12", + }); + const result = await mollifyTrigger({ + runFriendlyId: "run_loser56789a", + environmentId: "env_a", + organizationId: "org_1", + engineTriggerInput: { taskIdentifier: "t", payload: "{}" }, + decision: { divert: true, reason: "per_env_rate", count: 1, threshold: 1 }, + buffer, + idempotencyKey: "key", + taskIdentifier: "t", + }); + expect(result.run.id).toBe(RunId.fromFriendlyId("run_winnerdup12")); + expect(result.run.id).not.toBe(RunId.fromFriendlyId("run_loser56789a")); + }); + + it("snapshot is round-trippable: payload field is parseable JSON of engineTriggerInput", async () => { + const { buffer, accept } = fakeBuffer(); + const engineInput = { taskIdentifier: "t", payload: "{}", tags: ["a", "b"] }; + await mollifyTrigger({ + runFriendlyId: "run_xabcde12345", + environmentId: "env_a", + organizationId: "org_1", + engineTriggerInput: engineInput, + decision: { divert: true, reason: "per_env_rate", count: 1, threshold: 1 }, + buffer, + }); + + const callArg = accept.mock.calls[0][0] as { payload: string }; + expect(JSON.parse(callArg.payload)).toEqual(engineInput); + }); +}); diff --git a/apps/webapp/test/mollifierReadFallback.test.ts b/apps/webapp/test/mollifierReadFallback.test.ts new file mode 100644 index 00000000000..229da5756e4 --- /dev/null +++ b/apps/webapp/test/mollifierReadFallback.test.ts @@ -0,0 +1,332 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import type { MollifierBuffer, BufferEntry } from "@trigger.dev/redis-worker"; + +function fakeBuffer(entry: BufferEntry | null): MollifierBuffer { + return { + getEntry: vi.fn(async () => entry), + } as unknown as MollifierBuffer; +} + +const NOW = new Date("2026-05-11T12:00:00Z"); + +describe("findRunByIdWithMollifierFallback", () => { + it("returns null when buffer is unavailable (mollifier disabled)", async () => { + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => null }, + ); + expect(result).toBeNull(); + }); + + it("returns null when no buffer entry exists", async () => { + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(null) }, + ); + expect(result).toBeNull(); + }); + + it("returns null when buffer entry envId does not match caller (auth mismatch)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_OTHER", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result).toBeNull(); + }); + + it("returns null when buffer entry orgId does not match caller (auth mismatch)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_OTHER", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result).toBeNull(); + }); + + it("returns synthesised QUEUED run when entry exists with matching auth", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "my-task" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result).not.toBeNull(); + expect(result!.friendlyId).toBe("run_1"); + expect(result!.status).toBe("QUEUED"); + expect(result!.taskIdentifier).toBe("my-task"); + expect(result!.createdAt).toEqual(NOW); + }); + + it("returns synthesised QUEUED for DRAINING (internal state same externally)", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "DRAINING", + attempts: 1, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.status).toBe("QUEUED"); + }); + + it("returns FAILED state with structured error for FAILED entries", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "FAILED", + attempts: 3, + createdAt: NOW, + lastError: { code: "VALIDATION", message: "task not found" }, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.status).toBe("FAILED"); + expect(result!.error).toEqual({ code: "VALIDATION", message: "task not found" }); + }); + + it("extracts snapshot-derived fields from the buffered payload", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "my-task", + payload: '{"foo":"bar"}', + payloadType: "application/json", + metadata: '{"customer":"acme"}', + metadataType: "application/json", + idempotencyKey: "client-abc", + idempotencyKeyOptions: ["payload"], + isTest: true, + depth: 2, + ttl: "1h", + tags: ["tag-a", "tag-b"], + // The engine.trigger snapshot stores the locked version string under + // `taskVersion` (see triggerTask.server.ts#buildEngineTriggerInput). + taskVersion: "20260511.1", + resumeParentOnCompletion: false, + parentTaskRunId: "run_parent", + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result).not.toBeNull(); + expect(result!.payloadType).toBe("application/json"); + expect(result!.metadata).toBe('{"customer":"acme"}'); + expect(result!.metadataType).toBe("application/json"); + expect(result!.idempotencyKey).toBe("client-abc"); + expect(result!.idempotencyKeyOptions).toEqual(["payload"]); + expect(result!.isTest).toBe(true); + expect(result!.depth).toBe(2); + expect(result!.ttl).toBe("1h"); + expect(result!.tags).toEqual(["tag-a", "tag-b"]); + expect(result!.lockedToVersion).toBe("20260511.1"); + expect(result!.resumeParentOnCompletion).toBe(false); + expect(result!.parentTaskRunId).toBe("run_parent"); + }); + + it("extracts gate-allocated trace context from the snapshot", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "t", + traceId: "trace_abc", + spanId: "span_xyz", + parentSpanId: "span_parent", + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.traceId).toBe("trace_abc"); + expect(result!.spanId).toBe("span_xyz"); + expect(result!.parentSpanId).toBe("span_parent"); + }); + + it("defaults snapshot-derived fields to safe values when absent", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.payloadType).toBeUndefined(); + expect(result!.metadata).toBeUndefined(); + expect(result!.idempotencyKey).toBeUndefined(); + expect(result!.isTest).toBe(false); + expect(result!.depth).toBe(0); + expect(result!.tags).toEqual([]); + expect(result!.resumeParentOnCompletion).toBe(false); + expect(result!.traceId).toBeUndefined(); + expect(result!.spanId).toBeUndefined(); + }); + + it("populates replay-relevant fields from the snapshot", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "my-task", + environment: { id: "env_a" }, + workerQueue: "default", + queue: "task/my-task", + concurrencyKey: "tenant-42", + machine: "medium-1x", + realtimeStreamsVersion: "v2", + seedMetadata: '{"k":"v"}', + seedMetadataType: "application/json", + tags: ["t1", "t2"], + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result).not.toBeNull(); + expect(result!.id).toBeTypeOf("string"); + expect(result!.id.length).toBeGreaterThan(0); + expect(result!.engine).toBe("V2"); + expect(result!.runtimeEnvironmentId).toBe("env_a"); + expect(result!.workerQueue).toBe("default"); + expect(result!.queue).toBe("task/my-task"); + expect(result!.concurrencyKey).toBe("tenant-42"); + expect(result!.machinePreset).toBe("medium-1x"); + expect(result!.realtimeStreamsVersion).toBe("v2"); + expect(result!.seedMetadata).toBe('{"k":"v"}'); + expect(result!.seedMetadataType).toBe("application/json"); + expect(result!.runTags).toEqual(["t1", "t2"]); + }); + + it("treats invalid date strings as undefined and does not mis-classify status as CANCELED", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "t", + cancelledAt: "not-a-date", + cancelReason: "user requested", + delayUntil: "also-not-a-date", + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result).not.toBeNull(); + expect(result!.status).toBe("QUEUED"); + expect(result!.cancelledAt).toBeUndefined(); + expect(result!.delayUntil).toBeUndefined(); + }); + + it("parses valid ISO date strings on cancelledAt and delayUntil", async () => { + const cancelledAtIso = "2026-05-11T13:00:00.000Z"; + const delayUntilIso = "2026-05-11T14:00:00.000Z"; + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + taskIdentifier: "t", + cancelledAt: cancelledAtIso, + cancelReason: "user requested", + delayUntil: delayUntilIso, + }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.status).toBe("CANCELED"); + expect(result!.cancelledAt).toEqual(new Date(cancelledAtIso)); + expect(result!.cancelReason).toBe("user requested"); + expect(result!.delayUntil).toEqual(new Date(delayUntilIso)); + }); + + it("falls back to entry.envId for runtimeEnvironmentId when snapshot lacks environment.id", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ taskIdentifier: "t" }), + status: "QUEUED", + attempts: 0, + createdAt: NOW, + }; + const result = await findRunByIdWithMollifierFallback( + { runId: "run_1", environmentId: "env_a", organizationId: "org_1" }, + { getBuffer: () => fakeBuffer(entry) }, + ); + expect(result!.runtimeEnvironmentId).toBe("env_a"); + expect(result!.workerQueue).toBeUndefined(); + expect(result!.queue).toBeUndefined(); + }); +}); diff --git a/apps/webapp/test/mollifierTripEvaluator.test.ts b/apps/webapp/test/mollifierTripEvaluator.test.ts index b9a9bf8c94a..14ac0cc55bc 100644 --- a/apps/webapp/test/mollifierTripEvaluator.test.ts +++ b/apps/webapp/test/mollifierTripEvaluator.test.ts @@ -14,7 +14,7 @@ describe("createRealTripEvaluator", () => { redisTest( "returns divert=false when the sliding window stays under threshold", async ({ redisOptions }) => { - const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 600 }); + const buffer = new MollifierBuffer({ redisOptions }); try { const evaluator = createRealTripEvaluator({ getBuffer: () => buffer, @@ -32,7 +32,7 @@ describe("createRealTripEvaluator", () => { redisTest( "returns divert=true with reason per_env_rate once the window trips", async ({ redisOptions }) => { - const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 600 }); + const buffer = new MollifierBuffer({ redisOptions }); try { // threshold=2 → the 3rd call within windowMs is the first that trips. const options = { windowMs: 5000, threshold: 2, holdMs: 5000 } as const; @@ -73,7 +73,7 @@ describe("createRealTripEvaluator", () => { redisTest( "returns divert=false when buffer throws (fail-open)", async ({ redisOptions }) => { - const buffer = new MollifierBuffer({ redisOptions, entryTtlSeconds: 600 }); + const buffer = new MollifierBuffer({ redisOptions }); // Closing the client up front means evaluateTrip will throw on the first // Redis command — a real failure mode, not a stub. await buffer.close();