From 4fa7e74c4104173f4950febe7bbcffcfd0c70e7d Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 26 May 2026 16:52:05 -0700 Subject: [PATCH 1/5] feat(tools): queue hosted-key tool calls instead of failing with 429 (#4416) * Add queueing for hosted keys * feat(rate-limiter): FIFO queue for hosted-key per-workspace fairness Replace the per-call distributed lock with a Redis-backed FIFO queue so callers within a workspace get strict ordering instead of racing the bucket. Adds heartbeat-based crash recovery and dead-head reaping in a single Lua script. Bumps Exa search hosted RPM from 5 to 60. * fix(rate-limiter): bound hosted-key queue wait to execution budget; fix heartbeat + telemetry Tie the per-workspace hosted-key queue wait to the surrounding execution budget instead of a flat 5-minute cap. acquireKey now accepts the execution AbortSignal (threaded from ExecutionContext): when present, the wait is bounded by the run's actual plan timeout / cancellation, with the enterprise async ceiling as a backstop; when absent it falls back to MAX_QUEUE_WAIT_MS. This lets long-running async (Trigger.dev) runs use their full budget while no longer letting a single queued call burn a short sync run's entire budget. Also addresses Greptile review: - P1: share one lastHeartbeatAt across all wait phases and cap every sleep to HEARTBEAT_REFRESH_INTERVAL_MS so a long low-RPM retryAfterMs can no longer let the head's heartbeat lapse mid-wait and break FIFO ordering. - P2: derive hostedKeyQueueWaited telemetry reason from the actual bottleneck (queue_position / dimension / actor_requests) instead of hardcoding it. Co-Authored-By: Claude Opus 4.7 (1M context) * feat(rate-limiter): make hosted-key queue waits abort-interruptible Replace the plain capped sleeps in the queue-head and bucket-capacity wait loops with an interruptibleSleep that resolves early when the execution AbortSignal fires (timeout or cancellation), cleaning up its own timer and listener. Previously a cancelled/timed-out run could overshoot by up to the heartbeat cap (~10s) before the loop re-checked its budget; now it wakes within a tick. The cap remains for heartbeat renewal. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- .../hosted-key-rate-limiter.test.ts | 361 ++++++++++++++- .../hosted-key/hosted-key-rate-limiter.ts | 419 ++++++++++++++++-- .../rate-limiter/hosted-key/queue.test.ts | 226 ++++++++++ .../lib/core/rate-limiter/hosted-key/queue.ts | 203 +++++++++ apps/sim/lib/core/telemetry.ts | 44 ++ apps/sim/tools/index.ts | 88 +++- 6 files changed, 1293 insertions(+), 48 deletions(-) create mode 100644 apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts create mode 100644 apps/sim/lib/core/rate-limiter/hosted-key/queue.ts diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts index 50cf346222d..b14a34c634d 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts @@ -5,8 +5,13 @@ import type { TokenStatus, } from '@/lib/core/rate-limiter/storage' import { HostedKeyRateLimiter } from './hosted-key-rate-limiter' +import { HEARTBEAT_REFRESH_INTERVAL_MS, type HostedKeyQueue } from './queue' import type { CustomRateLimit, PerRequestRateLimit } from './types' +/** Force the queue wait to give up on the first iteration by reporting a retry time + * larger than the 5-minute MAX_QUEUE_WAIT_MS cap. */ +const RETRY_PAST_CAP_MS = 6 * 60 * 1000 + interface MockAdapter { consumeTokens: Mock getTokenStatus: Mock @@ -19,10 +24,30 @@ const createMockAdapter = (): MockAdapter => ({ resetBucket: vi.fn(), }) +interface MockQueue { + enqueue: Mock + checkHead: Mock + refreshHeartbeat: Mock + dequeue: Mock +} + +/** Stub queue that defaults to "you're at the head, no waiting" — i.e. acts as if the + * queue is empty or Redis is unavailable. Tests override per-call to simulate ordering. */ +const createMockQueue = (): MockQueue => { + const queue: MockQueue = { + enqueue: vi.fn().mockResolvedValue({ position: 0, enabled: true }), + checkHead: vi.fn().mockResolvedValue('head'), + refreshHeartbeat: vi.fn().mockResolvedValue(undefined), + dequeue: vi.fn().mockResolvedValue(undefined), + } + return queue +} + describe('HostedKeyRateLimiter', () => { const testProvider = 'exa' const envKeyPrefix = 'EXA_API_KEY' let mockAdapter: MockAdapter + let mockQueue: MockQueue let rateLimiter: HostedKeyRateLimiter let originalEnv: NodeJS.ProcessEnv @@ -34,7 +59,11 @@ describe('HostedKeyRateLimiter', () => { beforeEach(() => { vi.clearAllMocks() mockAdapter = createMockAdapter() - rateLimiter = new HostedKeyRateLimiter(mockAdapter as RateLimitStorageAdapter) + mockQueue = createMockQueue() + rateLimiter = new HostedKeyRateLimiter( + mockAdapter as RateLimitStorageAdapter, + mockQueue as unknown as HostedKeyQueue + ) originalEnv = { ...process.env } process.env.EXA_API_KEY_COUNT = '3' @@ -72,11 +101,12 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('No hosted keys configured') }) - it('should rate limit billing actor when they exceed their limit', async () => { + it('should rate limit billing actor when wait exceeds the queue cap', async () => { + // resetAt past the 5-minute cap forces the wait loop to bail immediately. const rateLimitedResult: ConsumeResult = { allowed: false, tokensRemaining: 0, - resetAt: new Date(Date.now() + 30000), + resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult) @@ -93,6 +123,33 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('Rate limit exceeded') }) + it('should wait for capacity then succeed when bucket refills within the cap', async () => { + // First call: bucket empty, refills in 100ms (well under cap). + // Second call: bucket has capacity, consumed. + const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 100), + } + const allowed: ConsumeResult = { + allowed: true, + tokensRemaining: 9, + resetAt: new Date(Date.now() + 60000), + } + mockAdapter.consumeTokens.mockResolvedValueOnce(blocked).mockResolvedValueOnce(allowed) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-wait' + ) + + expect(result.success).toBe(true) + expect(result.key).toBe('test-key-1') + expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(2) + }) + it('should allow billing actor within their rate limit', async () => { const allowedResult: ConsumeResult = { allowed: true, @@ -184,6 +241,261 @@ describe('HostedKeyRateLimiter', () => { }) }) + describe('FIFO queue ordering', () => { + const allowed: ConsumeResult = { + allowed: true, + tokensRemaining: 9, + resetAt: new Date(Date.now() + 60000), + } + + it('enqueues every call onto the per-workspace+provider queue', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + + await rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, 'workspace-1') + + expect(mockQueue.enqueue).toHaveBeenCalledWith( + testProvider, + 'workspace-1', + expect.any(String) + ) + }) + + it('always dequeues at the end of a successful acquisition', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + + await rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, 'workspace-1') + + expect(mockQueue.dequeue).toHaveBeenCalledWith( + testProvider, + 'workspace-1', + expect.any(String) + ) + }) + + it('always dequeues even when the call fails (no keys configured)', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + process.env.EXA_API_KEY_COUNT = '0' + + await rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, 'workspace-1') + + expect(mockQueue.dequeue).toHaveBeenCalled() + }) + + it('waits at the head of the queue before consuming from the bucket', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + // First two checkHead calls say we're waiting; third says we're up. + mockQueue.checkHead + .mockResolvedValueOnce('waiting') + .mockResolvedValueOnce('waiting') + .mockResolvedValueOnce('head') + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + + expect(result.success).toBe(true) + expect(mockQueue.checkHead).toHaveBeenCalledTimes(3) + // Bucket is only consumed once we reach the head. + expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(1) + }) + + it('refreshes the heartbeat while waiting at the head of the queue', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + + // We need the wait loop to iterate long enough for HEARTBEAT_REFRESH_INTERVAL_MS + // to elapse. Use fake timers so we don't actually sleep. + vi.useFakeTimers() + try { + // Queue says we're waiting forever — except after some time we're at head. + mockQueue.checkHead.mockImplementation(async () => { + // Advance past the heartbeat interval each time we poll, then say we're up. + vi.advanceTimersByTime(15_000) + return mockQueue.checkHead.mock.calls.length >= 2 ? 'head' : 'waiting' + }) + + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + // Drain pending timers so the sleep() resolves. + await vi.runAllTimersAsync() + await promise + + expect(mockQueue.refreshHeartbeat).toHaveBeenCalled() + } finally { + vi.useRealTimers() + } + }) + + it('returns 429 when the queue wait exceeds the cap', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + mockQueue.checkHead.mockResolvedValue('waiting') + + vi.useFakeTimers() + try { + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + // Burn past the 5-minute cap. + await vi.advanceTimersByTimeAsync(6 * 60 * 1000) + const result = await promise + + expect(result.success).toBe(false) + expect(result.billingActorRateLimited).toBe(true) + } finally { + vi.useRealTimers() + } + }) + + it('treats "missing" status as proceed (queue evicted, fall through to bucket race)', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + mockQueue.checkHead.mockResolvedValueOnce('missing') + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + + expect(result.success).toBe(true) + }) + }) + + describe('execution-budget-bounded waits', () => { + it('bails immediately when the execution signal is already aborted', async () => { + const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 100), + } + mockAdapter.consumeTokens.mockResolvedValue(blocked) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1', + AbortSignal.abort() + ) + + expect(result.success).toBe(false) + expect(result.billingActorRateLimited).toBe(true) + // Aborted budget => give up on the first bucket check rather than looping. + expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(1) + }) + + it('stops waiting promptly when the signal aborts mid-sleep', async () => { + // Bucket reports a long refill, so the wait sleeps up to the heartbeat cap (10s). + // Aborting mid-sleep must wake the wait within a tick, not after the full interval. + const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 10_000), + } + mockAdapter.consumeTokens.mockResolvedValue(blocked) + + const controller = new AbortController() + const start = Date.now() + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1', + controller.signal + ) + // Let the first bucket check run and the sleep begin, then abort. + await new Promise((resolve) => setTimeout(resolve, 20)) + controller.abort() + const result = await promise + + expect(result.success).toBe(false) + expect(result.billingActorRateLimited).toBe(true) + // Resolved well before the 10s capped sleep would otherwise have elapsed. + expect(Date.now() - start).toBeLessThan(2000) + }) + + it('keeps waiting past the no-signal fallback cap while the signal is live', async () => { + // A live (non-aborted) signal means the run still has budget, so the wait must not + // 429 at the 5-minute MAX_QUEUE_WAIT_MS fallback. The bucket frees up after ~7 min. + const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 10_000), + } + const allowedResult: ConsumeResult = { + allowed: true, + tokensRemaining: 9, + resetAt: new Date(Date.now() + 60_000), + } + mockAdapter.consumeTokens.mockResolvedValue(blocked) + + vi.useFakeTimers() + try { + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1', + new AbortController().signal + ) + // Burn well past the 5-minute fallback cap — without a signal this would have 429'd. + await vi.advanceTimersByTimeAsync(7 * 60 * 1000) + mockAdapter.consumeTokens.mockResolvedValue(allowedResult) + await vi.advanceTimersByTimeAsync(HEARTBEAT_REFRESH_INTERVAL_MS) + const result = await promise + + expect(result.success).toBe(true) + expect(result.key).toBe('test-key-1') + } finally { + vi.useRealTimers() + } + }) + + it('refreshes the heartbeat during a long low-RPM bucket wait', async () => { + // Provider with a long refill (retryAfterMs >> heartbeat TTL). The sleep must be + // capped so the heartbeat is renewed and the head is not reaped mid-wait. + const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 60_000), + } + const allowedResult: ConsumeResult = { + allowed: true, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 60_000), + } + mockAdapter.consumeTokens.mockResolvedValue(blocked) + + vi.useFakeTimers() + try { + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1', + new AbortController().signal + ) + await vi.advanceTimersByTimeAsync(3 * HEARTBEAT_REFRESH_INTERVAL_MS) + mockAdapter.consumeTokens.mockResolvedValue(allowedResult) + await vi.advanceTimersByTimeAsync(HEARTBEAT_REFRESH_INTERVAL_MS) + await promise + + expect(mockQueue.refreshHeartbeat).toHaveBeenCalled() + } finally { + vi.useRealTimers() + } + }) + }) + describe('acquireKey with custom rate limit', () => { const customRateLimit: CustomRateLimit = { mode: 'custom', @@ -197,11 +509,11 @@ describe('HostedKeyRateLimiter', () => { ], } - it('should enforce requestsPerMinute for custom mode', async () => { + it('should enforce requestsPerMinute for custom mode when wait exceeds the cap', async () => { const rateLimitedResult: ConsumeResult = { allowed: false, tokensRemaining: 0, - resetAt: new Date(Date.now() + 30000), + resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult) @@ -246,7 +558,7 @@ describe('HostedKeyRateLimiter', () => { expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(1) }) - it('should block request when a dimension is depleted', async () => { + it('should block request when a dimension wait exceeds the cap', async () => { const allowedConsume: ConsumeResult = { allowed: true, tokensRemaining: 4, @@ -258,7 +570,7 @@ describe('HostedKeyRateLimiter', () => { tokensAvailable: 0, maxTokens: 2000, lastRefillAt: new Date(), - nextRefillAt: new Date(Date.now() + 45000), + nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.getTokenStatus.mockResolvedValue(depleted) @@ -274,6 +586,39 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('tokens') }) + it('should wait for dimension capacity then succeed when budget refills', async () => { + const allowedConsume: ConsumeResult = { + allowed: true, + tokensRemaining: 4, + resetAt: new Date(Date.now() + 60000), + } + mockAdapter.consumeTokens.mockResolvedValue(allowedConsume) + + const depleted: TokenStatus = { + tokensAvailable: 0, + maxTokens: 2000, + lastRefillAt: new Date(), + nextRefillAt: new Date(Date.now() + 100), + } + const refilled: TokenStatus = { + tokensAvailable: 500, + maxTokens: 2000, + lastRefillAt: new Date(), + nextRefillAt: new Date(Date.now() + 60000), + } + mockAdapter.getTokenStatus.mockResolvedValueOnce(depleted).mockResolvedValueOnce(refilled) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + customRateLimit, + 'workspace-dim-wait' + ) + + expect(result.success).toBe(true) + expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(2) + }) + it('should pre-check all dimensions and block on first depleted one', async () => { const multiDimensionConfig: CustomRateLimit = { mode: 'custom', @@ -309,7 +654,7 @@ describe('HostedKeyRateLimiter', () => { tokensAvailable: 0, maxTokens: 100, lastRefillAt: new Date(), - nextRefillAt: new Date(Date.now() + 30000), + nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.getTokenStatus .mockResolvedValueOnce(tokensBudget) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index a20cf8413f3..10046696561 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -1,9 +1,14 @@ import { createLogger } from '@sim/logger' +import { sleep } from '@sim/utils/helpers' +import { generateShortId } from '@sim/utils/id' +import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' import { createStorageAdapter, type RateLimitStorageAdapter, type TokenBucketConfig, } from '@/lib/core/rate-limiter/storage' +import { PlatformEvents } from '@/lib/core/telemetry' +import { getHostedKeyQueue, HEARTBEAT_REFRESH_INTERVAL_MS, type HostedKeyQueue } from './queue' import { type AcquireKeyResult, type CustomRateLimit, @@ -16,6 +21,60 @@ import { const logger = createLogger('HostedKeyRateLimiter') +/** + * Fallback ceiling on how long a hosted-key acquisition waits for the per-workspace + * bucket to refill when no execution `AbortSignal` is available to bound the wait (e.g. + * a caller without a wired execution deadline, or Redis no-op mode). When a signal IS + * provided, the wait is instead bounded by the surrounding execution budget — the signal + * fires when the run hits its plan timeout or is cancelled — and this constant no longer + * applies (see {@link ABSOLUTE_MAX_QUEUE_WAIT_MS} for the backstop in that case). + */ +const MAX_QUEUE_WAIT_MS = 5 * 60 * 1000 + +/** + * Hard safety ceiling applied even when an execution `AbortSignal` is present, in case the + * signal never fires. Matches the longest possible execution budget (enterprise async) so + * it never truncates a legitimately long-running background run before its own deadline. + */ +const ABSOLUTE_MAX_QUEUE_WAIT_MS = getMaxExecutionTimeout() + +/** + * Floor on per-iteration sleep when the bucket reports `retryAfterMs <= 0`, + * which can happen due to clock skew or sub-millisecond resets. Prevents a + * tight retry loop hammering the storage adapter. + */ +const MIN_QUEUE_RETRY_DELAY_MS = 50 + +/** + * Poll interval while waiting to reach the head of the FIFO queue. 200ms balances + * acquisition latency (worst-case wait for advancement is one poll period) against + * Redis load — at this cadence, N waiters generate N×5 EVAL/sec, which is fine for + * the typical low-tens contention. Revisit if telemetry shows hot Redis under load. + */ +const QUEUE_HEAD_POLL_MS = 200 + +/** + * Sleep for `ms`, resolving early if `signal` aborts. Cleans up its own timer and listener + * so neither leaks. Callers don't need to distinguish an early (aborted) return from a normal + * one — the surrounding wait loop re-checks its budget immediately after and bails when the + * signal has fired. Falls back to a plain sleep when no signal is provided. + */ +function interruptibleSleep(ms: number, signal?: AbortSignal): Promise { + if (!signal) return sleep(ms) + if (signal.aborted) return Promise.resolve() + return new Promise((resolve) => { + const onAbort = () => { + clearTimeout(timer) + resolve() + } + const timer = setTimeout(() => { + signal.removeEventListener('abort', onAbort) + resolve() + }, ms) + signal.addEventListener('abort', onAbort, { once: true }) + }) +} + /** * Resolves env var names for a numbered key prefix using a `{PREFIX}_COUNT` env var. * E.g. with `EXA_API_KEY_COUNT=5`, returns `['EXA_API_KEY_1', ..., 'EXA_API_KEY_5']`. @@ -41,6 +100,18 @@ interface AvailableKey { envVarName: string } +/** + * Mutable heartbeat bookkeeping shared across every wait phase of a single `acquireKey` + * call. Carrying one `lastHeartbeatAt` across the queue-head wait and the bucket waits + * ensures the heartbeat-refresh cadence reflects the *actual* last write to Redis rather + * than resetting per phase, which could otherwise let the ticket heartbeat lapse and get + * reaped mid-wait (breaking FIFO ordering). + */ +interface WaitState { + /** Epoch ms of the last heartbeat write to Redis. */ + lastHeartbeatAt: number +} + /** * HostedKeyRateLimiter provides: * 1. Per-billing-actor rate limiting (enforced - blocks actors who exceed their limit) @@ -52,11 +123,13 @@ interface AvailableKey { */ export class HostedKeyRateLimiter { private storage: RateLimitStorageAdapter + private queue: HostedKeyQueue /** Round-robin counter per provider for even key distribution */ private roundRobinCounters = new Map() - constructor(storage?: RateLimitStorageAdapter) { + constructor(storage?: RateLimitStorageAdapter, queue?: HostedKeyQueue) { this.storage = storage ?? createStorageAdapter() + this.queue = queue ?? getHostedKeyQueue() } private buildActorStorageKey(provider: string, billingActorId: string): string { @@ -179,71 +252,341 @@ export class HostedKeyRateLimiter { * Acquire an available key via round-robin selection. * * For both modes: - * 1. Per-billing-actor request rate limiting (enforced): blocks actors who exceed their request limit + * 1. Per-billing-actor request rate limiting (enforced): the call enqueues itself + * onto a per-workspace+provider FIFO queue. Only the head of the queue attempts + * to consume from the token bucket, guaranteeing strict ordering across callers + * within a workspace. Different workspaces have independent queues and don't + * block each other. * 2. Round-robin key selection: cycles through available keys for even distribution * * For `custom` mode additionally: - * 3. Pre-checks dimension budgets: blocks if any dimension is already depleted + * 3. Pre-checks dimension budgets: head waits on dimension refill the same way it + * waits on actor request capacity. + * + * The wait is bounded by the surrounding execution budget: when `signal` is provided it + * fires at the run's plan timeout (or on cancellation), so a queued call uses its full + * available budget rather than a flat cap. When no signal is available the wait falls + * back to `MAX_QUEUE_WAIT_MS`. On exhaustion the call returns today's 429 result. The + * ticket is removed from the queue on exit regardless of success or failure. * * @param envKeyPrefix - Env var prefix (e.g. 'EXA_API_KEY'). Keys resolved via `{prefix}_COUNT`. * @param billingActorId - The billing actor (typically workspace ID) to rate limit against + * @param signal - Optional execution `AbortSignal`; bounds the queue wait to the run's budget. */ async acquireKey( provider: string, envKeyPrefix: string, config: HostedKeyRateLimitConfig, - billingActorId: string + billingActorId: string, + signal?: AbortSignal ): Promise { - if (config.requestsPerMinute) { - const rateLimitResult = await this.checkActorRateLimit(provider, billingActorId, config) - if (rateLimitResult) { + const ticketId = generateShortId() + const startedAt = Date.now() + const waitState: WaitState = { lastHeartbeatAt: startedAt } + const enqueueResult = await this.queue.enqueue(provider, billingActorId, ticketId) + + try { + // Wait for our turn at the head of the queue (no-op when Redis unavailable). + const headStatus = await this.waitForQueueHead( + provider, + billingActorId, + ticketId, + startedAt, + waitState, + signal + ) + if (headStatus.timedOut) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + reason: 'queue_position', + }) return { success: false, billingActorRateLimited: true, - retryAfterMs: rateLimitResult.retryAfterMs, - error: `Rate limit exceeded. Please wait ${Math.ceil(rateLimitResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + retryAfterMs: MAX_QUEUE_WAIT_MS, + error: `Rate limit exceeded — request waited too long in the queue. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, } } - } - if (config.mode === 'custom' && config.dimensions.length > 0) { - const dimensionResult = await this.preCheckDimensions(provider, billingActorId, config) - if (dimensionResult) { + let dimensionWaited = false + + if (config.requestsPerMinute) { + const rateLimitResult = await this.waitForActorCapacity( + provider, + billingActorId, + ticketId, + config, + startedAt, + waitState, + signal + ) + if (rateLimitResult.rateLimited) { + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: rateLimitResult.retryAfterMs, + error: `Rate limit exceeded. Please wait ${Math.ceil(rateLimitResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } + } + } + + if (config.mode === 'custom' && config.dimensions.length > 0) { + const dimensionResult = await this.waitForDimensionCapacity( + provider, + billingActorId, + ticketId, + config, + startedAt, + waitState, + signal + ) + if (dimensionResult.rateLimited) { + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: dimensionResult.retryAfterMs, + error: `Rate limit exceeded for ${dimensionResult.dimension}. Please wait ${Math.ceil(dimensionResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } + } + dimensionWaited = dimensionResult.waited + } + + const totalWaitedMs = Date.now() - startedAt + if (enqueueResult.enabled && (enqueueResult.position > 0 || totalWaitedMs > 100)) { + // Attribute the wait to its dominant cause: queue depth takes precedence (it's + // reported alongside queuePosition), otherwise the bucket phase that actually slept. + const reason: 'queue_position' | 'actor_requests' | 'dimension' = + enqueueResult.position > 0 + ? 'queue_position' + : dimensionWaited + ? 'dimension' + : 'actor_requests' + PlatformEvents.hostedKeyQueueWaited({ + provider, + workspaceId: billingActorId, + waitedMs: totalWaitedMs, + attempts: 1, + reason, + queuePosition: enqueueResult.position, + }) + } + + const envKeys = resolveEnvKeys(envKeyPrefix) + const availableKeys = this.getAvailableKeys(envKeys) + + if (availableKeys.length === 0) { + logger.warn(`No hosted keys configured for provider ${provider}`) return { success: false, - billingActorRateLimited: true, - retryAfterMs: dimensionResult.retryAfterMs, - error: `Rate limit exceeded for ${dimensionResult.dimension}. Please wait ${Math.ceil(dimensionResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + error: `No hosted keys configured for ${provider}`, } } - } - const envKeys = resolveEnvKeys(envKeyPrefix) - const availableKeys = this.getAvailableKeys(envKeys) + const counter = this.roundRobinCounters.get(provider) ?? 0 + const selected = availableKeys[counter % availableKeys.length] + this.roundRobinCounters.set(provider, counter + 1) + + logger.debug(`Selected hosted key for ${provider}`, { + provider, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + }) - if (availableKeys.length === 0) { - logger.warn(`No hosted keys configured for provider ${provider}`) return { - success: false, - error: `No hosted keys configured for ${provider}`, + success: true, + key: selected.key, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + } + } finally { + // Always remove our ticket so the next caller can advance, regardless of whether + // we succeeded, hit the cap, or threw. Best-effort; safe to call multiple times. + await this.queue.dequeue(provider, billingActorId, ticketId) + } + } + + /** + * Remaining time budget for waiting, in milliseconds. When an execution `AbortSignal` is + * present it governs the wait: the budget is exhausted the moment the signal aborts (the + * run hit its plan timeout or was cancelled), with {@link ABSOLUTE_MAX_QUEUE_WAIT_MS} as a + * backstop should the signal never fire. Without a signal we fall back to the flat + * {@link MAX_QUEUE_WAIT_MS} ceiling. + */ + private remainingWaitBudgetMs(startedAt: number, signal?: AbortSignal): number { + if (signal?.aborted) return 0 + const ceiling = signal ? ABSOLUTE_MAX_QUEUE_WAIT_MS : MAX_QUEUE_WAIT_MS + return ceiling - (Date.now() - startedAt) + } + + /** Refresh the ticket heartbeat if the refresh interval has elapsed since the last write. */ + private async maybeRefreshHeartbeat( + provider: string, + billingActorId: string, + ticketId: string, + waitState: WaitState + ): Promise { + if (Date.now() - waitState.lastHeartbeatAt >= HEARTBEAT_REFRESH_INTERVAL_MS) { + await this.queue.refreshHeartbeat(provider, billingActorId, ticketId) + waitState.lastHeartbeatAt = Date.now() + } + } + + /** + * Sleep before the next bucket re-check, refreshing the heartbeat first if due. The sleep + * is capped at {@link HEARTBEAT_REFRESH_INTERVAL_MS} so that no single wait can outlive the + * heartbeat TTL — even when the bucket reports a long `retryAfterMs` (e.g. low-RPM + * providers). Without this cap a multi-second sleep could let the heartbeat lapse, the + * head get reaped as dead, and a second caller advance and race us for the bucket. The + * sleep also resolves early if `signal` aborts, so a cancelled/timed-out run stops waiting + * promptly rather than overshooting by up to the cap. + */ + private async heartbeatAwareSleep( + provider: string, + billingActorId: string, + ticketId: string, + desiredMs: number, + waitState: WaitState, + signal?: AbortSignal + ): Promise { + await this.maybeRefreshHeartbeat(provider, billingActorId, ticketId, waitState) + const sleepMs = Math.min( + Math.max(MIN_QUEUE_RETRY_DELAY_MS, desiredMs), + HEARTBEAT_REFRESH_INTERVAL_MS + ) + await interruptibleSleep(sleepMs, signal) + } + + /** + * Block until our ticket reaches the head of the queue. Refreshes the heartbeat on a + * regular cadence so we don't get reaped as dead. Returns `timedOut: true` once the wait + * budget is exhausted before reaching the head (see {@link remainingWaitBudgetMs}). + * + * No-op when Redis is unavailable (queue.enqueue returns enabled=false and checkHead + * always returns 'head'). + */ + private async waitForQueueHead( + provider: string, + billingActorId: string, + ticketId: string, + startedAt: number, + waitState: WaitState, + signal?: AbortSignal + ): Promise<{ timedOut: boolean }> { + while (true) { + const status = await this.queue.checkHead(provider, billingActorId, ticketId) + if (status === 'head') return { timedOut: false } + + // 'missing' shouldn't normally happen — the queue list TTL (10min) outlives a typical + // wait — but if it does (e.g. Redis flushed mid-wait), treat as "you're up" so the + // caller proceeds to the bucket race rather than hanging forever. + if (status === 'missing') return { timedOut: false } + + if (this.remainingWaitBudgetMs(startedAt, signal) <= 0) { + return { timedOut: true } + } + + await this.maybeRefreshHeartbeat(provider, billingActorId, ticketId, waitState) + await interruptibleSleep(QUEUE_HEAD_POLL_MS, signal) + } + } + + /** + * Wait for actor request-rate capacity. Called once we're at the head of the FIFO + * queue, so other callers can't race us for the next token — they're blocked behind us + * at queue level. Re-checks the bucket until the wait budget is exhausted (accounting for + * time already spent waiting in the queue). `waited` reports whether the loop ever slept. + */ + private async waitForActorCapacity( + provider: string, + billingActorId: string, + ticketId: string, + config: HostedKeyRateLimitConfig, + startedAt: number, + waitState: WaitState, + signal?: AbortSignal + ): Promise< + { rateLimited: false; waited: boolean } | { rateLimited: true; retryAfterMs: number } + > { + let waited = false + + while (true) { + const result = await this.checkActorRateLimit(provider, billingActorId, config) + if (!result) return { rateLimited: false, waited } + + const remaining = this.remainingWaitBudgetMs(startedAt, signal) + if (remaining <= 0 || result.retryAfterMs > remaining) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + reason: 'actor_requests', + }) + return { rateLimited: true, retryAfterMs: result.retryAfterMs } } + + waited = true + await this.heartbeatAwareSleep( + provider, + billingActorId, + ticketId, + result.retryAfterMs, + waitState, + signal + ) } + } - const counter = this.roundRobinCounters.get(provider) ?? 0 - const selected = availableKeys[counter % availableKeys.length] - this.roundRobinCounters.set(provider, counter + 1) - - logger.debug(`Selected hosted key for ${provider}`, { - provider, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, - }) - - return { - success: true, - key: selected.key, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, + /** + * Wait for custom-mode dimension capacity. `preCheckDimensions` is read-only — it does + * not consume — so re-running it after a sleep is safe and does not double-charge. + * Post-execution `reportUsage` performs the actual consumption. `waited` reports whether + * the loop ever slept. + */ + private async waitForDimensionCapacity( + provider: string, + billingActorId: string, + ticketId: string, + config: CustomRateLimit, + startedAt: number, + waitState: WaitState, + signal?: AbortSignal + ): Promise< + | { rateLimited: false; waited: boolean } + | { rateLimited: true; retryAfterMs: number; dimension: string } + > { + let waited = false + + while (true) { + const result = await this.preCheckDimensions(provider, billingActorId, config) + if (!result) return { rateLimited: false, waited } + + const remaining = this.remainingWaitBudgetMs(startedAt, signal) + if (remaining <= 0 || result.retryAfterMs > remaining) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + reason: 'dimension', + dimension: result.dimension, + }) + return { + rateLimited: true, + retryAfterMs: result.retryAfterMs, + dimension: result.dimension, + } + } + + waited = true + await this.heartbeatAwareSleep( + provider, + billingActorId, + ticketId, + result.retryAfterMs, + waitState, + signal + ) } } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts new file mode 100644 index 00000000000..7405c1c5e61 --- /dev/null +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts @@ -0,0 +1,226 @@ +import { redisConfigMock, redisConfigMockFns } from '@sim/testing' +import { beforeEach, describe, expect, it, type Mock, vi } from 'vitest' +import { HostedKeyQueue } from './queue' + +vi.mock('@/lib/core/config/redis', () => redisConfigMock) + +interface MockPipeline { + rpush: Mock + expire: Mock + set: Mock + lrem: Mock + del: Mock + exec: Mock +} + +interface MockRedis { + multi: Mock + set: Mock + eval: Mock + pipeline: MockPipeline +} + +function createFakeRedis(): MockRedis { + const pipeline: MockPipeline = { + rpush: vi.fn(), + expire: vi.fn(), + set: vi.fn(), + lrem: vi.fn(), + del: vi.fn(), + exec: vi.fn(), + } + // Pipeline methods return the pipeline for chaining. + pipeline.rpush.mockReturnValue(pipeline) + pipeline.expire.mockReturnValue(pipeline) + pipeline.set.mockReturnValue(pipeline) + pipeline.lrem.mockReturnValue(pipeline) + pipeline.del.mockReturnValue(pipeline) + + return { + multi: vi.fn(() => pipeline), + set: vi.fn(), + eval: vi.fn(), + pipeline, + } +} + +const provider = 'exa' +const workspaceId = 'workspace-1' +const ticketId = 'ticket-1' + +describe('HostedKeyQueue', () => { + let queue: HostedKeyQueue + let mockRedis: MockRedis + + beforeEach(() => { + vi.clearAllMocks() + mockRedis = createFakeRedis() + redisConfigMockFns.mockGetRedisClient.mockReturnValue(mockRedis) + queue = new HostedKeyQueue() + }) + + describe('enqueue', () => { + it('returns position 0 when first in line', async () => { + // RPUSH returns new list length; first push -> 1. + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 1], + [null, 1], + [null, 'OK'], + ]) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result).toEqual({ position: 0, enabled: true }) + expect(mockRedis.pipeline.rpush).toHaveBeenCalledWith( + 'hosted-queue:exa:workspace-1', + ticketId + ) + expect(mockRedis.pipeline.set).toHaveBeenCalledWith( + 'hosted-queue-tkt:exa:workspace-1:ticket-1', + '1', + 'EX', + expect.any(Number) + ) + }) + + it('returns higher position when others are ahead', async () => { + // Length 5 after push -> position 4. + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 5], + [null, 1], + [null, 'OK'], + ]) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result.position).toBe(4) + }) + + it('falls back to enabled=false when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result).toEqual({ position: 0, enabled: false }) + }) + + it('falls back to enabled=false on Redis error', async () => { + mockRedis.pipeline.exec.mockRejectedValueOnce(new Error('connection lost')) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result.enabled).toBe(false) + }) + }) + + describe('checkHead', () => { + it('returns "head" when our ticket is at the head', async () => { + mockRedis.eval.mockResolvedValueOnce('head') + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('head') + }) + + it('returns "waiting" when someone else is the head', async () => { + mockRedis.eval.mockResolvedValueOnce('waiting') + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('waiting') + }) + + it('returns "missing" when our ticket is not in the queue', async () => { + mockRedis.eval.mockResolvedValueOnce('missing') + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('missing') + }) + + it('passes queue list key, heartbeat prefix, and ticketId to the Lua script', async () => { + mockRedis.eval.mockResolvedValueOnce('head') + + await queue.checkHead(provider, workspaceId, ticketId) + + expect(mockRedis.eval).toHaveBeenCalledWith( + expect.stringContaining('lindex'), + 1, + 'hosted-queue:exa:workspace-1', + 'hosted-queue-tkt:exa:workspace-1:', + ticketId + ) + }) + + it('fails open to "head" on Redis error so callers do not hang', async () => { + mockRedis.eval.mockRejectedValueOnce(new Error('boom')) + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('head') + }) + + it('returns "head" no-op when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('head') + }) + }) + + describe('refreshHeartbeat', () => { + it('writes the heartbeat key with TTL', async () => { + mockRedis.set.mockResolvedValueOnce('OK') + + await queue.refreshHeartbeat(provider, workspaceId, ticketId) + + expect(mockRedis.set).toHaveBeenCalledWith( + 'hosted-queue-tkt:exa:workspace-1:ticket-1', + '1', + 'EX', + expect.any(Number) + ) + }) + + it('is a no-op when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined() + expect(mockRedis.set).not.toHaveBeenCalled() + }) + }) + + describe('dequeue', () => { + it('removes the ticket from the list and deletes the heartbeat', async () => { + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 1], + [null, 1], + ]) + + await queue.dequeue(provider, workspaceId, ticketId) + + expect(mockRedis.pipeline.lrem).toHaveBeenCalledWith( + 'hosted-queue:exa:workspace-1', + 1, + ticketId + ) + expect(mockRedis.pipeline.del).toHaveBeenCalledWith( + 'hosted-queue-tkt:exa:workspace-1:ticket-1' + ) + }) + + it('is a no-op when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + await expect(queue.dequeue(provider, workspaceId, ticketId)).resolves.toBeUndefined() + expect(mockRedis.multi).not.toHaveBeenCalled() + }) + + it('swallows errors so callers do not throw on cleanup', async () => { + mockRedis.pipeline.exec.mockRejectedValueOnce(new Error('connection lost')) + + await expect(queue.dequeue(provider, workspaceId, ticketId)).resolves.toBeUndefined() + }) + }) +}) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts new file mode 100644 index 00000000000..4a7ecd5aed1 --- /dev/null +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts @@ -0,0 +1,203 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('HostedKeyQueue') + +/** + * Per-ticket heartbeat TTL. Refreshed by the head while it's actively waiting + * on the bucket. If the holder crashes, the heartbeat key expires, and the next + * caller sees the head as dead and removes it (lazy cleanup). + */ +const TICKET_HEARTBEAT_TTL_SECONDS = 30 + +/** How often the head should refresh its heartbeat while waiting. */ +export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000 + +/** + * TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues + * (whole workspace went silent) from sticking around forever in Redis. + */ +const QUEUE_LIST_TTL_SECONDS = 600 + +const queueListKey = (provider: string, billingActorId: string): string => + `hosted-queue:${provider}:${billingActorId}` + +const heartbeatKey = (provider: string, billingActorId: string, ticketId: string): string => + `hosted-queue-tkt:${provider}:${billingActorId}:${ticketId}` + +/** + * Atomically reap any dead head, then return our ticket's status. Combines what + * would otherwise be 3 round-trips (reap, LINDEX, LPOS) into one EVAL — meaningful + * because callers poll this every ~200ms while waiting in the queue. + * + * `KEYS[1]` = queue list key. `ARGV[1]` = heartbeat key prefix. `ARGV[2]` = our ticketId. + * + * Reaping is bounded: at most one dead head is removed per call. If multiple dead + * tickets pile up at the head, subsequent polls will clean them one by one. This + * keeps the script O(1) rather than O(N) and is sufficient because queue depth + * is bounded by concurrent callers per workspace (typically tens). + * + * Returns one of: "head", "waiting", "missing". + */ +const CHECK_HEAD_SCRIPT = ` +local head = redis.call("lindex", KEYS[1], 0) +if head and redis.call("exists", ARGV[1] .. head) == 0 then + redis.call("lrem", KEYS[1], 1, head) + head = redis.call("lindex", KEYS[1], 0) +end +if not head then + return "missing" +end +if head == ARGV[2] then + return "head" +end +if redis.call("lpos", KEYS[1], ARGV[2]) == false then + return "missing" +end +return "waiting" +` + +export interface EnqueueResult { + /** Position at the moment of enqueue (0 = head, you go next). */ + position: number + /** Whether Redis was available — false means we're in no-op mode. */ + enabled: boolean +} + +/** + * Per-workspace+provider FIFO queue for hosted-key acquisitions. + * + * Callers `enqueue` to claim a position, then `waitForHead` until they're at + * the head, then attempt to consume from the token bucket. On success or cap + * exceeded, they `dequeue` to make room for the next caller. + * + * No-op when Redis is unavailable: every method returns "you're the head / + * empty / etc." so the rate limiter falls back to plain bucket racing. + */ +export class HostedKeyQueue { + /** + * Push a ticket onto the tail of the queue and write a heartbeat. Returns the + * position at enqueue time (0 = head, ready to proceed). + */ + async enqueue( + provider: string, + billingActorId: string, + ticketId: string + ): Promise { + const redis = getRedisClient() + if (!redis) { + return { position: 0, enabled: false } + } + + const listKey = queueListKey(provider, billingActorId) + const hbKey = heartbeatKey(provider, billingActorId, ticketId) + + try { + const pipeline = redis.multi() + pipeline.rpush(listKey, ticketId) + pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS) + pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + const results = await pipeline.exec() + // results[0] is the rpush response: [err, length] + const length = results?.[0] && typeof results[0][1] === 'number' ? results[0][1] : 1 + // Position is length - 1 (just-pushed at the tail). + return { position: length - 1, enabled: true } + } catch (error) { + logger.warn(`Queue enqueue failed for ${listKey}`, { error: toError(error).message }) + return { position: 0, enabled: false } + } + } + + /** + * Check whether `ticketId` is currently at the head of the queue. If the head + * is a different ticket but its heartbeat has expired (caller crashed), reap + * it and re-check on the next poll. + * + * Returns: + * - "head": you're at the head, proceed to consume from the bucket + * - "waiting": someone else is the head and they're alive + * - "missing": your ticket isn't in the queue at all (e.g. queue list TTL + * expired); caller should re-enqueue or treat as enabled=false + */ + async checkHead( + provider: string, + billingActorId: string, + ticketId: string + ): Promise<'head' | 'waiting' | 'missing'> { + const redis = getRedisClient() + if (!redis) { + return 'head' + } + + const listKey = queueListKey(provider, billingActorId) + const hbPrefix = `hosted-queue-tkt:${provider}:${billingActorId}:` + + try { + const result = (await redis.eval(CHECK_HEAD_SCRIPT, 1, listKey, hbPrefix, ticketId)) as + | 'head' + | 'waiting' + | 'missing' + return result + } catch (error) { + logger.warn(`Queue checkHead failed for ${listKey}`, { error: toError(error).message }) + // Fail-open: treat as head so the caller proceeds rather than hanging. + return 'head' + } + } + + /** + * Refresh the ticket's heartbeat. Called periodically by the head while it's + * waiting on the bucket so it doesn't get reaped as dead. + */ + async refreshHeartbeat( + provider: string, + billingActorId: string, + ticketId: string + ): Promise { + const redis = getRedisClient() + if (!redis) return + + const hbKey = heartbeatKey(provider, billingActorId, ticketId) + try { + await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + } catch (error) { + logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, { + error: toError(error).message, + }) + } + } + + /** + * Remove a ticket from the queue and its heartbeat key. Best-effort; safe to + * call multiple times. LREM count=1 removes at most one matching entry. + */ + async dequeue(provider: string, billingActorId: string, ticketId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const listKey = queueListKey(provider, billingActorId) + const hbKey = heartbeatKey(provider, billingActorId, ticketId) + try { + const pipeline = redis.multi() + pipeline.lrem(listKey, 1, ticketId) + pipeline.del(hbKey) + await pipeline.exec() + } catch (error) { + logger.warn(`Queue dequeue failed for ${listKey}`, { error: toError(error).message }) + } + } +} + +let cachedQueue: HostedKeyQueue | null = null + +export function getHostedKeyQueue(): HostedKeyQueue { + if (!cachedQueue) { + cachedQueue = new HostedKeyQueue() + } + return cachedQueue +} + +export function resetHostedKeyQueue(): void { + cachedQueue = null +} diff --git a/apps/sim/lib/core/telemetry.ts b/apps/sim/lib/core/telemetry.ts index 6751b4fa1e3..0ae2be8c1d9 100644 --- a/apps/sim/lib/core/telemetry.ts +++ b/apps/sim/lib/core/telemetry.ts @@ -1002,6 +1002,50 @@ export const PlatformEvents = { }) }, + /** + * Track a successful hosted-key acquisition that had to wait — either for a slot at + * the head of the FIFO queue, or for the actor/dimension bucket to refill once at the + * head. `queuePosition` is the position at the moment of enqueue (0 = ready to proceed). + */ + hostedKeyQueueWaited: (attrs: { + provider: string + workspaceId: string + waitedMs: number + attempts: number + reason: 'actor_requests' | 'dimension' | 'queue_position' + dimension?: string + queuePosition?: number + }) => { + trackPlatformEvent('platform.hosted_key.queue_waited', { + 'provider.id': attrs.provider, + 'workspace.id': attrs.workspaceId, + 'queue.waited_ms': attrs.waitedMs, + 'queue.attempts': attrs.attempts, + 'queue.reason': attrs.reason, + ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + ...(attrs.queuePosition != null && { 'queue.position': attrs.queuePosition }), + }) + }, + + /** + * Track a hosted-key acquisition that exceeded the queue wait cap and fell back to a 429. + */ + hostedKeyQueueWaitExceeded: (attrs: { + provider: string + workspaceId: string + waitedMs: number + reason: 'actor_requests' | 'dimension' | 'queue_position' + dimension?: string + }) => { + trackPlatformEvent('platform.hosted_key.queue_wait_exceeded', { + 'provider.id': attrs.provider, + 'workspace.id': attrs.workspaceId, + 'queue.waited_ms': attrs.waitedMs, + 'queue.reason': attrs.reason, + ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + }) + }, + /** * Track chat deployed (workflow deployed as chat interface) */ diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 66a641a72e6..fb2c9584c0b 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -272,7 +272,8 @@ async function injectHostedKeyIfNeeded( provider, envKeyPrefix, rateLimit, - billingActorId + billingActorId, + executionContext?.abortSignal ) if (!acquireResult.success && acquireResult.billingActorRateLimited) { @@ -318,6 +319,49 @@ async function injectHostedKeyIfNeeded( } } +/** + * Re-acquire a hosted key after upstream-429 retries have been exhausted. Calls + * `acquireKey` (which now blocks on the per-workspace bucket) and re-injects the + * fresh key into `params`. Returns false if no key could be obtained — caller + * should re-throw the original upstream 429. + * + * Does not consult BYOK. We only enter this path from inside the hosted-key + * branch of `executeTool`, so BYOK has already been ruled out for this call. + */ +async function reacquireHostedKey( + tool: ToolConfig, + params: Record, + executionContext: ExecutionContext | undefined, + requestId: string +): Promise { + if (!tool.hosting) return false + const { envKeyPrefix, apiKeyParam, byokProviderId, rateLimit } = tool.hosting + const { workspaceId } = resolveToolScope(params, executionContext) + if (!workspaceId) return false + + const provider = byokProviderId || tool.id + const acquireResult = await getHostedKeyRateLimiter().acquireKey( + provider, + envKeyPrefix, + rateLimit, + workspaceId, + executionContext?.abortSignal + ) + + if (!acquireResult.success || !acquireResult.key) { + logger.warn( + `[${requestId}] Re-acquire of hosted key for ${tool.id} failed: ${acquireResult.error ?? 'unknown'}` + ) + return false + } + + params[apiKeyParam] = acquireResult.key + logger.info( + `[${requestId}] Re-acquired hosted key for ${tool.id} (${acquireResult.envVarName}) after upstream throttling` + ) + return true +} + /** * Check if an error is a rate limit (throttling) or quota exhaustion error. * Some providers (e.g. Perplexity) return 401/403 with "insufficient_quota" @@ -344,11 +388,23 @@ interface RetryContext { toolId: string envVarName: string executionContext?: ExecutionContext + /** + * Optional callback invoked after the local exponential backoff has been exhausted by + * upstream 429s. Should re-enter the per-workspace hosted-key queue (which now blocks + * on the bucket) and return a fresh execution thunk bound to the newly acquired key. + * If the callback returns null, we give up and re-throw the last error. + */ + reacquireAfterRetriesExhausted?: () => Promise<(() => Promise) | null> } /** * Execute a function with exponential backoff retry for rate limiting errors. * Only used for hosted key requests. Tracks rate limit events via telemetry. + * + * On terminal upstream 429, optionally re-enters the hosted-key queue (which waits for + * the per-workspace bucket to refill) and retries once with a freshly acquired key. + * This handles the case where the upstream provider's limit is tighter than ours — we + * re-queue the call instead of surfacing the error. */ async function executeWithRetry( fn: () => Promise, @@ -356,7 +412,8 @@ async function executeWithRetry( maxRetries = 3, baseDelayMs = 1000 ): Promise { - const { requestId, toolId, envVarName, executionContext } = context + const { requestId, toolId, envVarName, executionContext, reacquireAfterRetriesExhausted } = + context let lastError: unknown for (let attempt = 0; attempt <= maxRetries; attempt++) { @@ -367,6 +424,23 @@ async function executeWithRetry( if (!isRateLimitError(error) || attempt === maxRetries) { if (isRateLimitError(error) && attempt === maxRetries) { + if (reacquireAfterRetriesExhausted) { + try { + const requeued = await reacquireAfterRetriesExhausted() + if (requeued) { + logger.warn( + `[${requestId}] Upstream retries exhausted for ${toolId} (${envVarName}); re-queued and retrying once with fresh key` + ) + return (await requeued()) as T + } + } catch (requeueError) { + logger.error( + `[${requestId}] Re-queue after exhausted upstream retries failed for ${toolId}`, + { error: toError(requeueError).message } + ) + } + } + PlatformEvents.hostedKeyUserThrottled({ toolId, reason: 'upstream_retries_exhausted', @@ -1099,6 +1173,16 @@ export async function executeTool( toolId, envVarName: hostedKeyInfo.envVarName!, executionContext, + reacquireAfterRetriesExhausted: async () => { + const reacquired = await reacquireHostedKey( + tool, + contextParams, + executionContext, + requestId + ) + if (!reacquired) return null + return () => executeToolRequest(toolId, tool, contextParams) + }, }) : await executeToolRequest(toolId, tool, contextParams, signal) From 0fedceb4c3c659bdd2a65e0de34daa8c9201cd04 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 26 May 2026 17:30:30 -0700 Subject: [PATCH 2/5] log(db): Add db failure cause log message (#4749) --- .../lib/workflows/executor/execution-core.ts | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index bffc4d4c868..7fb35a49103 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -5,6 +5,7 @@ import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' +import { filterUndefined } from '@sim/utils/object' import { mergeSubblockStateWithValues } from '@sim/workflow-persistence/subblocks' import type { Edge } from 'reactflow' import { z } from 'zod' @@ -39,6 +40,49 @@ const logger = createLogger('ExecutionCore') const EnvVarsSchema = z.record(z.string(), z.string()) +/** + * Surfaces the underlying driver error from a wrapped error chain. + * + * Drizzle wraps the original `postgres`/Node driver error as `error.cause`, + * which the logger's Error serializer drops (it only emits own-enumerable + * keys). Walking the chain from `error` itself and preferring the first error + * carrying a `code` exposes the diagnostic fields — notably the Postgres + * `code` — that distinguish a connection drop (`08006`), a rejected connection + * (`53300`), and a statement timeout (`57014`) behind an opaque "Failed query" + * message. Starting at `error` also captures a bare driver error that reaches + * this path unwrapped; when no error in the chain carries a `code`, it falls + * back to the first wrapped cause (the top-level error is already logged on its + * own, so it is not echoed here). + */ +function describeErrorCause(error: unknown): Record | undefined { + try { + let driver: (Error & Record) | undefined + let current: unknown = error + for (let depth = 0; depth < 10 && current instanceof Error; depth++) { + const candidate = current as Error & Record + if (candidate.code !== undefined) { + driver = candidate + break + } + if (depth === 1) driver = candidate + current = candidate.cause + } + if (!driver) return undefined + return filterUndefined({ + name: driver.name, + message: driver.message, + code: driver.code, + severity: driver.severity, + detail: driver.detail, + routine: driver.routine, + errno: driver.errno, + syscall: driver.syscall, + }) + } catch { + return undefined + } +} + export interface ExecuteWorkflowCoreOptions { snapshot: ExecutionSnapshot callbacks: ExecutionCallbacks @@ -682,7 +726,12 @@ export async function executeWorkflowCore( return result } catch (error: unknown) { - logger.error(`[${requestId}] Execution failed:`, error) + const errorCause = describeErrorCause(error) + logger.error( + `[${requestId}] Execution failed:`, + error, + ...(errorCause ? [{ cause: errorCause }] : []) + ) await waitForLifecycleCallbacks() From a1aa168cfe53dc965dff5b6c9d9fd682f9a93d6e Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 26 May 2026 18:57:35 -0700 Subject: [PATCH 3/5] improvement(schedules): jitter scheduled execution starts by 0-30s (#4750) Cron schedules all fire on the same boundary (e.g. every :00), stampeding the Postgres connection pool at the top of each minute/hour. Spread each due schedule's start across a [0, 30s) window via trigger.dev's delay option (no compute billed during the delay). Wires the previously-unused EnqueueOptions.delayMs through the trigger.dev backend. --- apps/sim/app/api/schedules/execute/route.ts | 9 +++++++++ apps/sim/lib/core/async-jobs/backends/trigger-dev.ts | 3 +++ 2 files changed, 12 insertions(+) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 7891fcd01b1..33fb374bbd5 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -26,6 +26,14 @@ const JOB_CHUNK_SIZE = 100 const MAX_TICK_DURATION_MS = 3 * 60 * 1000 const STALE_SCHEDULE_CLAIM_MS = getMaxExecutionTimeout() +/** + * Upper bound (ms) for the random start delay applied to each scheduled + * execution. Cron schedules all fire on the same boundary (e.g. every `:00`), + * which stampedes the database connection pool at the top of each minute/hour. + * Spreading starts across a [0, 30s) window smooths that burst. + */ +const SCHEDULE_JITTER_MAX_MS = 30_000 + const dueFilter = (queuedAt: Date) => and( isNull(workflowSchedule.archivedAt), @@ -217,6 +225,7 @@ async function processScheduleItem( const jobId = await jobQueue.enqueue('schedule-execution', payload, { jobId: scheduleJobId, concurrencyKey: scheduleJobId, + delayMs: Math.floor(Math.random() * SCHEDULE_JITTER_MAX_MS), metadata: { workflowId: schedule.workflowId ?? undefined, workspaceId: resolvedWorkspaceId ?? undefined, diff --git a/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts b/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts index ef0fcaed65f..6e9cbd063b4 100644 --- a/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts +++ b/apps/sim/lib/core/async-jobs/backends/trigger-dev.ts @@ -81,6 +81,9 @@ export class TriggerDevJobQueue implements JobQueueBackend { triggerOptions.idempotencyKey = options.jobId triggerOptions.idempotencyKeyTTL = '14d' } + if (options?.delayMs && options.delayMs > 0) { + triggerOptions.delay = new Date(Date.now() + options.delayMs) + } const handle = await tasks.trigger(taskId, enrichedPayload, triggerOptions) logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId, tags }) From 65e2fe81c1f5c63095d856f89508a9683bbe12d8 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 27 May 2026 00:26:46 -0700 Subject: [PATCH 4/5] feat(tables): Add enrichment table column type (#4752) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(tables): native enrichments sidebar + workflow input mapping Add a Clay-style enrichments catalog to the table view and wire per-row input mapping into workflow-backed columns. - New "Enrichments" entry in the New-column dropdown opens a sliding panel listing curated enrichment templates; picking one swaps to the workflow config in-place (no cross-slide) with a back button. - Type the workflow sidebar as manual | enrichment; enrichment hides the launch + add-column-inputs affordances. - Add a "Workflow inputs" advanced panel mapping Start-block input fields to table columns (left-of-workflow columns only), with name-match auto-fill and collapsible input-mapping-style rows. - Persist type + inputMappings on the workflow group (types, contract, route, service, hook) — jsonb, no migration. - Consume inputMappings at run time: when present, feed Start-block fields from the mapped columns; otherwise fall back to name-match spread. - Clean up inputMappings on column rename/delete (stripGroupDeps + renameColumn). Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(emcn): extract CollapsibleCard and reuse for input mapping Pull the collapsible field-card markup (surface-4 header + surface-2 body, click/keyboard toggle, truncated title + optional badge) into a shared `CollapsibleCard` emcn component, and use it in the workflow-builder input mapping rows and the table sidebar's input-mapping panel. Co-Authored-By: Claude Opus 4.7 (1M context) * feat(tables): code-defined enrichment registry run directly per row Enrichments are now TS configs in apps/sim/enrichments/ (registry, like connectors) that run directly per table row via the existing run/dispatch/ cell-write rails — no workflow execution. - enrichments/{types,registry} + work-email (heuristic) and phone-number (stub). - WorkflowGroup gains enrichmentId; WorkflowGroupOutput gains outputId (workflowId/blockId/path kept required, '' for enrichment groups). - Executor branches on group.type === 'enrichment' → maps inputMappings → enrich() → outputs by outputId → cell-write. Missing required inputs skip (blank cell) instead of erroring. - Sidebar lists the registry; enrichment-config panel maps inputs to columns and creates the enrichment group (no workflow UI). Co-Authored-By: Claude Opus 4.7 (1M context) * feat(enrichments): provider fallback cascade + hosted-key usage source Replace each enrichment's single enrich() with an ordered providers[] fallback cascade. Providers are plain data ({ id, label, toolId, buildParams, mapOutput }) so the catalog stays client-safe; the server-only runner (run.ts) calls executeTool per provider, first non-empty result wins, misses/errors fall through, all-miss = blank cell. Wire four enrichments on the hosted-safe providers (Hunter, PDL): - Work Email (fullName, companyDomain): Hunter -> PDL - Phone Number (fullName, companyDomain): PDL - Company Domain (companyName): PDL - Company Info (domain): PDL -> Hunter Person enrichments take a single canonical fullName (Clay-style); Hunter gets first/last via splitName(), PDL takes name directly. Add 'enrichment' to usage_log_source enum (+ migration) so hosted-key tool cost from these per-row calls can be billed to the table owner. Co-Authored-By: Claude Opus 4.7 (1M context) * feat(enrichments): bill hosted-key cost; surface provider errors; abort safety - runEnrichment now returns { result, cost, error }: accumulates hosted-key cost across the cascade, and sets `error` only when every provider that ran errored (auth/rate-limit/outage) vs a clean miss. - Executor records the cost to the table owner (createdBy) via recordUsage (source 'enrichment'); billing failures are logged, never error the cell. - F1: all-providers-errored now writes status 'error' instead of a blank 'completed' cell that looked like "no data found". - F2: re-check the abort signal after the cascade so a cancel mid-tool-call isn't recorded as a completed empty cell. Co-Authored-By: Claude Opus 4.7 (1M context) * feat(tables): present enrichment columns as first-class in the grid - Meta-header shows the enrichment's name + icon (Mail/Phone/Globe/Building2) instead of "Workflow" + a color chip. - Per-column header icon uses the enrichment's icon (via columnSourceInfo) instead of the generic play icon. - Hide "View execution" for enrichment cells in both the row context menu and the action bar (no workflow execution exists to open); also hide the meta-menu "View workflow" item for enrichment groups. - Clicking an enrichment column header now opens the enrichments sidebar in edit mode (pre-filled input mappings, Update via useUpdateWorkflowGroup) instead of the workflow "Configure workflow" sidebar. - Enrichment config lets the user name each output column (editable per-output, deduped defaults) since enrichments can produce multiple columns. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(tables): enrichment columns use type icon; output names editable - Drop the per-column enrichment icon (it duplicated the meta-header icon). Enrichment output columns now render the standard column-type icon (Text, etc.) — the enrichment's icon stays only on the group meta-header. - Make output column names editable in the enrichment config edit mode too; changed names rename their columns via useUpdateColumn (the rename cascades into the group's output refs server-side). Validation excludes the output's own current name. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(tables): wrap enrichment catalog descriptions instead of truncating Co-Authored-By: Claude Opus 4.7 (1M context) * fix(tables): edit enrichment output columns via the plain column editor Edit column on an enrichment output now opens the normal column-config sidebar (rename / type / unique) instead of the workflow 'Configure output column' panel, which showed workflow-only fields and blocked a simple rename. Co-Authored-By: Claude Opus 4.7 (1M context) * feat(copilot): list_enrichments + add_enrichment table tools Let the copilot enumerate the code-defined enrichment registry and add an enrichment column to a table (validating required input mappings against the table's columns), backed by the same workflow-group machinery the UI uses. * fix(enrichments): address PR review feedback - Guard the enrichment cell path on `enrichmentId` so a group typed 'enrichment' without a registry id falls through to the workflow path instead of erroring. - Clear stale output values when skipping a row for missing required inputs, so the auto cascade re-enriches once inputs return (was left completed+filled). - Write a terminal state on abort in the enrichment path (matches the workflow path) so a cancel between run and terminal-write can't leave the cell running. - Edit mode: apply the group update (mappings/deps/auto-run) before column renames so the primary edit lands even if a rename fails. - Disable Save once validation has surfaced a missing required input. - Use the workflowGroupById map instead of O(n) find in the context-menu and action-bar hot paths. * chore(commands): add /add-enrichment command Guides adding a code-defined table enrichment to the registry, with a required step to verify each provider tool has hosted-key support and chain to /add-hosted-key when it doesn't. * fix(enrichments): address second-pass PR review - updateWorkflowGroup output diff now keys on outputId (falling back to blockId::path) so enrichment outputs — which share empty blockId/path — no longer collapse to one key and drop sibling columns. - Enrichment terminal write now clears output columns absent from the result, so a partial/empty re-run doesn't leave stale values. - Editing a group whose enrichment was removed from the registry shows an explanatory panel instead of silently falling through to the new-enrichment catalog. * feat(tables): show "Not found" badge for empty completed enrichment cells An enrichment that runs to completion but matches nothing now renders a gray "Not found" badge (like the Queued/Waiting cell states) instead of a blank cell, so a real miss is distinguishable from an unrun cell. Scoped to enrichment output columns; an empty string no longer counts as a value. * fix(enrichments): don't re-run completed no-match enrichments on auto cascade A completed enrichment with empty outputs is a real no-match result, not an unfinished run. Eligibility now treats an enrichment's completed status as terminal (regardless of output fill), so the auto cascade stops re-invoking billable provider calls on every no-match row each dispatch. Input changes still clear the exec entry, so genuine re-runs are unaffected; manual Run all still re-runs. * fix(enrichments): treat provider 404 as no-match, not a cell error Providers like People Data Labs signal 'no record found' with HTTP 404, which executeTool surfaces as a failed ToolResponse (status on output.status). The cascade now treats a 404 as a clean miss — falls through to the next provider and lets the cell render 'Not found' — instead of marking the cell errored. Auth/rate-limit/5xx still propagate as real errors. * fix(tools): surface HTTP status on error ToolResponse output executeTool's catch handled Error instances in its first branch and only extracted status/statusText/data for non-Error object throws — so HTTP errors (thrown as Error instances carrying .status) lost their status on the returned output. Surface it for Error instances too, so callers can branch on the status (e.g. the enrichment cascade treating a provider 404 as a no-match). * fix lint * Revert ff --------- Co-authored-by: Claude Opus 4.7 (1M context) --- .claude/commands/add-enrichment.md | 142 + .../app/api/table/[tableId]/groups/route.ts | 4 + .../enrichments-sidebar/enrichment-config.tsx | 371 + .../enrichments-sidebar.tsx | 182 + .../components/enrichments-sidebar/index.ts | 1 + .../tables/[tableId]/components/index.ts | 1 + .../new-column-dropdown.tsx | 17 +- .../table-grid/cells/cell-content.tsx | 5 +- .../table-grid/cells/cell-render.tsx | 22 +- .../components/table-grid/data-row.tsx | 6 + .../table-grid/headers/column-header-menu.tsx | 6 +- .../headers/workflow-group-meta-cell.tsx | 39 +- .../components/table-grid/table-grid.tsx | 64 +- .../components/workflow-sidebar/index.ts | 7 +- .../input-mapping-section.tsx | 85 + .../workflow-sidebar/workflow-sidebar.tsx | 235 +- .../tables/[tableId]/hooks/use-table.ts | 4 + .../[workspaceId]/tables/[tableId]/table.tsx | 32 +- .../input-mapping/input-mapping.tsx | 172 +- .../background/workflow-column-execution.ts | 180 +- .../collapsible-card/collapsible-card.tsx | 61 + apps/sim/components/emcn/components/index.ts | 1 + .../company-domain/company-domain.ts | 33 + apps/sim/enrichments/company-domain/index.ts | 1 + .../enrichments/company-info/company-info.ts | 67 + apps/sim/enrichments/company-info/index.ts | 1 + apps/sim/enrichments/index.ts | 9 + apps/sim/enrichments/phone-number/index.ts | 1 + .../enrichments/phone-number/phone-number.ts | 41 + apps/sim/enrichments/providers.ts | 47 + apps/sim/enrichments/registry.ts | 19 + apps/sim/enrichments/run.ts | 97 + apps/sim/enrichments/types.ts | 75 + apps/sim/enrichments/work-email/index.ts | 1 + apps/sim/enrichments/work-email/work-email.ts | 57 + apps/sim/hooks/queries/tables.ts | 2 + apps/sim/lib/api/contracts/tables.ts | 28 +- apps/sim/lib/billing/core/usage-log.ts | 1 + .../lib/copilot/generated/tool-catalog-v1.ts | 49 +- .../lib/copilot/generated/tool-schemas-v1.ts | 228 +- apps/sim/lib/copilot/tools/server/router.ts | 1 + .../tools/server/table/user-table.test.ts | 186 + .../copilot/tools/server/table/user-table.ts | 130 + apps/sim/lib/table/service.ts | 17 +- apps/sim/lib/table/types.ts | 42 +- apps/sim/lib/table/workflow-columns.ts | 48 +- apps/sim/tools/index.ts | 12 + .../db/migrations/0213_wealthy_sue_storm.sql | 1 + .../db/migrations/meta/0213_snapshot.json | 17062 ++++++++++++++++ packages/db/migrations/meta/_journal.json | 7 + packages/db/schema.ts | 1 + 51 files changed, 19593 insertions(+), 308 deletions(-) create mode 100644 .claude/commands/add-enrichment.md create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichment-config.tsx create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichments-sidebar.tsx create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/index.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/input-mapping-section.tsx create mode 100644 apps/sim/components/emcn/components/collapsible-card/collapsible-card.tsx create mode 100644 apps/sim/enrichments/company-domain/company-domain.ts create mode 100644 apps/sim/enrichments/company-domain/index.ts create mode 100644 apps/sim/enrichments/company-info/company-info.ts create mode 100644 apps/sim/enrichments/company-info/index.ts create mode 100644 apps/sim/enrichments/index.ts create mode 100644 apps/sim/enrichments/phone-number/index.ts create mode 100644 apps/sim/enrichments/phone-number/phone-number.ts create mode 100644 apps/sim/enrichments/providers.ts create mode 100644 apps/sim/enrichments/registry.ts create mode 100644 apps/sim/enrichments/run.ts create mode 100644 apps/sim/enrichments/types.ts create mode 100644 apps/sim/enrichments/work-email/index.ts create mode 100644 apps/sim/enrichments/work-email/work-email.ts create mode 100644 packages/db/migrations/0213_wealthy_sue_storm.sql create mode 100644 packages/db/migrations/meta/0213_snapshot.json diff --git a/.claude/commands/add-enrichment.md b/.claude/commands/add-enrichment.md new file mode 100644 index 00000000000..04154e73abb --- /dev/null +++ b/.claude/commands/add-enrichment.md @@ -0,0 +1,142 @@ +--- +description: Add a code-defined table enrichment (registry entry) backed by a provider cascade, ensuring each provider tool has hosted-key support +argument-hint: +--- + +# Adding a Table Enrichment + +Enrichments are code-defined entries in `apps/sim/enrichments/` that run **directly per table row** (no workflow). Each enrichment declares inputs, outputs, and an ordered list of **providers**; the cascade runner tries providers in order and the first non-empty result fills the cell. Each provider calls one existing Sim tool via `executeTool`, which injects the workspace's BYOK key or a **hosted key** and bills usage automatically. + +Because enrichments run on Sim's hosted keys by default, **every provider tool you reference must have hosted-key support** — otherwise it can only run when the workspace brings its own key. This command makes that check a required step. + +## Overview + +| Step | What | Where | +|------|------|-------| +| 1 | Pick the data-source tool(s) for each output | `tools/{service}/` + `tools/registry.ts` | +| 2 | **Verify each tool has `hosting`; if not, run `/add-hosted-key`** | `tools/{service}/{action}.ts` | +| 3 | Write the enrichment definition | `enrichments/{name}/{name}.ts` + `index.ts` | +| 4 | Register it | `enrichments/registry.ts` | +| 5 | Verify | tsc / biome / manual run | + +## Architecture (what you're plugging into) + +- **`enrichments/types.ts`** — `EnrichmentConfig { id, name, description, icon, inputs, outputs, providers }` and `EnrichmentProvider { id, label, toolId, buildParams, mapOutput }`. Providers are **plain data** (no `@/tools` import) so the catalog stays client-safe. +- **`enrichments/providers.ts`** — `toolProvider(...)` (typed passthrough) plus shared input helpers: `str(v)`, `normalizeDomain(v)`, `firstNonEmpty(arr)`, `splitName(fullName)`. +- **`enrichments/run.ts`** — the server-only cascade runner. Calls `executeTool(provider.toolId, { ...params, _context: { workspaceId } })`, accumulates hosted-key cost, returns the first non-empty mapped result. **You do not edit this** — it works for any registry entry. +- **`enrichments/registry.ts`** — `ENRICHMENT_REGISTRY` / `ALL_ENRICHMENTS` / `getEnrichment`. Register new entries here. + +Outputs automatically become table columns; billing, the catalog/sidebar UI, the column meta-header icon, and per-row execution all work with no extra wiring. + +## Step 1: Pick the data-source tool(s) + +For each output the enrichment produces, decide which existing tool provides it. Look up the service's API and the tool in `apps/sim/tools/{service}/` (e.g. `hunter_email_finder`, `pdl_person_enrich`, `pdl_company_enrich`). Confirm: + +- The tool id is registered in `apps/sim/tools/registry.ts`. +- Its `params` accept what you can derive from table columns (read the tool's `params`). +- Its `outputs` / `transformResponse` actually expose the field you need (read the real output shape — don't assume). + +Order providers **cheapest / most-likely-to-hit first**; the cascade stops at the first non-empty result. Apollo / LinkedIn are not hosted-safe (ToS) — don't use them. + +## Step 2: Verify hosted-key support — chain to `/add-hosted-key` if missing + +**This is the required gate.** For every tool a provider calls, open `apps/sim/tools/{service}/{action}.ts` and check for a `hosting` block: + +```typescript +hosting: { + envKeyPrefix: 'SERVICE_API_KEY', + apiKeyParam: 'apiKey', + byokProviderId: 'service', + pricing: { /* ... */ }, + rateLimit: { /* ... */ }, +} +``` + +- **If `hosting` is present** — good. Note the `envKeyPrefix`; the deployment needs `{PREFIX}_COUNT` + `{PREFIX}_1..N` env vars set for the hosted key to actually resolve at runtime (ops concern, not code). If those env vars aren't set in the target environment, the provider will only run with a workspace BYOK key. +- **If `hosting` is absent** — the tool can't use a Sim-provided key, so the enrichment would silently produce blank cells on hosted Sim. **Stop and run `/add-hosted-key `** to add hosted-key support to that tool first, then come back. Do this for every provider tool that lacks it. + +Why it matters: the cascade runner only bills (and only reads `output.cost.total`) when `executeTool` injected a hosted key, which requires the tool's `hosting` config. No `hosting` → no hosted key → the enrichment depends entirely on per-workspace BYOK. + +## Step 3: Write the enrichment definition + +Create `apps/sim/enrichments/{name}/{name}.ts` and a barrel `index.ts`. Mirror the existing entries (`work-email`, `phone-number`, `company-domain`, `company-info`). + +```typescript +import { SomeIcon } from 'lucide-react' +import { filterUndefined } from '@sim/utils/object' +import { normalizeDomain, splitName, str, toolProvider } from '@/enrichments/providers' +import type { EnrichmentConfig } from '@/enrichments/types' + +export const myEnrichment: EnrichmentConfig = { + id: 'my-enrichment', + name: 'My Enrichment', + description: 'One concise sentence describing what it finds.', + icon: SomeIcon, + inputs: [ + // Person enrichments take a single canonical `fullName` (Clay-style); + // split it with splitName() for tools that need first/last. + { id: 'fullName', name: 'Full name', type: 'string', required: true }, + { id: 'companyDomain', name: 'Company domain', type: 'string' }, + ], + outputs: [{ id: 'value', name: 'value', type: 'string' }], + providers: [ + toolProvider({ + id: 'provider-a', + label: 'Provider A', + toolId: 'service_action', // must have `hosting` (Step 2) + buildParams: (inputs) => { + // Return null when there aren't enough inputs → cascade skips this provider. + const name = splitName(inputs.fullName) + const domain = normalizeDomain(inputs.companyDomain) + if (!name || !domain) return null + return { domain, first_name: name.firstName, last_name: name.lastName } + }, + mapOutput: (output) => { + // Return { [outputId]: value } on a hit, or null to fall through. + const value = str(output.value) + return value ? { value } : null + }, + }), + // ...additional fallback providers, in priority order. + ], +} +``` + +```typescript +// apps/sim/enrichments/{name}/index.ts +export { myEnrichment } from './my-enrichment' +``` + +Rules: +- Keep the file **client-safe**: import only `lucide-react`, `@sim/utils/*`, `@/enrichments/providers`, and the types. **Never import `@/tools`** here — the runner does the tool call. +- `buildParams` returns `null` when inputs are insufficient (provider skipped). `mapOutput` returns `null`/empty for a miss (falls through). Use `filterUndefined` when assembling optional tool params; coerce numbers explicitly (don't pass `''` to number outputs). +- Output `id`s are the keys `mapOutput` returns; output `name`s are the default column names (the user can rename them in the config). + +## Step 4: Register it + +In `apps/sim/enrichments/registry.ts`, import and add the entry (catalog order is registration order): + +```typescript +import { myEnrichment } from '@/enrichments/my-enrichment' + +export const ENRICHMENT_REGISTRY: EnrichmentRegistry = { + // ...existing + [myEnrichment.id]: myEnrichment, +} +``` + +## Step 5: Verify + +1. `bunx tsc --noEmit` (from `apps/sim`, `NODE_OPTIONS=--max-old-space-size=8192`) and `bunx biome check` on the changed files. +2. In a table → **+ New column → Enrichments** → pick the new enrichment, map its inputs to columns, name the output column(s), Save. Confirm it appears in the catalog with its icon/description. +3. With hosted keys (or a workspace BYOK key) configured for each provider's service, run a row and confirm the cell fills; the dev-server log shows `Enrichment hit { provider }`. A row whose providers all miss completes blank; a row where every provider errored shows an error cell. + +## Checklist + +- [ ] Each output mapped to a real tool field (verified against the tool's `params`/`outputs`) +- [ ] **Every provider tool has a `hosting` block — ran `/add-hosted-key` for any that didn't** +- [ ] Providers ordered cheapest / most-likely-first; Apollo/LinkedIn not used +- [ ] Enrichment file is client-safe (no `@/tools` import); uses `toolProvider` + shared helpers +- [ ] `buildParams` returns `null` on insufficient inputs; `mapOutput` returns `null` on a miss +- [ ] Registered in `enrichments/registry.ts` +- [ ] tsc + biome clean; created and ran the column end-to-end diff --git a/apps/sim/app/api/table/[tableId]/groups/route.ts b/apps/sim/app/api/table/[tableId]/groups/route.ts index bf74653212a..197a1722b1b 100644 --- a/apps/sim/app/api/table/[tableId]/groups/route.ts +++ b/apps/sim/app/api/table/[tableId]/groups/route.ts @@ -113,6 +113,10 @@ export const PATCH = withRouteHandler(async (request: NextRequest, { params }: R ...(validated.mappingUpdates !== undefined ? { mappingUpdates: validated.mappingUpdates } : {}), + ...(validated.inputMappings !== undefined + ? { inputMappings: validated.inputMappings } + : {}), + ...(validated.type !== undefined ? { type: validated.type } : {}), ...(validated.autoRun !== undefined ? { autoRun: validated.autoRun } : {}), }, requestId diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichment-config.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichment-config.tsx new file mode 100644 index 00000000000..2e021797858 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichment-config.tsx @@ -0,0 +1,371 @@ +'use client' + +import { useState } from 'react' +import { toError } from '@sim/utils/errors' +import { generateId } from '@sim/utils/id' +import { X } from 'lucide-react' +import { + Badge, + Button, + CollapsibleCard, + Combobox, + FieldDivider, + Input, + Label, + Switch, + toast, +} from '@/components/emcn' +import { ArrowLeft } from '@/components/emcn/icons' +import type { AddWorkflowGroupBodyInput } from '@/lib/api/contracts/tables' +import { cn } from '@/lib/core/utils/cn' +import type { ColumnDefinition, WorkflowGroup, WorkflowGroupOutput } from '@/lib/table' +import { deriveOutputColumnName } from '@/lib/table/column-naming' +import type { EnrichmentConfig as EnrichmentDef } from '@/enrichments/types' +import { + useAddWorkflowGroup, + useUpdateColumn, + useUpdateWorkflowGroup, +} from '@/hooks/queries/tables' +import { RunSettingsSection } from '../workflow-sidebar/run-settings-section' + +interface EnrichmentConfigProps { + enrichment: EnrichmentDef + allColumns: ColumnDefinition[] + workspaceId: string + tableId: string + onBack: () => void + onClose: () => void + /** When set, the panel edits this existing enrichment group (pre-filled, + * updates instead of creating; changed output names rename their columns). */ + existingGroup?: WorkflowGroup +} + +/** Pre-fill an input's column from a same-named column (case-insensitive). */ +function defaultColumnFor( + input: EnrichmentDef['inputs'][number], + columns: ColumnDefinition[] +): string { + const match = columns.find( + (c) => + c.name.toLowerCase() === input.id.toLowerCase() || + c.name.toLowerCase() === input.name.toLowerCase() + ) + return match?.name ?? '' +} + +/** + * Config panel for a code-defined enrichment. No workflow: the user maps each + * enrichment input to a table column; outputs are fixed by the enrichment. + * Saving creates an `enrichment` workflow group that the table runs per row. + */ +export function EnrichmentConfig({ + enrichment, + allColumns, + workspaceId, + tableId, + onBack, + onClose, + existingGroup, +}: EnrichmentConfigProps) { + const addWorkflowGroup = useAddWorkflowGroup({ workspaceId, tableId }) + const updateWorkflowGroup = useUpdateWorkflowGroup({ workspaceId, tableId }) + const updateColumn = useUpdateColumn({ workspaceId, tableId }) + const isEditing = Boolean(existingGroup) + + /** Output column's persisted name (edit mode), used to detect renames. */ + const originalOutputName = (outputId: string): string | undefined => + existingGroup?.outputs.find((o) => o.outputId === outputId)?.columnName + + const [inputMappings, setInputMappings] = useState>(() => { + if (existingGroup) { + const seed: Record = {} + for (const m of existingGroup.inputMappings ?? []) seed[m.inputName] = m.columnName + return seed + } + const seed: Record = {} + for (const input of enrichment.inputs) { + const col = defaultColumnFor(input, allColumns) + if (col) seed[input.id] = col + } + return seed + }) + // Per-output column names. Editable in both modes — edit mode seeds the + // existing column names and renames changed ones on save. + const [outputNames, setOutputNames] = useState>(() => { + const seed: Record = {} + if (existingGroup) { + for (const o of existingGroup.outputs) { + if (o.outputId) seed[o.outputId] = o.columnName + } + return seed + } + const taken = new Set(allColumns.map((c) => c.name)) + for (const o of enrichment.outputs) { + const colName = deriveOutputColumnName(o.name, taken) + taken.add(colName) + seed[o.id] = colName + } + return seed + }) + const [collapsed, setCollapsed] = useState>({}) + const [autoRun, setAutoRun] = useState(() => existingGroup?.autoRun ?? false) + const [deps, setDeps] = useState(() => existingGroup?.dependencies?.columns ?? []) + const [showValidation, setShowValidation] = useState(false) + + const columnOptions = allColumns.map((c) => ({ label: c.name, value: c.name })) + const missingRequired = enrichment.inputs.some((i) => i.required && !inputMappings[i.id]) + const depsValid = !autoRun || deps.length > 0 + + /** Per-output column-name validation (both modes). Excludes the output's own + * current column so renaming to its existing name isn't flagged. */ + function outputNameError(outputId: string): string | null { + const value = (outputNames[outputId] ?? '').trim() + if (!value) return 'Required' + const lower = value.toLowerCase() + const ownOriginal = originalOutputName(outputId)?.toLowerCase() + if ( + allColumns.some((c) => c.name.toLowerCase() === lower && c.name.toLowerCase() !== ownOriginal) + ) + return 'Column already exists' + const dup = enrichment.outputs.some( + (o) => o.id !== outputId && (outputNames[o.id] ?? '').trim().toLowerCase() === lower + ) + return dup ? 'Duplicate name' : null + } + const outputsInvalid = enrichment.outputs.some((o) => outputNameError(o.id) !== null) + const saveDisabled = + addWorkflowGroup.isPending || + updateWorkflowGroup.isPending || + updateColumn.isPending || + (showValidation && missingRequired) || + !depsValid || + outputsInvalid + + async function handleSave() { + if (missingRequired || (autoRun && deps.length === 0) || outputsInvalid) { + setShowValidation(true) + return + } + const inputMappingsList = Object.entries(inputMappings) + .filter(([, columnName]) => Boolean(columnName)) + .map(([inputName, columnName]) => ({ inputName, columnName })) + + if (existingGroup) { + try { + // Apply the group edit (mappings / deps / auto-run) first so it lands + // even if a later column rename fails. Renames run after and cascade + // into the group's output refs server-side. + await updateWorkflowGroup.mutateAsync({ + groupId: existingGroup.id, + name: enrichment.name, + dependencies: { columns: deps }, + inputMappings: inputMappingsList, + autoRun, + }) + for (const o of enrichment.outputs) { + const original = originalOutputName(o.id) + const next = (outputNames[o.id] ?? '').trim() + if (original && next && next !== original) { + await updateColumn.mutateAsync({ columnName: original, updates: { name: next } }) + } + } + toast.success(`Updated "${enrichment.name}"`) + onClose() + } catch (err) { + toast.error(toError(err).message) + } + return + } + + const groupId = generateId() + const taken = new Set(allColumns.map((c) => c.name)) + const outputColumns: AddWorkflowGroupBodyInput['outputColumns'] = [] + const outputs: WorkflowGroupOutput[] = [] + for (const o of enrichment.outputs) { + const desired = (outputNames[o.id] ?? '').trim() || o.name + const colName = deriveOutputColumnName(desired, taken) + taken.add(colName) + outputColumns.push({ + name: colName, + type: o.type, + required: false, + unique: false, + workflowGroupId: groupId, + }) + outputs.push({ blockId: '', path: '', outputId: o.id, columnName: colName }) + } + + const group: WorkflowGroup = { + id: groupId, + workflowId: '', + enrichmentId: enrichment.id, + name: enrichment.name, + type: 'enrichment', + dependencies: { columns: deps }, + outputs, + inputMappings: inputMappingsList, + autoRun, + } + try { + await addWorkflowGroup.mutateAsync({ group, outputColumns }) + toast.success(`Added "${enrichment.name}"`) + onClose() + } catch (err) { + toast.error(toError(err).message) + } + } + + return ( +
+
+
+ +

+ {enrichment.name} +

+
+ +
+ +
+
+ + {enrichment.inputs.length === 0 ? ( +

+ This enrichment needs no inputs. +

+ ) : ( +
+ {enrichment.inputs.map((input) => ( + + {input.type} + + } + collapsed={collapsed[input.id] ?? false} + onToggleCollapse={() => + setCollapsed((prev) => ({ ...prev, [input.id]: !prev[input.id] })) + } + > + + + setInputMappings((prev) => ({ ...prev, [input.id]: columnName })) + } + error={ + showValidation && input.required && !inputMappings[input.id] + ? 'Required' + : null + } + /> + + ))} +
+ )} +
+ + + +
+ +
+ {enrichment.outputs.map((output) => { + const outErr = showValidation ? outputNameError(output.id) : null + return ( + + {output.type} + + } + collapsed={collapsed[`out:${output.id}`] ?? false} + onToggleCollapse={() => + setCollapsed((prev) => ({ + ...prev, + [`out:${output.id}`]: !prev[`out:${output.id}`], + })) + } + > + + + setOutputNames((prev) => ({ ...prev, [output.id]: e.target.value })) + } + spellCheck={false} + autoComplete='off' + className={cn(outErr && 'border-[var(--text-error)]')} + /> + {outErr &&

{outErr}

} +
+ ) + })} +
+
+ + + +
+ + setAutoRun(!!v)} + /> +
+ {autoRun && ( + <> + + + + )} +
+ +
+ + +
+
+ ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichments-sidebar.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichments-sidebar.tsx new file mode 100644 index 00000000000..3594d0ca029 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/enrichments-sidebar.tsx @@ -0,0 +1,182 @@ +'use client' + +import { useState } from 'react' +import { X } from 'lucide-react' +import { Button, Input } from '@/components/emcn' +import { Search } from '@/components/emcn/icons' +import { cn } from '@/lib/core/utils/cn' +import type { ColumnDefinition, WorkflowGroup } from '@/lib/table' +import { ALL_ENRICHMENTS } from '@/enrichments' +import { getEnrichment } from '@/enrichments/registry' +import type { EnrichmentConfig as EnrichmentDef } from '@/enrichments/types' +import { EnrichmentConfig } from './enrichment-config' + +interface EnrichmentsSidebarProps { + open: boolean + onClose: () => void + allColumns: ColumnDefinition[] + workspaceId: string + tableId: string + /** When set, the sidebar opens straight into this enrichment group's config + * in edit mode (skips the catalog list). */ + editGroup?: WorkflowGroup +} + +/** + * Right-edge panel for the enrichments flow. Lists the code-defined enrichment + * registry and, once one is picked, swaps in its config panel in the *same* + * sliding panel (input mapping + outputs), which creates an enrichment column. + */ +export function EnrichmentsSidebar({ open, ...rest }: EnrichmentsSidebarProps) { + return ( + + ) +} + +function EnrichmentsSidebarBody({ + onClose, + allColumns, + workspaceId, + tableId, + editGroup, +}: Omit) { + const [selected, setSelected] = useState(null) + const [query, setQuery] = useState('') + + // Edit mode: open the picked enrichment's config directly, pre-filled from the + // existing group. No catalog list / back-to-list step. + const editEnrichment = editGroup ? getEnrichment(editGroup.enrichmentId) : undefined + if (editGroup && editEnrichment) { + return ( + + ) + } + // Editing a group whose enrichment was removed from the registry — surface it + // rather than silently dropping into the "new enrichment" catalog. + if (editGroup && !editEnrichment) { + return ( +
+
+

Enrichment

+ +
+
+

+ This enrichment ("{editGroup.enrichmentId}") is no longer available. Delete the column + and add a current enrichment. +

+
+
+ ) + } + + if (selected) { + return ( + setSelected(null)} + onClose={onClose} + /> + ) + } + + const normalized = query.trim().toLowerCase() + const filtered = normalized + ? ALL_ENRICHMENTS.filter( + (e) => + e.name.toLowerCase().includes(normalized) || + e.description.toLowerCase().includes(normalized) + ) + : ALL_ENRICHMENTS + + return ( +
+
+

Enrichments

+ +
+ +
+
+ + setQuery(e.target.value)} + placeholder='Search' + spellCheck={false} + autoComplete='off' + className='pl-7' + /> +
+
+ +
+ {filtered.length === 0 ? ( +

No enrichments found.

+ ) : ( +
    + {filtered.map((enrichment) => { + const Icon = enrichment.icon + return ( +
  • + +
  • + ) + })} +
+ )} +
+
+ ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/index.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/index.ts new file mode 100644 index 00000000000..d42dfa8815f --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/enrichments-sidebar/index.ts @@ -0,0 +1 @@ +export { EnrichmentsSidebar } from './enrichments-sidebar' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/index.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/index.ts index 0fca186c0c6..34b5f41f5fa 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/index.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/index.ts @@ -1,5 +1,6 @@ export * from './column-config-sidebar' export * from './context-menu' +export * from './enrichments-sidebar' export * from './new-column-dropdown' export * from './row-modal' export * from './run-status-control' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx index 32956f7c6ca..045e44390f2 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx @@ -1,10 +1,12 @@ 'use client' +import { Sparkles } from 'lucide-react' import { Button, DropdownMenu, DropdownMenuContent, DropdownMenuItem, + DropdownMenuSeparator, DropdownMenuTrigger, } from '@/components/emcn' import { Plus } from '@/components/emcn/icons' @@ -28,18 +30,20 @@ interface NewColumnDropdownProps { disabled: boolean onPickType: (type: ColumnDefinition['type']) => void onPickWorkflow: () => void + onPickEnrichment: () => void } /** * "+ New column" dropdown — the single entry point for creating a column. - * Lists every column type plus "Workflow"; picking a type opens the right - * sidebar pre-seeded. + * Lists every column type plus "Workflow" and "Enrichments"; picking a type + * opens the right sidebar pre-seeded. */ export function NewColumnDropdown({ trigger, disabled, onPickType, onPickWorkflow, + onPickEnrichment, }: NewColumnDropdownProps) { const menu = ( @@ -61,6 +65,15 @@ export function NewColumnDropdown({ )} + {isWorkflowColumnsEnabledClient && ( + <> + + + Enrichments + + + + )} {VISIBLE_COLUMN_TYPE_OPTIONS.map((option) => { const Icon = option.icon const onSelect = diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-content.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-content.tsx index 3447d535327..60c3cc05336 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-content.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-content.tsx @@ -20,6 +20,8 @@ interface CellContentProps { * is empty. `undefined` (or empty) means no waiting state. */ waitingOnLabels?: string[] + /** Column is an enrichment output — a completed-but-empty cell renders "Not found". */ + isEnrichmentOutput?: boolean } /** @@ -37,8 +39,9 @@ export function CellContent({ onSave, onCancel, waitingOnLabels, + isEnrichmentOutput, }: CellContentProps) { - const kind = resolveCellRender({ value, exec, column, waitingOnLabels }) + const kind = resolveCellRender({ value, exec, column, waitingOnLabels, isEnrichmentOutput }) return ( <> diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx index 27ff1f2ae44..557186b7668 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx @@ -20,6 +20,7 @@ export type CellRenderKind = | { kind: 'cancelled' } | { kind: 'error' } | { kind: 'waiting'; labels: string[] } + | { kind: 'not-found' } // Plain typed cells | { kind: 'boolean'; checked: boolean } | { kind: 'json'; text: string } @@ -34,6 +35,9 @@ interface ResolveCellRenderInput { exec: RowExecutionMetadata | undefined column: DisplayColumn waitingOnLabels: string[] | undefined + /** Column is an enrichment-group output — a completed-but-empty cell renders + * "Not found" rather than a blank, since the enrichment ran and matched nothing. */ + isEnrichmentOutput?: boolean } export function resolveCellRender({ @@ -41,8 +45,10 @@ export function resolveCellRender({ exec, column, waitingOnLabels, + isEnrichmentOutput, }: ResolveCellRenderInput): CellRenderKind { const isNull = value === null || value === undefined + const isEmpty = isNull || value === '' if (column.workflowGroupId) { const blockId = column.outputBlockId @@ -57,8 +63,9 @@ export function resolveCellRender({ if (inFlight && blockRunning) return { kind: 'running' } // Value wins over pending-upstream: a finished column stays finished even - // while other blocks in the group are still running. - if (!isNull) return { kind: 'value', text: stringifyValue(value) } + // while other blocks in the group are still running. An empty string is not + // a value — it falls through so a completed enrichment can show "Not found". + if (!isEmpty) return { kind: 'value', text: stringifyValue(value) } if (inFlight && !(groupHasBlockErrors && !blockRunning)) { // A `pending` cell whose jobId starts with `paused-` is mid-pause @@ -79,6 +86,8 @@ export function resolveCellRender({ } if (exec?.status === 'cancelled') return { kind: 'cancelled' } if (exec?.status === 'error') return { kind: 'error' } + // Enrichment ran to completion but matched nothing → "Not found". + if (isEnrichmentOutput && exec?.status === 'completed') return { kind: 'not-found' } return { kind: 'empty' } } @@ -273,6 +282,15 @@ export function CellRender({ kind, isEditing }: CellRenderProps): React.ReactEle ) + case 'not-found': + return ( + + + Not found + + + ) + case 'empty': return null diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/data-row.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/data-row.tsx index aed56ac8de4..e228edba84d 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/data-row.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/data-row.tsx @@ -332,6 +332,12 @@ export const DataRow = React.memo(function DataRow({ ? (waitingByGroupId?.get(column.workflowGroupId) ?? undefined) : undefined } + isEnrichmentOutput={ + column.workflowGroupId + ? workflowGroups.find((g) => g.id === column.workflowGroupId)?.type === + 'enrichment' + : false + } /> diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/column-header-menu.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/column-header-menu.tsx index 06a004843e2..13010ad3179 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/column-header-menu.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/column-header-menu.tsx @@ -250,7 +250,7 @@ export const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({
@@ -287,7 +287,7 @@ export const ColumnHeaderMenu = React.memo(function ColumnHeaderMenu({ > diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/workflow-group-meta-cell.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/workflow-group-meta-cell.tsx index 23fa84f2227..3b38dc3bf41 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/workflow-group-meta-cell.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/headers/workflow-group-meta-cell.tsx @@ -23,6 +23,8 @@ import { } from '@/components/emcn/icons' import type { RunMode } from '@/lib/api/contracts/tables' import { cn } from '@/lib/core/utils/cn' +import type { WorkflowGroupType } from '@/lib/table' +import { getEnrichment } from '@/enrichments/registry' import type { WorkflowMetadata } from '@/stores/workflows/registry/types' import { SELECTION_TINT_BG } from '../constants' import type { DisplayColumn } from '../types' @@ -166,6 +168,13 @@ export function ColumnOptionsMenu({ interface WorkflowGroupMetaCellProps { workflowId: string groupId: string + /** When `'enrichment'`, the cell shows the enrichment's name + icon instead + * of a backing workflow's color chip + name. */ + groupType?: WorkflowGroupType + /** Registry id for enrichment groups (resolves name/icon fallback). */ + enrichmentId?: string + /** Persisted group name (the enrichment name at creation). */ + groupName?: string size: number startColIndex: number columnName: string @@ -205,6 +214,9 @@ interface WorkflowGroupMetaCellProps { export function WorkflowGroupMetaCell({ workflowId, groupId, + groupType, + enrichmentId, + groupName, size, startColIndex, columnName, @@ -226,9 +238,14 @@ export function WorkflowGroupMetaCell({ onDragLeave, readOnly, }: WorkflowGroupMetaCellProps) { + const isEnrichment = groupType === 'enrichment' + const enrichment = isEnrichment ? getEnrichment(enrichmentId) : undefined + const EnrichmentIcon = enrichment?.icon const wf = workflows?.find((w) => w.id === workflowId) const color = wf?.color ?? 'var(--text-muted)' - const name = wf?.name ?? 'Workflow' + const name = isEnrichment + ? (groupName ?? enrichment?.name ?? 'Enrichment') + : (wf?.name ?? 'Workflow') const [optionsMenuOpen, setOptionsMenuOpen] = useState(false) const [optionsMenuPosition, setOptionsMenuPosition] = useState({ x: 0, y: 0 }) @@ -369,14 +386,18 @@ export function WorkflowGroupMetaCell({ style={{ background: color }} />
- + {isEnrichment && EnrichmentIcon ? ( + + ) : ( + + )} {name} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx index 3b8dab0c1b8..fb15adeb2cf 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx @@ -11,7 +11,7 @@ import { TableX } from '@/components/emcn/icons' import type { RunMode } from '@/lib/api/contracts/tables' import { cn } from '@/lib/core/utils/cn' import { captureEvent } from '@/lib/posthog/client' -import type { ColumnDefinition, TableRow as TableRowType } from '@/lib/table' +import type { ColumnDefinition, TableRow as TableRowType, WorkflowGroup } from '@/lib/table' import { TABLE_LIMITS } from '@/lib/table/constants' import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider' import { @@ -139,6 +139,10 @@ interface TableGridProps { */ onOpenColumnConfig: (cfg: ColumnConfig) => void onOpenWorkflowConfig: (cfg: WorkflowConfig) => void + /** Open the enrichments list (Clay-style catalog) slideout. */ + onOpenEnrichments: () => void + /** Open the enrichments slideout in edit mode for an existing enrichment group. */ + onOpenEnrichmentConfig: (group: WorkflowGroup) => void onOpenExecutionDetails: (executionId: string) => void /** Open the row-edit modal for `row`. Wrapper renders the modal. */ onOpenRowModal: (row: TableRowType) => void @@ -243,6 +247,8 @@ export function TableGrid({ sidebarReservedPx, onOpenColumnConfig, onOpenWorkflowConfig, + onOpenEnrichments, + onOpenEnrichmentConfig, onOpenExecutionDetails, onOpenRowModal, onRequestDeleteRows, @@ -508,6 +514,11 @@ export function TableGrid({ return expandToDisplayColumns(ordered, tableWorkflowGroups) }, [columns, columnOrder, tableWorkflowGroups]) + const workflowGroupById = useMemo( + () => new Map(tableWorkflowGroups.map((g) => [g.id, g])), + [tableWorkflowGroups] + ) + const hasWorkflowColumns = columns.some((c) => !!c.workflowGroupId) const { colWidth: checkboxColWidth, numDivWidth } = checkboxColLayout( tableData?.maxRows ?? 0, @@ -755,11 +766,14 @@ export function TableGrid({ _exec?.status === 'pending' && typeof _exec?.jobId === 'string' && _exec.jobId.startsWith('paused-') + // Enrichment cells have no workflow execution trace to open. + const _isEnrichmentGroup = workflowGroupById.get(_gid)?.type === 'enrichment' contextMenuHasStartedRun = - _exec?.status === 'completed' || - _exec?.status === 'error' || - _exec?.status === 'running' || - _isPaused + !_isEnrichmentGroup && + (_exec?.status === 'completed' || + _exec?.status === 'error' || + _exec?.status === 'running' || + _isPaused) contextMenuExecutionId = _exec?.executionId ?? null } } @@ -2560,27 +2574,39 @@ export function TableGrid({ /** Open the workflow-config sidebar to spawn a brand-new workflow group. */ function handleAddWorkflowColumn() { - onOpenWorkflowConfig({ mode: 'create', proposedName: generateColumnName() }) + onOpenWorkflowConfig({ mode: 'create', kind: 'manual', proposedName: generateColumnName() }) } const handleConfigureColumn = useCallback( (columnName: string) => { const column = columnsRef.current.find((c) => c.name === columnName) - if (column?.workflowGroupId) { - // Workflow-output column header → single-output sub-mode. + const group = column?.workflowGroupId + ? workflowGroupById.get(column.workflowGroupId) + : undefined + // Enrichment output columns behave like plain columns (rename / type / + // unique) — route them to the normal column editor, not the workflow + // "Configure output column" panel. + if (column?.workflowGroupId && group?.type !== 'enrichment') { onOpenWorkflowConfig({ mode: 'edit-output', columnName }) } else { onOpenColumnConfig({ mode: 'edit', columnName }) } }, - [onOpenColumnConfig, onOpenWorkflowConfig] + [onOpenColumnConfig, onOpenWorkflowConfig, workflowGroupById] ) const handleConfigureWorkflowGroup = useCallback( (groupId: string) => { + const group = workflowGroupById.get(groupId) + // Enrichment groups have no workflow — route their config to the + // enrichments sidebar (edit mode) instead of the workflow sidebar. + if (group?.type === 'enrichment') { + onOpenEnrichmentConfig(group) + return + } onOpenWorkflowConfig({ mode: 'edit-group', groupId }) }, - [onOpenWorkflowConfig] + [onOpenEnrichmentConfig, onOpenWorkflowConfig, workflowGroupById] ) const handleDeleteWorkflowGroup = useCallback((groupId: string) => { @@ -2901,14 +2927,18 @@ export function TableGrid({ // running/completed/error. const isPaused = status === 'pending' && typeof exec?.jobId === 'string' && exec.jobId.startsWith('paused-') + // Enrichment groups have no workflow execution to open — never offer "View + // execution" for them. + const isEnrichmentGroup = workflowGroupById.get(groupId)?.type === 'enrichment' return { rowId: row.id, groupId, executionId: exec?.executionId ?? null, canViewExecution: - status === 'completed' || status === 'error' || status === 'running' || isPaused, + !isEnrichmentGroup && + (status === 'completed' || status === 'error' || status === 'running' || isPaused), } - }, [normalizedSelection, rows, displayColumns]) + }, [normalizedSelection, rows, displayColumns, workflowGroupById]) const tableWorkflowGroupIds = useMemo( () => tableWorkflowGroups.map((g) => g.id), @@ -3138,6 +3168,9 @@ export function TableGrid({ normalizedSelection.endCol >= g.startColIndex + g.size - 1 } groupId={g.groupId} + groupType={workflowGroupById.get(g.groupId)?.type} + enrichmentId={workflowGroupById.get(g.groupId)?.enrichmentId} + groupName={workflowGroupById.get(g.groupId)?.name} onSelectGroup={handleGroupSelect} onOpenConfig={() => handleConfigureWorkflowGroup(g.groupId)} onRunColumn={userPermissions.canEdit ? handleRunColumn : undefined} @@ -3154,7 +3187,11 @@ export function TableGrid({ onDeleteGroup={ userPermissions.canEdit ? handleDeleteWorkflowGroup : undefined } - onViewWorkflow={handleViewWorkflow} + onViewWorkflow={ + workflowGroupById.get(g.groupId)?.type === 'enrichment' + ? undefined + : handleViewWorkflow + } readOnly={!userPermissions.canEdit} onDragStart={ userPermissions.canEdit ? handleColumnDragStart : undefined @@ -3229,6 +3266,7 @@ export function TableGrid({ disabled={addColumnMutation.isPending} onPickType={handleAddColumnOfType} onPickWorkflow={handleAddWorkflowColumn} + onPickEnrichment={onOpenEnrichments} /> )} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/index.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/index.ts index 6d45862e281..b6ea2bc1a86 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/index.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/index.ts @@ -1 +1,6 @@ -export { type WorkflowConfig, WorkflowSidebar } from './workflow-sidebar' +export { + type WorkflowConfig, + WorkflowSidebar, + WorkflowSidebarBody, + type WorkflowSidebarBodyProps, +} from './workflow-sidebar' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/input-mapping-section.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/input-mapping-section.tsx new file mode 100644 index 00000000000..c667fc04c08 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/input-mapping-section.tsx @@ -0,0 +1,85 @@ +'use client' + +import { useState } from 'react' +import { Badge, CollapsibleCard, Combobox, Label } from '@/components/emcn' +import type { ColumnDefinition } from '@/lib/table' +import type { InputFormatField } from '@/lib/workflows/types' + +interface InputMappingSectionProps { + /** The workflow Start block's input fields. Each gets one collapsible row. */ + inputFields: InputFormatField[] + /** Columns the user can feed into an input (all table columns). */ + columnOptions: ColumnDefinition[] + /** Current mapping: input field name → table column name. */ + value: Record + onChange: (next: Record) => void +} + +/** + * "Workflow inputs" panel: maps each of the workflow's Start-block input fields + * to the table column whose per-row value feeds it. Each field renders as a + * collapsible card — header shows the field name + type badge, the body holds + * the column picker — mirroring the workflow editor's input-mapping rows. + */ +export function InputMappingSection({ + inputFields, + columnOptions, + value, + onChange, +}: InputMappingSectionProps) { + const namedFields = inputFields.filter((f): f is InputFormatField & { name: string } => + Boolean(f.name?.trim()) + ) + const columns = columnOptions.map((c) => ({ label: c.name, value: c.name })) + const [collapsed, setCollapsed] = useState>({}) + + const toggle = (name: string) => setCollapsed((prev) => ({ ...prev, [name]: !prev[name] })) + + return ( +
+ + {namedFields.length === 0 ? ( +

+ This workflow has no Start block inputs. +

+ ) : ( +
+ {namedFields.map((field) => ( + + {field.type} + + ) : undefined + } + collapsed={collapsed[field.name] ?? false} + onToggleCollapse={() => toggle(field.name)} + > + + onChange({ ...value, [field.name]: columnName })} + /> + + ))} +
+ )} +
+ ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/workflow-sidebar.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/workflow-sidebar.tsx index b339d3397ef..4785764171a 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/workflow-sidebar.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/workflow-sidebar/workflow-sidebar.tsx @@ -18,6 +18,7 @@ import { Tooltip, toast, } from '@/components/emcn' +import { ArrowLeft, ChevronDown } from '@/components/emcn/icons' import { findValidationIssue, isValidationError } from '@/lib/api/client/errors' import { requestJson } from '@/lib/api/client/request' import type { @@ -33,6 +34,7 @@ import type { ColumnDefinition, WorkflowGroup, WorkflowGroupDependencies, + WorkflowGroupInputMapping, WorkflowGroupOutput, } from '@/lib/table' import { columnTypeForLeaf, deriveOutputColumnName } from '@/lib/table/column-naming' @@ -54,11 +56,22 @@ import { } from '@/hooks/queries/tables' import { useWorkflowState, workflowKeys } from '@/hooks/queries/workflows' import type { WorkflowMetadata } from '@/stores/workflows/registry/types' +import { InputMappingSection } from './input-mapping-section' import { RunSettingsSection } from './run-settings-section' +/** + * Distinguishes a user-built workflow column (`manual`) from one spawned off a + * shared enrichment template (`enrichment`). Enrichment groups hide the + * launch-workflow and add-inputs affordances and surface a back button to the + * enrichments list. + */ +export type WorkflowSidebarKind = 'manual' | 'enrichment' + /** * Discriminates the three flows the workflow sidebar handles: - * - `create`: brand-new workflow group spawned from the "+ New column" dropdown's "Workflow" item. + * - `create`: brand-new workflow group. From the "+ New column" dropdown's "Workflow" item + * (`kind: 'manual'`) or from an enrichment card (`kind: 'enrichment'`, with the template's + * workflow pre-seeded). * - `edit-group`: opened from the workflow-group meta header. Lets the user edit the whole group * (workflow id, deps, output set, group name). * - `edit-output`: opened from a single workflow-output column header. Focuses on this column's @@ -66,7 +79,15 @@ import { RunSettingsSection } from './run-settings-section' * secondary. */ export type WorkflowConfig = - | { mode: 'create'; proposedName: string } + | { + mode: 'create' + kind: WorkflowSidebarKind + proposedName: string + /** Pre-selected (and locked) workflow id for enrichment-create. */ + workflowId?: string + /** Title shown for enrichment-create (the enrichment card's name). */ + enrichmentName?: string + } | { mode: 'edit-group'; groupId: string } | { mode: 'edit-output'; columnName: string } @@ -83,8 +104,18 @@ interface WorkflowSidebarProps { /** Notify parent of a per-output-column rename so it can rewrite local * `columnOrder` / `columnWidths` keys. */ onColumnRename?: (oldName: string, newName: string) => void + /** When set and the active config is an enrichment, renders a back button + * that returns to the enrichments list. */ + onBack?: () => void } +/** Dashed hairline flanking the "Show additional fields" disclosure — mirrors + * the workflow editor's advanced-mode divider. */ +const DASHED_DIVIDER_STYLE = { + backgroundImage: + 'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)', +} as const + const OUTPUT_VALUE_SEPARATOR = '::' const encodeOutputValue = (blockId: string, path: string) => @@ -197,7 +228,7 @@ export function WorkflowSidebar(props: WorkflowSidebarProps) { function configKey(config: WorkflowConfig): string { switch (config.mode) { case 'create': - return `create:${config.proposedName}` + return `create:${config.kind}:${config.workflowId ?? ''}:${config.proposedName}` case 'edit-group': return `edit-group:${config.groupId}` case 'edit-output': @@ -205,11 +236,17 @@ function configKey(config: WorkflowConfig): string { } } -interface WorkflowSidebarBodyProps extends Omit { +export interface WorkflowSidebarBodyProps extends Omit { config: WorkflowConfig } -function WorkflowSidebarBody({ +/** + * The sidebar's inner content (header + scrollable form + footer) without the + * sliding `