Skip to content
3 changes: 2 additions & 1 deletion apps/sim/app/api/table/[tableId]/columns/run/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const parsed = await parseRequest(runColumnContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, groupIds, runMode, rowIds } = parsed.data.body
const { workspaceId, groupIds, runMode, rowIds, limit } = parsed.data.body
const access = await checkAccess(tableId, auth.userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)

Expand All @@ -35,6 +35,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
groupIds,
mode: runMode,
rowIds,
limit,
requestId,
})

Expand Down
1 change: 1 addition & 0 deletions apps/sim/app/api/table/[tableId]/dispatches/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou
isManualRun: r.isManualRun,
cursor: r.cursor,
scope: r.scope,
...(r.limit ? { limit: r.limit } : {}),
}))

return NextResponse.json({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ interface ContextMenuProps {
runningInSelectionCount?: number
/** Whether the table has any workflow columns; gates the run-workflows item. */
hasWorkflowColumns?: boolean
/** True when the menu was opened on a workflow-output cell, so Run / Re-run
* act on that cell's group only (the cascade handles dependents). Switches
* the labels from row-wide ("all cells") to cell-scoped ("cell"). */
workflowCellScoped?: boolean
disableEdit?: boolean
disableInsert?: boolean
disableDelete?: boolean
Expand All @@ -64,17 +68,26 @@ export function ContextMenu({
onStopWorkflows,
runningInSelectionCount = 0,
hasWorkflowColumns = false,
workflowCellScoped = false,
disableEdit = false,
disableInsert = false,
disableDelete = false,
}: ContextMenuProps) {
const deleteLabel = selectedRowCount > 1 ? `Delete ${selectedRowCount} rows` : 'Delete row'
const runLabel =
selectedRowCount > 1
const runLabel = workflowCellScoped
? selectedRowCount > 1
? `Run cell on ${selectedRowCount} rows`
: 'Run cell'
: selectedRowCount > 1
? `Run empty or failed cells on ${selectedRowCount} rows`
: 'Run empty or failed cells'
const refreshLabel =
selectedRowCount > 1 ? `Re-run all cells on ${selectedRowCount} rows` : 'Re-run all cells'
const refreshLabel = workflowCellScoped
? selectedRowCount > 1
? `Re-run cell on ${selectedRowCount} rows`
: 'Re-run cell'
: selectedRowCount > 1
? `Re-run all cells on ${selectedRowCount} rows`
: 'Re-run all cells'
const stopLabel =
runningInSelectionCount === 1
? 'Stop running workflow'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
PlayOutline,
Trash,
} from '@/components/emcn/icons'
import type { RunMode } from '@/lib/api/contracts/tables'
import type { RunLimit, RunMode } from '@/lib/api/contracts/tables'
import { cn } from '@/lib/core/utils/cn'
import type { WorkflowGroupType } from '@/lib/table'
import { getEnrichment } from '@/enrichments/registry'
Expand All @@ -31,6 +31,11 @@ import type { DisplayColumn } from '../types'

const WORKFLOW_META_BG_ALPHA = 12 // 0–255

/** Fixed row-cap presets for the "Run N empty rows" shortcuts. Shared by the
* group-header options menu and the inline quick-run dropdown so the two
* surfaces stay in sync. */
const LIMITED_RUN_PRESETS = [10, 1000] as const

interface ColumnOptionsMenuProps {
open: boolean
onOpenChange: (open: boolean) => void
Expand All @@ -53,6 +58,9 @@ interface ColumnOptionsMenuProps {
* exposes group-level run actions above the column actions. */
onRunColumnAll?: () => void
onRunColumnIncomplete?: () => void
/** Runs only the first `max` empty/unrun rows. Surfaces fixed "Run N rows"
* shortcuts so users can sample a large table without firing every row. */
onRunColumnLimited?: (max: number) => void
/** When set, surfaces a "Run N selected rows" item above Run all. */
onRunColumnSelected?: () => void
selectedRowCount?: number
Expand Down Expand Up @@ -81,6 +89,7 @@ export function ColumnOptionsMenu({
onDeleteGroup,
onRunColumnAll,
onRunColumnIncomplete,
onRunColumnLimited,
onRunColumnSelected,
selectedRowCount = 0,
onViewWorkflow,
Expand Down Expand Up @@ -129,6 +138,12 @@ export function ColumnOptionsMenu({
<DropdownMenuItem onSelect={() => onRunColumnIncomplete?.()}>
Run empty rows
</DropdownMenuItem>
{onRunColumnLimited &&
LIMITED_RUN_PRESETS.map((max) => (
<DropdownMenuItem key={max} onSelect={() => onRunColumnLimited(max)}>
{`Run ${max.toLocaleString()} empty rows`}
</DropdownMenuItem>
))}
</DropdownMenuSubContent>
</DropdownMenuSub>
<DropdownMenuSeparator />
Expand Down Expand Up @@ -184,7 +199,7 @@ interface WorkflowGroupMetaCellProps {
isGroupSelected: boolean
onSelectGroup: (startColIndex: number, size: number) => void
onOpenConfig: (columnName: string) => void
onRunColumn?: (groupId: string, mode?: RunMode, rowIds?: string[]) => void
onRunColumn?: (groupId: string, mode?: RunMode, rowIds?: string[], limit?: RunLimit) => void
onInsertLeft?: (columnName: string) => void
onInsertRight?: (columnName: string) => void
onDeleteColumn?: (columnName: string) => void
Expand Down Expand Up @@ -268,6 +283,13 @@ export function WorkflowGroupMetaCell({
}
}, [groupId, onRunColumn, selectedRowIds])

const handleRunLimited = useCallback(
(max: number) => {
if (groupId) onRunColumn?.(groupId, 'incomplete', undefined, { type: 'rows', max })
},
[groupId, onRunColumn]
)

const handleContextMenu = useCallback(
(e: React.MouseEvent) => {
if (!column) return
Expand Down Expand Up @@ -427,6 +449,11 @@ export function WorkflowGroupMetaCell({
)}
<DropdownMenuItem onSelect={handleRunAll}>Run all rows</DropdownMenuItem>
<DropdownMenuItem onSelect={handleRunIncomplete}>Run empty rows</DropdownMenuItem>
{LIMITED_RUN_PRESETS.map((max) => (
<DropdownMenuItem key={max} onSelect={() => handleRunLimited(max)}>
{`Run ${max.toLocaleString()} empty rows`}
</DropdownMenuItem>
))}
</DropdownMenuContent>
</DropdownMenu>
)}
Expand All @@ -444,6 +471,7 @@ export function WorkflowGroupMetaCell({
onDeleteGroup={onDeleteGroup ? () => onDeleteGroup(groupId) : undefined}
onRunColumnAll={onRunColumn ? handleRunAll : undefined}
onRunColumnIncomplete={onRunColumn ? handleRunIncomplete : undefined}
onRunColumnLimited={onRunColumn ? handleRunLimited : undefined}
onRunColumnSelected={onRunColumn && selectedCount > 0 ? handleRunSelected : undefined}
selectedRowCount={selectedCount}
onViewWorkflow={onViewWorkflow ? () => onViewWorkflow(workflowId) : undefined}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { useParams } from 'next/navigation'
import { usePostHog } from 'posthog-js/react'
import { Skeleton, toast, useToast } from '@/components/emcn'
import { TableX } from '@/components/emcn/icons'
import type { RunMode } from '@/lib/api/contracts/tables'
import type { RunLimit, RunMode } from '@/lib/api/contracts/tables'
import { cn } from '@/lib/core/utils/cn'
import { captureEvent } from '@/lib/posthog/client'
import type { ColumnDefinition, TableRow as TableRowType, WorkflowGroup } from '@/lib/table'
Expand Down Expand Up @@ -151,7 +151,7 @@ interface TableGridProps {
/** Open the delete-columns confirmation modal for `names`. Wrapper renders the modal. */
onRequestDeleteColumns: (names: string[]) => void
/** Fire run for a single column (meta-cell Run menu). */
onRunColumn: (groupId: string, runMode: RunMode, rowIds?: string[]) => void
onRunColumn: (groupId: string, runMode: RunMode, rowIds?: string[], limit?: RunLimit) => void
/** Fire every runnable column on a single row (per-row gutter Play). */
onRunRow: (rowId: string) => void
/** Fan out a run across every workflow group on `rowIds`. Used by context menu. */
Expand Down Expand Up @@ -423,8 +423,13 @@ export function TableGrid({
const deleteWorkflowGroupMutation = useDeleteWorkflowGroup({ workspaceId, tableId })
const updateWorkflowGroupMutation = useUpdateWorkflowGroup({ workspaceId, tableId })

function handleRunColumn(groupId: string, runMode: RunMode = 'all', rowIds?: string[]) {
onRunColumn(groupId, runMode, rowIds)
function handleRunColumn(
groupId: string,
runMode: RunMode = 'all',
rowIds?: string[],
limit?: RunLimit
) {
onRunColumn(groupId, runMode, rowIds, limit)
}

const handleViewWorkflow = useCallback(
Expand Down Expand Up @@ -751,12 +756,17 @@ export function TableGrid({
let contextMenuExecutionId: string | null = null
let contextMenuIsWorkflowColumn = false
let contextMenuHasStartedRun = false
// The workflow group of the right-clicked cell, when it's a workflow-output
// column. Scopes the run/re-run menu items to just that cell's group (the
// cascade re-runs dependents on its own) instead of every group on the row.
let contextMenuGroupId: string | null = null
if (contextMenu.row && contextMenu.columnName) {
const _col = columnsRef.current.find((c) => c.name === contextMenu.columnName)
const _gid = _col?.workflowGroupId
if (_col && _gid) {
const _exec = contextMenu.row.executions?.[_gid]
contextMenuIsWorkflowColumn = true
contextMenuGroupId = _gid
// Cells with a server-side execution log: `completed` / `error` /
// `running`, plus HITL-paused runs (status `pending` with a `paused-`
// jobId — has a real executionId + viewable trace). `queued` / plain
Expand Down Expand Up @@ -2846,13 +2856,18 @@ export function TableGrid({

// Context-menu wrappers: act on `contextMenuRowIds`, then close the menu.
// Mirror the action bar's Play / Refresh split: Play fills empty/failed,
// Refresh re-runs everything (including completed cells).
// Refresh re-runs everything (including completed cells). When the menu was
// opened on a workflow-output cell, scope to just that cell's group — the
// server cascade re-runs dependent groups whose deps it fills. Right-clicking
// a plain cell has no group, so fall back to every group on the row(s).
const handleRunWorkflowsOnSelection = () => {
onRunRows(contextMenuRowIds, 'incomplete')
if (contextMenuGroupId) onRunColumn(contextMenuGroupId, 'incomplete', contextMenuRowIds)
else onRunRows(contextMenuRowIds, 'incomplete')
closeContextMenu()
}
const handleRefreshWorkflowsOnSelection = () => {
onRunRows(contextMenuRowIds, 'all')
if (contextMenuGroupId) onRunColumn(contextMenuGroupId, 'all', contextMenuRowIds)
else onRunRows(contextMenuRowIds, 'all')
closeContextMenu()
}
const handleStopWorkflowsOnSelection = () => {
Expand Down Expand Up @@ -2946,10 +2961,17 @@ export function TableGrid({
)

// Drives Run vs Refresh visibility on the context menu — same classifier
// the action bar uses, so both surfaces stay in sync.
// the action bar uses, so both surfaces stay in sync. Scoped to the clicked
// cell's group when the menu opened on a workflow-output cell so visibility
// tracks that group's state, not the whole row's.
const contextMenuStats = useMemo(
() => classifyExecStatusMix(rows, new Set(contextMenuRowIds), tableWorkflowGroupIds),
[contextMenuRowIds, rows, tableWorkflowGroupIds]
() =>
classifyExecStatusMix(
rows,
new Set(contextMenuRowIds),
contextMenuGroupId ? [contextMenuGroupId] : tableWorkflowGroupIds
),
[contextMenuRowIds, rows, tableWorkflowGroupIds, contextMenuGroupId]
)

// Run scope is derived from one of two selection sources:
Expand Down Expand Up @@ -3411,6 +3433,7 @@ export function TableGrid({
}
runningInSelectionCount={runningInContextSelection}
hasWorkflowColumns={hasWorkflowColumns}
workflowCellScoped={Boolean(contextMenuGroupId)}
disableEdit={!userPermissions.canEdit}
disableInsert={!userPermissions.canEdit}
disableDelete={!userPermissions.canEdit}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ export function resolveCellExec(
if (areOutputsFilled(group, row)) return undefined
if (!areGroupDepsSatisfied(group, row)) return undefined
for (const d of activeDispatches) {
// Capped dispatches run only the first N eligible rows ahead of the
// cursor, and this per-row resolver can't tell which rows fall within the
// budget — rendering every ahead-of-cursor row as Queued would massively
// over-count. The dispatcher's real per-row pending stamps (arriving via
// cell SSE) cover the actual rows instead.
if (d.limit) continue
if (!d.scope.groupIds.includes(group.id)) continue
if (d.scope.rowIds && !d.scope.rowIds.includes(row.id)) continue
if (row.position <= d.cursor) continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export function useTableEventStream({
}

const applyDispatch = (event: Extract<TableEvent, { kind: 'dispatch' }>): void => {
const { dispatchId, status, scope, cursor, mode, isManualRun } = event
const { dispatchId, status, scope, cursor, mode, isManualRun, limit } = event
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
// SSE may arrive before the initial fetch lands. Seed an empty
// run-state so the dispatch isn't dropped; counters are reconciled
Expand Down Expand Up @@ -183,13 +183,15 @@ export function useTableEventStream({
// the cached entry's value if this is a legacy emit without the
// field, and finally to `false` if we have nothing.
const resolvedManualRun = isManualRun ?? existing?.isManualRun ?? false
const resolvedLimit = limit ?? existing?.limit
const next: ActiveDispatch = {
id: dispatchId,
status,
mode,
isManualRun: resolvedManualRun,
cursor,
scope,
...(resolvedLimit ? { limit: resolvedLimit } : {}),
}
if (idx === -1) return { ...base, dispatches: [...list, next] }
const merged = list.slice()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
toast,
} from '@/components/emcn'
import { Download, Pencil, Table as TableIcon, Trash, Upload } from '@/components/emcn/icons'
import type { RunMode } from '@/lib/api/contracts/tables'
import type { RunLimit, RunMode } from '@/lib/api/contracts/tables'
import type { ColumnDefinition, Filter, TableRow as TableRowType, WorkflowGroup } from '@/lib/table'
import {
type ColumnOption,
Expand Down Expand Up @@ -225,7 +225,7 @@ export function Table({
// gutter, action-bar Play/Refresh, right-click context menu) reduces to a
// (groupIds, rowIds?, runMode) triple. Empty groupIds = no-op.
const runScope = useCallback(
(args: { groupIds: string[]; rowIds?: string[]; runMode: RunMode }) => {
(args: { groupIds: string[]; rowIds?: string[]; runMode: RunMode; limit?: RunLimit }) => {
if (args.groupIds.length === 0) return
if (args.rowIds && args.rowIds.length === 0) return
runColumnMutate(args)
Expand All @@ -234,8 +234,8 @@ export function Table({
)

const onRunColumn = useCallback(
(groupId: string, runMode: RunMode, rowIds?: string[]) => {
runScope({ groupIds: [groupId], rowIds, runMode })
(groupId: string, runMode: RunMode, rowIds?: string[], limit?: RunLimit) => {
runScope({ groupIds: [groupId], rowIds, runMode, limit })
},
[runScope]
)
Expand Down
55 changes: 47 additions & 8 deletions apps/sim/background/workflow-column-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,58 @@ const logger = createLogger('TriggerWorkflowGroupCell')

/** Cell-task entrypoint. Holds a per-row cascade lock so only one worker
* advances a given row at a time; bails on contention. The held lock heart-
* beats every 10s so a crashed pod releases within ~30s. */
* beats every 10s so a crashed pod releases within ~30s.
*
* After the cascade finishes and the lock releases, re-checks for a runnable
* queued marker that may have landed between the cascade's final
* `pickNextEligibleGroupForRow` and the lock release (a window where a
* contender bails on the still-held lock but we're already done). If one
* appeared, re-acquire and drive it — this is the same task re-acquiring the
* lock, NOT a queue re-enqueue or a timed poll, and it loops only while a
* runnable group exists. */
export async function executeWorkflowGroupCellJob(
payload: WorkflowGroupCellPayload,
signal?: AbortSignal
) {
const { tableId, rowId, executionId } = payload
const outcome = await withCascadeLock(tableId, rowId, executionId, () =>
runRowCascadeLoop(payload, signal)
)
if (outcome.status === 'contended') {
logger.info(
`Cascade lock held — bailing (table=${tableId} row=${rowId} executionId=${executionId})`
const { tableId, rowId, workspaceId } = payload
const { getTableById, getRowById } = await import('@/lib/table/service')
const { pickNextEligibleGroupForRow } = await import('@/lib/table/workflow-columns')

let currentPayload = payload
while (true) {
if (signal?.aborted) break
const outcome = await withCascadeLock(tableId, rowId, currentPayload.executionId, () =>
runRowCascadeLoop(currentPayload, signal)
)
if (outcome.status === 'contended') {
// Another worker owns the row's cascade; it drains the queued marker.
logger.info(
`Cascade lock held — bailing (table=${tableId} row=${rowId} executionId=${currentPayload.executionId})`
)
break
}
if (signal?.aborted) break
const freshTable = await getTableById(tableId)
if (!freshTable) break
const freshRow = await getRowById(tableId, rowId, workspaceId)
if (!freshRow) break
const next = pickNextEligibleGroupForRow(freshTable, freshRow)
if (!next) break
// Only re-drive a genuine queued marker (an explicit run request whose
// cell-task bailed during our release window). The inner cascade loop has
// already drained every auto-eligible group, so re-driving a non-marker
// group here would re-run forever — e.g. a group that completed with empty
// outputs stays auto-eligible (the inner loop excludes it via
// `excludeGroupId`, but this outer pass has no such anchor).
const nextExec = freshRow.executions?.[next.id]
const hasQueuedMarker = nextExec?.status === 'pending' && nextExec.executionId == null
if (!hasQueuedMarker) break
currentPayload = {
...currentPayload,
groupId: next.id,
workflowId: next.workflowId,
executionId: generateId(),
}
Comment thread
cursor[bot] marked this conversation as resolved.
}
}

Expand Down
Loading
Loading