From 739e843b4acc6ae34b3b0d3153880237aacf1e5a Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 27 Jun 2026 12:34:49 -0700 Subject: [PATCH 1/2] wip: produced/consumer interface Signed-off-by: vsoch --- README.md | 118 +++--- docs/coordination-handler-design.md | 384 ++++++++++++++++++ docs/handlers.md | 8 +- docs/quantum-scheduling.md | 155 ++++--- .../test/e2e/quantum/quantum-gang-pods.yaml | 33 +- pkg/fluence/fluence.go | 34 +- pkg/webhook/handlers/gang.go | 7 + pkg/webhook/handlers/quantum.go | 277 ++++++------- pkg/webhook/handlers/quantum_test.go | 264 +++++++----- test/e2e/quantum/02-sidecar-ungate.sh | 68 ++-- test/e2e/quantum/03-gang-producer.sh | 70 ++++ test/e2e/quantum/03-gang-submitter.sh | 63 --- test/e2e/quantum/04-gang-env-contract.sh | 48 +-- test/e2e/quantum/setup.sh | 4 +- 14 files changed, 1025 insertions(+), 508 deletions(-) create mode 100644 docs/coordination-handler-design.md create mode 100644 test/e2e/quantum/03-gang-producer.sh delete mode 100644 test/e2e/quantum/03-gang-submitter.sh diff --git a/README.md b/README.md index 3ee668f..d299757 100644 --- a/README.md +++ b/README.md @@ -194,10 +194,10 @@ ceiling. Types come from the same config as the graph, so they can't drift. ### `sidecars/` — quantum coordination sidecars -Vendor-specific sidecar containers injected by the webhook into leader pods -of quantum workflow groups. Each sidecar discovers the QPU task submitted by -the leader, polls the vendor queue, and ungates worker pods when the task -reaches position==1. +Vendor-specific sidecar containers injected by the webhook into the producer pod +of a shared quantum workflow group. Each sidecar discovers the QPU task submitted +by the producer, polls the vendor queue, and ungates the consumer pods when the +task reaches position==1. ```console sidecars/ @@ -221,7 +221,7 @@ spec: ``` Fluence creates the PodGroup, injects the sidecar, creates per-namespace -RBAC, and gates all non-leader pods. 
See `sidecars/braket/design.md` for +RBAC, and gates the consumer pods. See `sidecars/braket/design.md` for the full design including the SDK interceptor, queue position polling, and the two-queue problem motivation. @@ -369,88 +369,90 @@ Submission is **not** done by the scheduler — the workload container holds the user's credentials and submits via qrmi-go. Fluence only schedules and hands off the backend. (When we control local quantum devices this will change.) -### 3. Quantum workflow groups (leader + workers) +### 3. Quantum workflow groups (producer + consumers) -A quantum workflow group is one pod that **submits** quantum work (the leader) -plus N pods that **wait** for the result (the workers). All pods share a group -label; Fluence co-schedules them, gives the leader a sidecar that watches the -vendor queue, and gates the workers so they consume no node resources during the -(long, variable) QPU queue wait — releasing them only when the task reaches -`queue_position == 1`. +A quantum workflow group is a gang whose members share **one** quantum task: +one pod **submits** the work (the producer) and N−1 pods **wait** for the result +(the consumers). All pods share a group label and run the *same* image; Fluence +co-schedules them, gives the producer a sidecar that watches the vendor queue, and +gates the consumers so they consume no node resources during the (long, variable) +QPU queue wait — releasing them only when the task reaches `queue_position == 1`. ```yaml -# Every pod in the group carries the same group label + schedulerName: fluence +# Every pod in the group carries the same group label + schedulerName: fluence, +# and opts into shared coordination. metadata: labels: fluence.flux-framework.org/group: my-qaoa-workflow + annotations: + fluence.flux-framework.org/coordination: shared spec: schedulerName: fluence ``` -#### How the leader is chosen — two mechanisms +#### Coordination modes -There are two ways Fluence decides which pod is the leader. 
They are mutually -exclusive per group; pick the one that matches how your workload is built. +`fluence.flux-framework.org/coordination` selects how the gang is coordinated; it +defaults to `independent`. -**(a) Explicit role (recommended for leader/worker workflows).** Each pod -declares its role with an annotation. This is **authoritative**: admission order -is never consulted, and the same value is injected into the container as -`FLUENCE_ROLE` so your application reads the exact role Fluence used — the two -can never disagree. +- **`shared`** — the gang shares ONE quantum task. Fluence promotes one member to + producer and gates the rest as consumers (see below). Use this for a coordinated + workflow where the classical post-processing should start together as the single + result lands. +- **`independent`** (default) — every member does its own quantum work: its own + real submit, its own queue wait, no gating. N members run N tasks. This is the + honest default; Fluence never invents coordination you did not ask for, and + never dedups tasks meant to be distinct. -```yaml -metadata: - labels: - fluence.flux-framework.org/group: my-qaoa-workflow - annotations: - fluence.flux-framework.org/role: leader # or: worker -``` +#### How the producer is chosen -Use this when the leader and workers are **different** (the leader submits the -quantum task and runs the sidecar; workers process results). The leader gets the -interceptor + sidecar; workers are gated. Because the decision is declared, it is -race-free regardless of which pod the API server admits first. Your container can -branch on `$FLUENCE_ROLE` (e.g. `leader` → submit; `worker` → wait). +In `shared` mode the producer is the member the Job controller stamps with +`batch.kubernetes.io/job-completion-index: "0"` — so an **indexed Job** gives +deterministic, race-free election from a single identical template (every pod has +the same image and group label; only the index differs). 
This serves two contracts +with no extra configuration: -**(b) Admission order (default when no role annotation is present).** If pods -carry the group label but **no** role annotation, the **first pod admitted** -becomes the leader and every subsequent pod is a worker. This suits a -*homogeneous* pod-template gang (Deployment/Job/StatefulSet) where every replica -is byte-identical — any one of them can lead, so "first admitted" is a fine -tiebreaker. It is **not** suitable for a heterogeneous leader/worker workflow: -since admission order is nondeterministic, a worker pod could be admitted first -and wrongly elected leader. Use mechanism (a) for that case. +- an **explicit-role script** that branches on the completion index (index 0 + submits; others wait and consume the result), and +- an **identical script** where every pod calls submit — the producer's submit is + real, and each consumer's submit is transparently returned the producer's task + (the shared-result dedup), so the code need not branch at all. -> Rule of thumb: identical replicas → admission order is fine. Distinct -> leader/worker pods → use the explicit `role` annotation. +For loose pods with no completion index, the first pod admitted claims the producer +slot; an indexed Job is recommended when you need determinism. #### What Fluence does -Regardless of mechanism, the leader gets the sidecar and a PodGroup is created -(`minCount: 1`); workers get a `quantum.braket/ready` scheduling gate and consume -no node resources during the QPU queue wait. When the sidecar observes -`queue_position == 1`, it patches the task ARN onto each worker's annotations and -removes their gates atomically with setting the `fluence-quantum-classical` -priority class so they reschedule promptly. +In `shared` mode the producer gets the interceptor (real mode) + sidecar and its +own group-of-one PodGroup `<group>-producer` (`minCount: 1`), so it schedules +alone and runs the single real submit; it is never gated. 
The consumers join the +`<group>` gang (`minCount: N−1`), get a `quantum.braket/ready` scheduling gate, and +consume no node resources during the QPU queue wait. When the sidecar observes +`queue_position == 1`, it stamps the producer's task id onto each consumer +(surfaced as `FLUENCE_QUANTUM_JOB_ID`) and removes their gates atomically with +setting the `fluence-quantum-classical` priority class so they reschedule promptly. +The producer is one of the N members, so the application runs exactly N times — +never N+1, and there is no separate submitter pod. Per-namespace RBAC (`fluence-sidecar` ServiceAccount/Role/RoleBinding) and the -interceptor ConfigMap are created automatically by the webhook on first use — no +interceptor staging are created automatically by the webhook on first use — no manual setup required. ```bash -# Just apply your pods with the group label (+ optional role annotation) and +# Apply your pods with the group label + coordination annotation + # schedulerName: fluence. RBAC is created for you. kubectl apply -f my-quantum-workflow.yaml ``` -#### A note on the homogeneous "all submit" case +#### A note on the independent "all submit" case -A group where *every* pod submits its own quantum task (no leader/worker split) -is possible but rarely what you want: N independent submissions land in the -vendor queue and run at uncoordinated times, so there is no coordination benefit -from grouping them — you would just have N standalone quantum pods. For a single -quantum submission, use a standalone pod (no group label, see §2). For a -coordinated workflow, use the leader/worker form above with an explicit role. +`coordination: independent` (the default) means *every* pod submits its own +quantum task: N independent submissions land in the vendor queue and run at +uncoordinated times. That is correct and sometimes exactly what you want (N +distinct circuits), but it offers no coordination benefit from grouping — it is +equivalent to N standalone quantum pods. 
For a single quantum submission, use a +standalone pod (no group label, see §2). For a coordinated workflow that shares +one result, use `coordination: shared` above. ### Notes diff --git a/docs/coordination-handler-design.md b/docs/coordination-handler-design.md new file mode 100644 index 0000000..c155fef --- /dev/null +++ b/docs/coordination-handler-design.md @@ -0,0 +1,384 @@ +# Coordination handlers: producer/consumer gang split (no separate submitter) + +> **Status: implemented.** This design is live in `pkg/webhook/handlers/quantum.go` +> (the coordination router + `mutateProducer`/`mutateConsumer`/`coordinationMode`/ +> `isProducer`), `pkg/webhook/handlers/gang.go` (classical gangs defer quantum +> pods to the quantum handler), and `pkg/fluence/fluence.go` (reconcile reaps the +> `-producer` PodGroup, never the producer pod — it is a real member). +> Unit tests are in `pkg/webhook/handlers/quantum_test.go`; structural e2e in +> `test/e2e/quantum/02–04`. Both shared-mode user contracts are served by the same +> wiring: the explicit-role script (consumers never call submit; the faux +> interceptor is unused) and the identical-script (every member calls submit; the +> faux interceptor dedups consumers to the producer's cached task). + +## Why this replaces the submitter-pod model + +The `add-sidecar-interface` branch coordinates a quantum gang by creating a +*separate* one-off submitter pod (`-submitter`) that runs the user's +application image to do the real submit, then ungates a gang of N faux-submitting +members. That works, but it runs the user's application **N+1 times** for an +N-gang: once in the submitter (a full run whose post-processing nobody consumes) +plus once in each of the N members. The redundant run is not an implementation +wart — it is a symptom of modeling quantum work as a producer/consumer split +while pretending one image plays both roles, selected at runtime by a faux flag. 
+ +This design keeps the split (it is correct) but removes the separate pod: the +**producer is one of the N members**, promoted at admission, so the application +runs exactly **N times** — the needed number — with exactly **one real submit**. + +The core thesis is unchanged: Fluence is a generic gang scheduler (native gangs +since k8s 1.36), and per-resource nuance lives in handlers. This is entirely a +change to the `quantum` handler plus a one-line deferral in the `gang` handler. + +## The fundamental constraint + +A quantum task's content (the circuit) comes from user code, so **the pod that +defines a task must run to submit it**. Therefore, per pod, *submit* and *gate* +are mutually exclusive — a pod either runs (and can submit) or is gated (and +cannot). Gating only ever buys resource savings for pods that **do not submit**: +pods that consume a result someone else produced. + +That partitions a quantum gang into two kinds, decided per pod: + +- **producer** — runs its code, submits its own task, holds a node through the + queue wait. Not gateable, ever. +- **consumer** — never submits; reads the producer's result. Fully gateable until + that result is ready. + +## Coordination modes (user-facing contract) + +Identical pod templates (a Job/Deployment) are genuinely ambiguous between "one +shared task, fan the result out to N pods" and "N independent tasks." 
Fluence +cannot infer this; the user declares it with one annotation on the pod template: + +```yaml +metadata: + annotations: + fluence.flux-framework.org/coordination: shared # or: independent +``` + +| mode | meaning | who submits | gating | app runs | real submits | +|------|---------|-------------|--------|----------|--------------| +| `independent` (default) | N pods each do their own quantum work | every pod | none possible (all are producers) | N | N | +| `shared` | one task; N pods consume the result | producer only | consumers gated until task ready | N | 1 | + +`coordination` is an open enum so future designs (e.g. `scatter` — index-paired +task↔pod, §6.2 of the quantum doc) slot in as new modes without changing the +mechanism. Default is `independent`: never invent coordination the user did not +ask for, and never dedup tasks that were meant to be distinct. + +### What each mode does to resources, honestly + +- **shared**: the producer (1 node) holds its node through the queue wait; + consumers (N−1) consume **zero** node resources while gated, then start at + position==1. Idle cost during the wait ≈ 1 node, vs N for a traditional gang. +- **independent**: every pod is a producer, so every pod holds its node through + its own queue wait — N nodes idle. There is nothing to coordinate (no shared + result), so this is not a Fluence deficiency; it is the physics of "N + independent tasks," and it is the user's explicit choice. The only way to + reclaim even the producer's node in either mode is a resumable `.result()` + (replay), which reuses the faux mechanism and is deliberately **out of scope + for v1** (one idle node is cheap; replay imposes a replay-safe-code contract). + +## Producer election + +Exactly one member must be the producer. Election is deterministic for the +recommended workload and best-effort otherwise: + +- **Indexed Job (recommended):** the pod carries + `batch.kubernetes.io/job-completion-index`. 
**Index `0` is the producer**; + every other index is a consumer. Deterministic, race-free, no recorded state — + the controller already stamped the index, and identical templates yield + differentiated behavior purely from it. This is why an indexed Job is the right + shape and is what the experiments use. +- **Non-indexed gang (Deployment / raw grouped pods):** first arrival claims the + producer slot by creating the producer PodGroup (create-if-absent); later pods + find it present and become consumers. Best-effort (racy under simultaneous + admission); documented, with indexed Job recommended for determinism. + +## The two-group split + +| | producer (index 0) | consumers (indices 1..N−1) | +|---|---|---| +| PodGroup | `<group>-producer`, `minCount=1` | `<group>`, `minCount=N−1` | +| schedules | immediately, alone | atomically as a gang, **after ungate** | +| gate | none | `quantum.braket/ready` + preempting priority | +| interceptor | staged, **real (tag) mode** | staged, **faux mode** | +| sidecar | yes — polls the task, ungates `<group>` at position==1 | no | +| app run | full; its `.run()` is the one real submit | full; its `.run()` is a faux no-op returning the producer's task | + +`minCount=1` on the producer group is what removes the deadlock that forced a +separate submitter: a single-member group schedules alone, so the producer runs +during the wait while the `minCount=N−1` consumer group sits gated. The two +groups have independent minCounts; neither blocks the other. The consumer group +keeps a real gang `minCount` (N−1), so **gang scheduling is preserved and +demonstrable** (experiment requirement 1). + +The faux path is retained verbatim, but its meaning is now honest: it is the +shared-result dedup. A consumer runs the same image and calls `.run()`; the faux +interceptor returns the producer's existing task (handed over as +`FLUENCE_QUANTUM_JOB_ID`, stamped by the sidecar at ungate) instead of +submitting. One real task, N consumers, each app run once, in full. 
+ +## Gate / ungate flow (shared mode) + +``` +1. Producer (index 0) admitted -> own group-of-one, ungated, sidecar attached + (FLUENCE_GANG_GROUP=<group>), interceptor in REAL mode. + Consumers (1..N-1) admitted -> group <group> (minCount N-1), GATED, faux mode, + depends-on producer=<group>-producer. + +2. Scheduler places the producer immediately (minCount=1). It runs the user app, + .run() submits the ONE real task (tagged fluence-pod-uid). + +3. Producer sidecar discovers the task by tag, polls queue position. + +4. At position==1 (or RUNNING): for each gated pod in <group>: + annotate fluence.flux-framework.org/quantum-job-id=<task-id> + remove the quantum.braket/ready gate (priority already set at admission) + +5. Consumer group (now ungated, minCount N-1) gang-schedules atomically and + starts as the quantum result arrives. Each consumer's .run() is faux: returns + the producer's task; .result() returns the shared result; app post-processes. +``` + +`independent` mode skips all of this: each pod is its own group-of-one, ungated, +real submit, optional observe-only sidecar — i.e. today's standalone path applied +per pod. + +--- + +## Patch + +All changes are in `pkg/webhook/handlers/`. The webhook core, the `fluxion` +handler, `dependency.go`, `sidecar.go`, and the Python interceptor/sidecar are +**unchanged**. + +### `gang.go` — defer on quantum pods (removes the ordering dependency) + +The gang handler currently calls `EnsurePodGroup` unconditionally and relies on +idempotency to coexist with the quantum handler. With the two-group split the +quantum handler owns *both* quantum PodGroups (and the producer's group differs +from its admission-time label), so the gang handler must not also gang quantum +pods. Make it skip them: + +```go +func (h *gangHandler) Applies(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) bool { + // Classical gangs only. 
A pod that requests the quantum resource is gang- + // scheduled by the quantum handler (which owns the producer/consumer split); + // handling it here too would create a second, conflicting PodGroup. + if spec.PodRequestsResource(pod, QuantumResource) { + return false + } + return webhook.GroupName(pod) != "" +} +``` + +### `quantum.go` — replace `Mutate` and the submitter machinery + +**Add** these constants (near the existing const block): + +```go +const ( + // CoordinationAnnotation selects how a quantum gang is coordinated. Open enum + // so new designs (e.g. "scatter") add a mode without changing the mechanism. + CoordinationAnnotation = "fluence.flux-framework.org/coordination" + // CoordinationShared: one real task; the producer (index 0) submits, the + // other members are gated consumers that dedup to the producer's task. + CoordinationShared = "shared" + // CoordinationIndependent (default): every member does its own quantum work; + // no coordination, no gating, each holds its node through its own queue wait. + CoordinationIndependent = "independent" + + // ProducerGroupSuffix names the producer's own group-of-one: -producer + // (minCount 1) so it schedules alone and never deadlocks against the gated + // consumer gang. + ProducerGroupSuffix = "-producer" + + // CompletionIndexAnnotation is the indexed-Job completion index the Job + // controller stamps on each pod; index "0" is the producer (deterministic + // election, no recorded state). + CompletionIndexAnnotation = "batch.kubernetes.io/job-completion-index" + // ProducerIndex is the completion index promoted to producer. + ProducerIndex = "0" +) +``` + +Keep `GangGroupEnv` (`FLUENCE_GANG_GROUP`) — it now tells the **producer's** +sidecar which consumer group to ungate. **Delete** the separate-submitter +constants and helpers: `SubmitterAnnotation`, `GangGroupAnnotation`, +`SubmitterGroupSuffix`, `SubmitterPodSuffix`, and the functions +`mutateSubmitter` and `ensureSubmitterPod`. 
Everything else in the file +(`resolveGroup`, `resolveGangSize`, `ownerReplicaSetN`, `countGroupPods`, +`linkGroupOps`, the faux-submit section, the sidecar section) is reused unchanged. + +**Replace** `Mutate` with the coordination router plus two small role functions: + +```go +func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) []spec.Op { + g := resolveGroup(pod) + n := resolveGangSize(ctx, m, pod, g) + mode := coordinationMode(pod) + observe := spec.Label(pod, ObserveLabel) == "true" + + // No coordination: a standalone quantum pod, or an explicitly independent + // member. The REAL submit happens in THIS pod; sidecar only for observe-only + // telemetry. (independent mode routes every member here -> N standalone + // producers, each owning its task and its own queue wait.) + if mode != CoordinationShared || g == "" || n <= 1 { + ops := interceptorOps(pod) + if observe { + sc := sidecarFor(m) + sc.EnsureRBAC(ctx, pod.Namespace) + ops = append(ops, sc.ContainerOps(pod, true, nil)...) + } + log.Printf("[fluence-webhook] quantum %s/%s mode=%s (standalone/independent, observe=%v)", + pod.Namespace, pod.Name, mode, observe) + return ops + } + + // shared mode: promote one member to producer; the rest are gated consumers. + if isProducer(ctx, m, pod, g) { + return h.mutateProducer(ctx, m, pod, g) + } + return h.mutateConsumer(ctx, m, pod, g, n) +} + +// mutateProducer: index-0 member. Its own group-of-one (minCount 1) so it +// schedules alone and runs the REAL submit; sidecar polls the task and ungates +// the consumer group. NOT gated, no faux. The producer is one of the N members, +// so the application is NOT run an extra time. +func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) []spec.Op { + pg := group + ProducerGroupSuffix + m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1) + ops := linkGroupOps(pod, pg) + ops = append(ops, interceptorOps(pod)...) 
// real (tag) mode — no fauxSubmitEnvOps + sc := sidecarFor(m) + sc.EnsureRBAC(ctx, pod.Namespace) + // Tell the sidecar which consumer group (the base group) to list + ungate. + ops = append(ops, sc.ContainerOps(pod, false, []corev1.EnvVar{{Name: GangGroupEnv, Value: group}})...) + log.Printf("[fluence-webhook] quantum producer %s/%s — group %s (ungates %q)", + pod.Namespace, pod.Name, pg, group) + return ops +} + +// mutateConsumer: a non-producer member. Joins the consumer gang +// (minCount N-1), is gated until the producer's task is ready, and runs the same +// image with the interceptor in FAUX mode so its .run() returns the producer's +// task instead of resubmitting (the shared-result dedup). +func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op { + m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1) + ops := linkGroupOps(pod, group) + dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate} + ops = append(ops, dep.applyOps(pod)...) // gate + preempting priority + depends-on + ops = append(ops, interceptorOps(pod)...) + ops = append(ops, fauxSubmitEnvOps(pod)...) + log.Printf("[fluence-webhook] quantum consumer %s/%s — group %s minCount=%d, gated+faux", + pod.Namespace, pod.Name, group, n-1) + return ops +} + +// coordinationMode reads the coordination annotation; default independent. +func coordinationMode(pod *corev1.Pod) string { + if v := spec.Annotation(pod, CoordinationAnnotation); v != "" { + return v + } + return CoordinationIndependent +} + +// isProducer decides whether THIS pod is the gang's single producer. Indexed Job +// (recommended): completion index 0 is the producer — deterministic, race-free. +// Otherwise: first arrival claims the producer slot by the absence of the +// producer PodGroup (best-effort; prefer an indexed Job). 
+func isProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) bool { + if idx, ok := pod.Annotations[CompletionIndexAnnotation]; ok { + return idx == ProducerIndex + } + c := m.Client() + if c == nil { + return true // tests / no client: treat as producer + } + pg := group + ProducerGroupSuffix + if _, err := c.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(ctx, pg, metav1.GetOptions{}); err == nil { + return false // already claimed by an earlier arrival + } + return true +} +``` + +Note: `pod.Annotations` may be nil; the `idx, ok := pod.Annotations[...]` form is +nil-safe in Go (indexing a nil map yields the zero value, `ok=false`). + +### Sidecar (Python) — no change + +The producer's sidecar already resolves the vendor at runtime from +`FLUXION_BACKEND`, discovers the task by the `fluence-pod-uid` tag, polls queue +position, and ungates the group named by `FLUENCE_GANG_GROUP` (now the consumer +group) at position==1, stamping `quantum-job-id` on each consumer before removing +its gate. That is exactly the existing flow with the producer in place of the +submitter pod. + +--- + +## Experiments + +Two requirements, both demonstrable on a kind cluster with the mock/faux path and +on a real cluster with Braket. + +### Requirement 1 — Fluence still gang-schedules + +Unchanged classical-gang coverage plus a shared-mode assertion: + +- **Classical gang (regression):** keep `test/e2e/gang/*`. A `minCount=N` classical + PodGroup schedules all-or-nothing. This proves the generic gang machinery is + intact (the change only adds a quantum-pod deferral to `gang.Applies`). +- **Shared consumer gang (new assertion):** submit a `coordination: shared` + indexed Job of N. Assert: exactly one `-producer` PodGroup (minCount 1) and + one `` PodGroup (minCount N−1); the producer runs while the N−1 consumers + are `SchedulingGated`; after ungate the N−1 schedule **together** (gang), not + one-by-one. 
This proves gang scheduling still holds for the consumer group. + +### Requirement 2 — Both modes work and shared beats a traditional gang + +The metric that isolates the win is **classical node-seconds consumed during the +quantum queue wait** (lower is better), alongside correctness checks. + +Three arms, same N, same workload (the QAOA sampler), same backend: + +| arm | how | expected node-seconds during queue wait | correctness | +|-----|-----|------------------------------------------|-------------| +| **traditional gang** (baseline) | N pods all running, each waits the full queue (no Fluence coordination — e.g. a plain native gang, or `independent` with N=N) | ≈ **N × T_queue** | N pods each run; if they each submit, N real tasks | +| **shared** (new) | `coordination: shared` indexed Job, N pods | ≈ **1 × T_queue** (producer only; consumers gated) | **1** real task; all N pods produce the **same** result; app runs N times, never N+1 | +| **independent** (new) | `coordination: independent` indexed Job, N pods | ≈ **N × T_queue** (no coordination possible) | N distinct tasks/results; correct and the user's explicit choice (reported as the honest baseline, **not** claimed as an improvement) | + +Headline comparison is **shared vs traditional**: same observable result to the N +pods, but shared idles ~1 node through the queue wait instead of N, saving +≈ (N−1) × T_queue node-seconds, and runs the application N times rather than N+1 +(the submitter-pod model's extra run is gone). + +Instrumentation (reuse the Experiment 2 harness): +- per-pod `TIMING` lines → derive each pod's gated interval vs running interval; + sum running-but-pre-result node-seconds per arm. +- producer's sidecar logs queue position over time → T_queue. +- assert real-submit count: shared = 1 (one tagged task on the backend), + independent/traditional = N (count tagged tasks). +- assert shared correctness: all N pods log the **same** task id / result hash. 
+ +Suggested location: a new `experiments/4-coordination/` modeled on +`experiments/2-gang/` (it already measures idle reclamation), parameterized by the +`coordination` annotation and N, emitting node-seconds-during-wait, real-submit +count, and result-agreement per arm. Plot node-seconds vs N for the three arms: +traditional and independent rise ~linearly in N; shared stays ~flat at one node. + +### Build/run notes + +- The producer/consumer split needs no new image: producers and consumers run the + same sampler image; faux vs real is selected by `FLUENCE_FAUX_SUBMIT` (set on + consumers by `fauxSubmitEnvOps`), exactly as today. +- Use an **indexed** Job (`completionMode: Indexed`, `parallelism == completions == N`) + so producer election is deterministic (index 0) and `resolveGangSize` reads N + from the owner. Stamp `fluence.flux-framework.org/coordination` in the pod + template's annotations. +- kind/mock runs exercise the structural assertions (groups, gating, ungate + ordering) without a backend; real-Braket runs add the node-seconds and + real-submit-count measurements. diff --git a/docs/handlers.md b/docs/handlers.md index 1da169a..ee70519 100644 --- a/docs/handlers.md +++ b/docs/handlers.md @@ -2,9 +2,9 @@ Fluence's value is not creating gangs (Kubernetes 1.36 native gang scheduling already does that). It is **customizing the gang on the fly based on the -resources a pod requests** — e.g. a quantum leader/worker workload becomes a -size-1 leader gang plus a size-(N-1) worker gang, with the leader running a -sidecar that ungates its workers when the quantum task is ready. +resources a pod requests** — e.g. a shared quantum gang becomes a size-1 +producer gang plus a size-(N-1) consumer gang, with the producer running a +sidecar that ungates its consumers when the quantum task is ready. ## Handlers @@ -79,5 +79,5 @@ implementation can delegate — handlers do not call them directly. 3. else 1, logged. 
This is a common default available to every gang; handler-specific annotations -(quantum role, expected-workers, etc.) live in their handlers and are not +(quantum coordination mode, completion index, etc.) live in their handlers and are not required by the core. diff --git a/docs/quantum-scheduling.md b/docs/quantum-scheduling.md index a6967ba..e1a98c4 100644 --- a/docs/quantum-scheduling.md +++ b/docs/quantum-scheduling.md @@ -5,15 +5,15 @@ Hybrid quantum-classical workflows submit work to two independent queues: the Kubernetes scheduler (classical compute) and a QPU vendor API (quantum execution). Classical pods waste node resources while waiting for QPU queue -results. Fluence's coordination system thus gates classical worker pods until +results. Fluence's coordination system thus gates classical consumer pods until the QPU task is one position from executing, then releases them with high priority so they preempt lower-priority work and start immediately as the QPU result arrives. Yes, it could be the case the one task in the queue before -it takes a long time, but I think this is an improved approach than having worker +it takes a long time, but I think this is an improved approach than having consumer pods running (and waiting) for a much longer queue. This only is important -given that you have gangs, or leader worker designs where some leader is launching -the quantum work and otherwise the workers would be waiting and doing nothing -(and wasting resources). +given that you have gangs, or producer/consumer designs where one member is +launching the quantum work and otherwise the other members would be waiting and +doing nothing (and wasting resources). ## 1. The Two-Queue Problem @@ -67,11 +67,13 @@ queue wait — which is worse than the original problem. The design combines four mechanisms: -1. **SDK interceptor** — tags every QPU task with the pod UID -2. **Fluence webhook** — gates worker pods, injects sidecar into quantum pods +1. 
**SDK interceptor** — tags every QPU task with the pod UID (real mode), or + returns the producer's task instead of submitting (faux mode) +2. **Fluence webhook** — splits a shared quantum gang into one producer and N-1 + gated consumers; injects the sidecar into the producer 3. **Sidecar controller** — discovers the QPU task, polls queue position, - ungates workers when position==1 -4. **High-priority ungating** — workers preempt lower-priority work at the + ungates the consumers when position==1 +4. **High-priority ungating** — consumers preempt lower-priority work at the last responsible moment ### 3.0 When Fluence acts: the decision matrix @@ -84,19 +86,26 @@ determine what Fluence does: work and there is a vendor backend behind it. - **G (gang?)** — does the pod carry `fluence.flux-framework.org/group`? -| | not quantum | quantum | -|--------------|------------------------|----------------------------------------------------------------| -| **not gang** | group of 1 (nothing) | inject provider interceptor + env; **sidecar only in observe-only mode if telemetry requested** (no workers to ungate) | -| **gang** | gang-schedule only | leader: interceptor + env + sidecar (gates + ungates workers); workers: gate only | +A third property applies only to quantum gangs: the **coordination mode** +(`fluence.flux-framework.org/coordination`, default `independent`). In `shared` +mode the gang produces ONE quantum task that all members share; in `independent` +mode every member does its own quantum work. 
+ +| | not quantum | quantum | +|--------------|------------------------|--------------------------------------------------------------------------------| +| **not gang** | group of 1 (nothing) | inject provider interceptor + env; **sidecar only in observe-only mode if telemetry requested** (nothing to ungate) | +| **gang** (independent) | gang-schedule only | every member is a standalone producer: interceptor + env, real submit, no gate | +| **gang** (shared) | — | producer (index 0): interceptor + env + sidecar, real submit, not gated, group-of-one `<group>-producer`; consumers: gate + faux interceptor, gang `<group>` (minCount N-1) | The crucial rule: **sidecar/interceptor injection is triggered by the quantum resource request, not the group label.** The group label only controls gang -scheduling and worker gating. A group leader that requests no quantum resource -(e.g. a classical pod that happens to set `BRAKET_DEVICE` itself) is just -gang-scheduled — Fluence injects no sidecar, because there is no quantum work -for it to coordinate. `BRAKET_DEVICE` (or any direct device selection by the -user) is the signal that Fluence is *not* scheduling the quantum resource; -`fluxion.flux-framework.org/qpu` is the signal that it is. +scheduling and (in shared mode) the producer/consumer split. A grouped pod that +requests no quantum resource (e.g. a classical pod that happens to set +`BRAKET_DEVICE` itself) is just gang-scheduled — Fluence injects no sidecar, +because there is no quantum work for it to coordinate. `BRAKET_DEVICE` (or any +direct device selection by the user) is the signal that Fluence is *not* +scheduling the quantum resource; `fluxion.flux-framework.org/qpu` is the signal +that it is.
### 3.1 User interface @@ -106,10 +115,19 @@ The user labels all pods in a workflow group with: metadata: labels: fluence.flux-framework.org/group: my-workflow + annotations: + # only for a quantum gang that shares ONE task across members: + fluence.flux-framework.org/coordination: shared spec: schedulerName: fluence ``` +`coordination` defaults to `independent` (every member does its own quantum +work). Set it to `shared` when the members should share a single quantum task — +then Fluence promotes one member (the indexed-Job completion index 0) to producer +and gates the rest as consumers. The user authors no roles and no submitter pod; +the split is derived from the completion index the Job controller already stamps. + I initially started with having the user create a PodGroup object, and I found that annoying. I do not want to require a PodGroup object when an annotation is easier, and then I have fine-grained control of what the groups looks like. Fluence can handle @@ -117,7 +135,7 @@ everything else automatically. The namespace distinction: - `fluence.flux-framework.org/*` — Fluence scheduler-plugin concerns - (group label, leader annotation, gate name) + (group label, coordination mode, gate name) - `fluxion.flux-framework.org/*` — Fluxion resource-graph concerns (extended resource types, backend attribute env vars) @@ -140,31 +158,54 @@ The three handlers (`pkg/webhook/handlers/`): (backend + attributes) sourced from the annotations the scheduler writes in PreBind. Generic to all Fluxion resources. -**`gang` (`gang.go`)** — applies when the pod carries the group label. Creates a -Fluence-owned PodGroup (`minCount: 1`) on first admission, records that first -pod as the admission-order leader, and stamps `spec.schedulingGroup.podGroupName` -on every pod in the group so the scheduler gangs them. The user only ever sets -the LABEL; the webhook translates it into the native field, so the user never -creates a PodGroup or knows it exists. 
Knows nothing about quantum — a purely -classical gang is fully handled here, with no sidecar. +**`gang` (`gang.go`)** — applies when the pod carries the group label **and does +not request the quantum resource** (a quantum pod is gang-scheduled by the quantum +handler instead, which owns the producer/consumer split). Creates a Fluence-owned +PodGroup on first admission and stamps `spec.schedulingGroup.podGroupName` on +every pod in the group so the scheduler gangs them. The user only ever sets the +LABEL; the webhook translates it into the native field, so the user never creates +a PodGroup or knows it exists. Knows nothing about quantum — a purely classical +gang is fully handled here, with no sidecar. **`quantum` (`quantum.go`)** — the only handler that knows about quantum -resources, gates, and observe semantics. Applies to a pod in either role: -- **submitter** (requests `fluxion.flux-framework.org/qpu`): a group leader, or - a standalone quantum pod. Always gets the interceptor staged (so its task is - tagged). Gets the **sidecar** only when there is coordination to do — it is a - group leader (workers to ungate) or observe-only telemetry is requested. -- **worker** (a non-leader member of a group whose recorded leader is a quantum - pod): gets the `quantum.braket/ready` scheduling gate, entering - `SchedulingGated` state — invisible to Fluxion, consuming no resources — until - the leader's sidecar ungates it. - -Role is decided by **admission order**, not resource request. In a pod-template -gang (Deployment/Job/StatefulSet) every pod is identical — same group label, -every pod requests the quantum resource — so the leader is simply the first pod -admitted (recorded on the PodGroup); every other pod is a worker, regardless of -its own request. The gate holds workers at PreEnqueue, so the scheduler does not -run PreFilter for them (and `groupPods` excludes gated pods) until ungated. +resources, gates, coordination, and observe semantics. 
A quantum task's circuit +comes from user code, so the pod that defines a task must RUN to submit it: submit +and gate are mutually exclusive per pod, and gating only helps pods that do not +submit. The handler therefore routes each quantum pod to one of three roles: +- **standalone / independent** (a lone quantum pod, or any member of a gang in + the default `independent` mode): gets the interceptor staged (real mode) so its + own task is tagged, performs its own real submit, is never gated, and gets the + sidecar only when observe-only telemetry is requested. Independent mode means N + members run N tasks and hold N node-waits — honest physics, the user's explicit + default. +- **producer** (in `shared` mode, the completion index 0 member): its own + group-of-one `<group>-producer` (minCount 1) so it schedules alone and runs the + SINGLE real submit; interceptor in real mode; gets the **sidecar**, told which + consumer group to ungate (`FLUENCE_GANG_GROUP`); never gated. The producer is + one of the N members, so the application runs exactly N times — never N+1. +- **consumer** (in `shared` mode, the other N-1 members): joins the `<group>` + gang (minCount N-1), gets the `quantum.braket/ready` scheduling gate (entering + `SchedulingGated` — invisible to Fluxion, consuming no resources — until the + producer's sidecar ungates it), and is staged with the interceptor in **faux** + mode so its submit returns the producer's task (handed over as + `FLUENCE_QUANTUM_JOB_ID` at ungate) instead of resubmitting. + +Role is decided by the **completion index**, not resource request or admission +order. In an indexed Job every pod is identical — same group label, same image, +every pod requests the quantum resource — so the producer is simply the pod the +Job controller stamps with `batch.kubernetes.io/job-completion-index: "0"`; every +other index is a consumer.
(For loose pods with no completion index, the first +arrival claims the producer slot by the absence of the `<group>-producer` +PodGroup; an indexed Job is recommended for deterministic election.) The two +groups carry independent minCounts (producer=1, consumers=N-1), which is what lets +the producer schedule and submit while the consumers stay gated — no deadlock, and +no separate submitter pod. + +Shared mode serves two user contracts with the *same* wiring: an explicit-role +script where consumers never call submit (the faux interceptor is simply unused), +and an identical script where every member calls submit (the faux interceptor +dedups the consumers to the producer's cached task). Fluence does not need to know +which contract the user wrote. ### 3.3 Interceptor and Model C delivery @@ -207,7 +248,11 @@ def patched_run(self, task_specification, *args, **kwargs): This is completely transparent to the user application — no code changes, no package install, no vendor SDK added to the user image (the hook patches whatever SDK the user already has). -leader pod, sharing its AWS credentials and network namespace. + +### 3.4 Sidecar controller + +The sidecar runs as a container alongside the producer pod, sharing its AWS +credentials and network namespace. ```console 1. READ FLUXION_ARN, FLUENCE_POD_UID from env @@ -221,7 +266,7 @@ leader pod, sharing its AWS credentials and network namespace. On timeout: fall back to time-window heuristic (tasks submitted after pod start time on the same device). -3. DISCOVER worker pods: +3. DISCOVER consumer pods: List pods in namespace with fluence.flux-framework.org/group label matching this pod's group, having quantum.braket/ready gate present. @@ -229,7 +274,7 @@ leader pod, sharing its AWS credentials and network namespace. Log position for experiment instrumentation. 5.
WHEN is_ready_to_ungate(task) (position == 1 OR state == RUNNING): - For each worker pod: + For each consumer pod: kubectl annotate pod fluence.flux-framework.org/quantum-job-id= kubectl patch pod --type=json \ -p='[{"op":"add","path":"/spec/priorityClassName", @@ -240,7 +285,7 @@ leader pod, sharing its AWS credentials and network namespace. ``` The priority class and gate removal are applied atomically in one patch. -This ensures workers enter the scheduling queue with high priority +This ensures consumers enter the scheduling queue with high priority immediately, without a window where they are ungated but low-priority. ### 3.5 Priority and preemption @@ -250,10 +295,10 @@ by the sidecar at ungate time, not by the webhook at pod creation. Setting it at creation time causes an admission controller conflict (priority integer already defaulted to 0). -When workers are ungated with high priority, Kubernetes preemption evicts +When consumers are ungated with high priority, Kubernetes preemption evicts lower-priority pods to make room. Fluence's pod deletion informer catches these evictions, calls `Cancel(jobid)` in Fluxion, and frees the graph -vertices so Fluxion can allocate them to the incoming high-priority workers. +vertices so Fluxion can allocate them to the incoming high-priority consumers. ### 3.6 Classical allocation follows quantum execution order @@ -298,7 +343,7 @@ Provider: find_my_task(pod_uid, ...) # search by the fluence-pod-uid tag → opaque Task is_ready_to_ungate(task) # decision primitive: position==1 OR running queue_position(task) # optional richer telemetry; None if unavailable - job_id(task) # cross-vendor id handed to workers (NOT the ARN) + job_id(task) # cross-vendor id handed to consumers (NOT the ARN) ``` Vendor-specific identifiers (a Braket task ARN, an IBM job id, a GCP operation @@ -320,7 +365,7 @@ matching provider). Nothing else changes — no build script, no concatenation. 
#### Observe-only (telemetry) mode -A quantum pod that is *not* a gang (a single quantum pod, no workers to ungate) +A quantum pod that is *not* a gang (a single quantum pod, no consumers to ungate) gets the interceptor and env only — no sidecar — by default, so no surprise machinery is injected. Telemetry is opt-in via the label `fluence.flux-framework.org/observe: "true"`, surfaced to the sidecar as @@ -345,7 +390,7 @@ singleton and gang runs. ### 5.1 Preemption disrupts lower-priority work -At position==1, workers preempt running lower-priority pods. This work is +At position==1, consumers preempt running lower-priority pods. This work is re-queued and eventually runs, but there is a disruption cost. A future design using a `MatchReserveAt(time_at, spec)` Fluxion primitive — where `time_at` is supplied by the QPU vendor via an ETA or task-start event — @@ -364,7 +409,7 @@ heuristic (e.g. a time window) rather than the tag mechanism. ### 5.3 Single task per workflow -The sidecar tracks one QPU task ARN per leader pod. Parameter-shift gradient +The sidecar tracks one QPU task ARN per producer pod. Parameter-shift gradient estimation and other multi-circuit workflows require tracking a set of ARNs. See the scatter design issue for the proposed extension. @@ -388,7 +433,7 @@ function to be exposed through the Go bindings with a `starttime` parameter. For workflows with N independent QPU tasks each paired with one classical pod, an index-based pairing mechanism (`fluence.flux-framework.org/index`) -would allow the sidecar to ungate specific worker pods when their specific +would allow the sidecar to ungate specific consumer pods when their specific task reaches position==1. See the open scatter design issue. ### 6.3 Vendor task-start events @@ -401,7 +446,7 @@ precise ungating. 
### 6.4 PostFilter topology-aware preemption A custom Fluence `PostFilter` plugin would ask Fluxion which graph vertices -are blocking a high-priority worker pod, then target preemption at exactly +are blocking a high-priority consumer pod, then target preemption at exactly those pods — rather than the default Kubernetes preemption which picks lowest-priority pods regardless of graph topology. This ensures preemption always produces a valid Fluxion allocation. diff --git a/examples/test/e2e/quantum/quantum-gang-pods.yaml b/examples/test/e2e/quantum/quantum-gang-pods.yaml index b345398..af11e6f 100644 --- a/examples/test/e2e/quantum/quantum-gang-pods.yaml +++ b/examples/test/e2e/quantum/quantum-gang-pods.yaml @@ -1,13 +1,22 @@ -# Gang + submitter quantum workload for the e2e (no leader/worker). +# Shared-coordination quantum gang for the e2e (producer/consumer, no submitter). # -# Two pods, identical, both requesting the quantum resource, in group "qgang". -# The user authors NO roles and NO submitter — the webhook treats this as a gang -# of full size N=2 (group-size makes N deterministic for raw pods, which have no -# owning Job/Deployment to derive it from), gates every pod, and ADDITIONALLY -# creates the one-off submitter pod "qgang-submitter" (its own group-of-one) that -# runs the real submit and ungates the gang. busybox stands in for the quantum -# app; the interceptor staging fails soft (no python), which is fine for the -# structural assertions in 02/03/04. +# Two identical pods, both requesting the quantum resource, in group "qgang" with +# coordination=shared. The user authors NO roles and NO submitter pod. The webhook +# splits the gang by completion index: +# qgang-0 (index 0) -> PRODUCER: its own group-of-one "qgang-producer" +# (minCount 1), real submit, sidecar, NOT gated. It is a +# real member, so the app runs N times, never N+1. 
+# qgang-1 (index 1+) -> CONSUMER: the "qgang" gang (minCount N-1=1), gated on +# quantum.braket/ready + preempting priority, interceptor +# in faux mode (its submit returns the producer's task). +# +# These are raw pods (not a Job) so the e2e can reference stable names; the +# completion-index annotation is set manually to make producer election +# deterministic (a real workload uses an indexed Job, which the controller stamps +# with batch.kubernetes.io/job-completion-index automatically). group-size makes N +# deterministic for raw pods, which have no owning Job to derive it from. busybox +# stands in for the quantum app; the interceptor staging fails soft (no python), +# which is fine for the structural assertions in 02/03/04. apiVersion: v1 kind: Pod metadata: @@ -17,6 +26,8 @@ metadata: fluence.flux-framework.org/group: qgang annotations: fluence.flux-framework.org/group-size: "2" + fluence.flux-framework.org/coordination: shared + batch.kubernetes.io/job-completion-index: "0" # -> producer spec: schedulerName: fluence restartPolicy: Never @@ -37,6 +48,8 @@ metadata: fluence.flux-framework.org/group: qgang annotations: fluence.flux-framework.org/group-size: "2" + fluence.flux-framework.org/coordination: shared + batch.kubernetes.io/job-completion-index: "1" # -> consumer spec: schedulerName: fluence restartPolicy: Never @@ -46,4 +59,4 @@ spec: command: ["sh", "-c", "echo gang member; sleep 600"] resources: requests: {fluxion.flux-framework.org/qpu: "1"} - limits: {fluxion.flux-framework.org/qpu: "1"} \ No newline at end of file + limits: {fluxion.flux-framework.org/qpu: "1"} diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index fd3b080..6c3dc13 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -828,18 +828,18 @@ func (f *Fluence) reconcileGroup(ctx context.Context, namespace, group string) { log.Printf("fluence: reconciled completed gang %s/%s — deleted Fluence-created PodGroup, allocation freed", namespace, group) - // 
Gang+submitter cleanup: the one-off quantum submitter pod and its - // group-of-one PodGroup (-submitter) are not owned by the user's - // workload, so reap them alongside the gang. The submitter pod also carries - // an ownerReference to this gang PodGroup (so its deletion cascades via GC); - // this explicit delete is the backstop and also removes the submitter's own - // PodGroup. Skip when this group is itself a submitter group, to avoid - // recursing on -submitter-submitter. - if !strings.HasSuffix(group, submitterGroupSuffix) { - sg := group + submitterGroupSuffix - _ = f.handle.ClientSet().SchedulingV1alpha2().PodGroups(namespace).Delete(ctx, sg, metav1.DeleteOptions{}) - _ = f.handle.ClientSet().CoreV1().Pods(namespace).Delete(ctx, sg, metav1.DeleteOptions{}) - log.Printf("fluence: reaped submitter %s/%s for gang %s", namespace, sg, group) + // Producer-group cleanup: in shared coordination the gang is split into the + // consumer group (this group) and the producer's group-of-one + // -producer (a Fluence-created PodGroup, minCount 1). The producer POD + // is a real member of the user's workload (indexed-Job index 0), so we must + // NOT delete it — only its Fluence-created PodGroup, as a backstop to free its + // allocation (its own reconcile pass also reaps it once the producer pod is + // terminal). Skip when this group is itself a producer group, to avoid + // recursing on -producer-producer. + if !strings.HasSuffix(group, producerGroupSuffix) { + pg := group + producerGroupSuffix + _ = f.handle.ClientSet().SchedulingV1alpha2().PodGroups(namespace).Delete(ctx, pg, metav1.DeleteOptions{}) + log.Printf("fluence: reaped producer PodGroup %s/%s for gang %s", namespace, pg, group) } } @@ -852,11 +852,11 @@ const reconcileGraceForEmpty = 2 * time.Minute // package (the scheduler must not depend on the webhook). Kept in sync with it. 
const webhookGroupLabel = "fluence.flux-framework.org/group" -// submitterGroupSuffix mirrors handlers.SubmitterGroupSuffix: the one-off quantum -// submitter for gang is named -submitter (both the pod and its PodGroup). -// Duplicated here to avoid importing the webhook handlers package into the -// scheduler plugin; keep the two in sync. -const submitterGroupSuffix = "-submitter" +// producerGroupSuffix mirrors handlers.ProducerGroupSuffix: in shared +// coordination the producer (indexed-Job index 0) is its own group-of-one named +// -producer. Duplicated here to avoid importing the webhook handlers package +// into the scheduler plugin; keep the two in sync. +const producerGroupSuffix = "-producer" // onPodGroupDeleted frees the gang's allocation when its PodGroup is deleted. func (f *Fluence) onPodGroupDeleted(obj interface{}) { diff --git a/pkg/webhook/handlers/gang.go b/pkg/webhook/handlers/gang.go index 0469c11..8ba83f3 100644 --- a/pkg/webhook/handlers/gang.go +++ b/pkg/webhook/handlers/gang.go @@ -26,6 +26,13 @@ type gangHandler struct{} func (h *gangHandler) Name() string { return "gang" } func (h *gangHandler) Applies(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) bool { + // Classical gangs only. A pod that requests the quantum resource is gang- + // scheduled by the quantum handler, which owns the producer/consumer split and + // creates both the -producer and PodGroups itself; handling it + // here too would create a second, conflicting PodGroup for the group. + if spec.PodRequestsResource(pod, QuantumResource) { + return false + } return webhook.GroupName(pod) != "" } diff --git a/pkg/webhook/handlers/quantum.go b/pkg/webhook/handlers/quantum.go index 47e1714..2990f85 100644 --- a/pkg/webhook/handlers/quantum.go +++ b/pkg/webhook/handlers/quantum.go @@ -24,22 +24,35 @@ func init() { // Quantum-specific policy. The webhook core knows NONE of these — they live only // here, in the quantum handler. 
// -// Model (no leader/worker): a workload requesting the quantum resource (Job, -// Deployment, or loose pods — the trigger is the resource, not the kind) becomes -// a GANG of full size N: one PodGroup, every pod fully gated and raised to a -// preempting priority, each staged with the interceptor in FAUX mode (the submit -// is a no-op). Fluence ALSO creates a separate one-off SUBMITTER pod — a -// group-of-one running the SAME application container plus the real sidecar — -// which submits the quantum task for real, tags it, stamps the resulting job-id -// onto the gang, and ungates the gang. There is no leader among the user's pods; -// the submitter is the only submitting pod and Fluence owns it. +// Model (producer/consumer split, no separate submitter pod). A quantum task's +// circuit comes from user code, so the pod that defines a task must RUN to submit +// it — submit and gate are mutually exclusive per pod. Gating therefore only +// helps pods that do NOT submit. A quantum gang in CoordinationShared mode is +// split, per pod, into two roles decided at admission: +// +// - PRODUCER (one member, the indexed-Job completion index 0): its own +// group-of-one <group>-producer (minCount 1) so it schedules alone and runs +// the SINGLE real submit; staged with the interceptor in REAL (tag) mode and +// given the sidecar, which polls the task and ungates the consumers at +// position==1. NOT gated. The producer is one of the N members, so the +// application is run exactly N times — never N+1. +// - CONSUMERS (the other N-1 members): the <group> gang (minCount N-1), each +// gated on quantum.braket/ready + preempting priority, staged with the +// interceptor in FAUX mode so its submit returns the producer's task instead +// of resubmitting (the shared-result dedup). Ungated together when the +// producer's task is ready.
+// +// In CoordinationIndependent mode (the default) there is no shared result to +// coordinate: every member is its own standalone producer (real submit, no gate), +// each owning its task and its own queue wait. A lone quantum pod (no group) is +// always standalone. const ( // QuantumResource is the Fluxion resource a pod requests to ask Fluence to // schedule quantum work. Requesting it is the sole trigger for this handler. QuantumResource = "fluxion.flux-framework.org/qpu" - // QuantumGate holds a gang pod unscheduled until the submitter's task is - // ready (the submitter's sidecar removes it). + // QuantumGate holds a consumer pod unscheduled until the producer's task is + // ready (the producer's sidecar removes it). QuantumGate = "quantum.braket/ready" // ObserveLabel opts a STANDALONE quantum pod (a group of one) into @@ -48,108 +61,157 @@ const ( ObserveLabel = "fluence.flux-framework.org/observe" // DependencyKindQuantumSubmit is the readiness Kind for the quantum resource - // type: gang pods wait for a quantum submission to reach the device queue. + // type: consumer pods wait for a quantum submission to reach the device queue. // First concrete instance of the general Dependency primitive (dependency.go). DependencyKindQuantumSubmit = "quantum-submit" - // SubmitterAnnotation marks the Fluence-created submitter pod so its own - // admission is recognized (real sidecar, real submit, not gated) instead of - // being treated as another gang member. - SubmitterAnnotation = "fluence.flux-framework.org/submitter" - - // GangGroupAnnotation, set on the submitter at creation, names the gang group - // the submitter must ungate. Surfaced to its sidecar as FLUENCE_GANG_GROUP. - GangGroupAnnotation = "fluence.flux-framework.org/gang-group" - - // SubmitterGroupSuffix: the submitter is its own group-of-one named - // -submitter (a distinct PodGroup, minCount 1, so it schedules alone - // and never deadlocks against the gated gang). 
- SubmitterGroupSuffix = "-submitter" - - // GangGroupEnv tells the submitter's sidecar which gang group label to list + // CoordinationAnnotation selects how a quantum gang is coordinated. It is an + // open enum so future designs (e.g. index-paired "scatter") add a mode + // without changing the mechanism. + CoordinationAnnotation = "fluence.flux-framework.org/coordination" + + // CoordinationShared: one real task; the producer (index 0) submits and the + // other members are gated consumers that dedup to the producer's task. Serves + // BOTH the explicit-role contract (workers never call submit) and the + // identical-script contract (workers call submit; the faux interceptor returns + // the producer's cached task) — the handler is the same for both. + CoordinationShared = "shared" + + // CoordinationIndependent (default): every member does its own quantum work; + // no coordination, no gating. Never invent coordination the user did not ask + // for, and never dedup tasks meant to be distinct. + CoordinationIndependent = "independent" + + // ProducerGroupSuffix names the producer's own group-of-one: -producer + // (minCount 1) so it schedules alone and never deadlocks against the gated + // consumer gang. + ProducerGroupSuffix = "-producer" + + // CompletionIndexAnnotation is the indexed-Job completion index the Job + // controller stamps on each pod; index "0" is the producer (deterministic + // election with no recorded state). + CompletionIndexAnnotation = "batch.kubernetes.io/job-completion-index" + + // ProducerIndex is the completion index promoted to producer. + ProducerIndex = "0" + + // GangGroupEnv tells the producer's sidecar which consumer group label to list // and ungate when the task is ready. GangGroupEnv = "FLUENCE_GANG_GROUP" ) -// quantumHandler creates, for a quantum workload, a fully-gated faux-submitting -// gang plus a one-off real submitter (see the package-level model comment). 
It -// is the only place in the webhook that knows about quantum resources, gates, -// submitters, or observe semantics. +// quantumHandler splits a shared quantum gang into a single producer (real +// submit + sidecar) and N-1 gated faux consumers, or runs every member +// standalone in independent mode (see the package-level model comment). It is the +// only place in the webhook that knows about quantum resources, gates, +// coordination, or observe semantics. type quantumHandler struct{} func (h *quantumHandler) Name() string { return "quantum" } -// Applies to any pod requesting the quantum resource. Gang members run the same -// image as the submitter and request it; the submitter (a copy) requests it; a -// standalone quantum pod requests it. Nothing without the resource needs quantum -// handling, so this is the single, unambiguous trigger. +// Applies to any pod requesting the quantum resource. Producers, consumers, and +// standalone quantum pods all request it; nothing without the resource needs +// quantum handling, so this is the single, unambiguous trigger. func (h *quantumHandler) Applies(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) bool { return spec.PodRequestsResource(pod, QuantumResource) } func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) []spec.Op { - // The Fluence-created submitter: real interceptor + real sidecar, its own - // group-of-one, NOT gated. Recognized by the marker set at creation. - if spec.Annotation(pod, SubmitterAnnotation) == "true" { - return h.mutateSubmitter(ctx, m, pod) - } - g := resolveGroup(pod) - observe := spec.Label(pod, ObserveLabel) == "true" n := resolveGangSize(ctx, m, pod, g) + mode := coordinationMode(pod) + observe := spec.Label(pod, ObserveLabel) == "true" - // Standalone quantum pod (a group of one): it performs its own real submit. - // No gang, no gating, no faux, no separate submitter. The sidecar is added - // only for observe-only telemetry. 
- if g == "" || n <= 1 { + // No coordination: a standalone quantum pod, or an explicitly independent + // member. The REAL submit happens in THIS pod; the sidecar is added only for + // observe-only telemetry. (independent mode routes every member here -> N + // standalone producers, each owning its task and its own queue wait.) + if mode != CoordinationShared || g == "" || n <= 1 { ops := interceptorOps(pod) if observe { sc := sidecarFor(m) sc.EnsureRBAC(ctx, pod.Namespace) ops = append(ops, sc.ContainerOps(pod, true, nil)...) } - log.Printf("[fluence-webhook] quantum standalone %s/%s (observe=%v)", pod.Namespace, pod.Name, observe) + log.Printf("[fluence-webhook] quantum %s/%s mode=%s (standalone/independent, observe=%v)", + pod.Namespace, pod.Name, mode, observe) return ops } - // Gang member: full gang of N in one PodGroup, fully gated + preempting - // priority + faux interceptor. Fluence also ensures the one-off submitter - // (idempotent) that does the real submit and ungates this gang. - m.EnsurePodGroup(ctx, pod.Namespace, g, pod.Name, n) - ensureSubmitterPod(ctx, m, pod, g) + // shared mode: promote one member to producer; the rest are gated consumers. + if isProducer(ctx, m, pod, g) { + return h.mutateProducer(ctx, m, pod, g) + } + return h.mutateConsumer(ctx, m, pod, g, n) +} - ops := linkGroupOps(pod, g) - // Express the wait as the GENERAL dependency primitive: this gang pod depends - // on the quantum submission produced by -submitter, held by the quantum +// mutateProducer wires the single producer member (indexed-Job completion index +// 0): its own group-of-one -producer (minCount 1) so it schedules alone +// and runs the REAL submit, the interceptor in tag mode, RBAC, and the sidecar +// told which consumer group to ungate (FLUENCE_GANG_GROUP). The producer is one +// of the N members, so the application is NOT run an extra time. Never gated. 
+func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) []spec.Op { + pg := group + ProducerGroupSuffix + m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1) + ops := linkGroupOps(pod, pg) + ops = append(ops, interceptorOps(pod)...) // real (tag) mode — no fauxSubmitEnvOps + sc := sidecarFor(m) + sc.EnsureRBAC(ctx, pod.Namespace) + extra := []corev1.EnvVar{{Name: GangGroupEnv, Value: group}} + ops = append(ops, sc.ContainerOps(pod, false, extra)...) + log.Printf("[fluence-webhook] quantum producer %s/%s — group %s (ungates consumers %q)", + pod.Namespace, pod.Name, pg, group) + return ops +} + +// mutateConsumer wires a non-producer member: it joins the consumer gang +// (minCount N-1), is gated until the producer's task is ready, and runs the same +// image with the interceptor in FAUX mode so its submit returns the producer's +// task (handed over as FLUENCE_QUANTUM_JOB_ID at ungate) instead of resubmitting. +// In the explicit-role contract the consumer's script never calls submit, so the +// faux interceptor is simply unused — harmless either way. +func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op { + m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1) + ops := linkGroupOps(pod, group) + // Express the wait as the GENERAL dependency primitive: this consumer depends + // on the quantum submission produced by -producer, held by the quantum // gate. applyOps gates the pod, raises priority, and stamps depends-on-*. - dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: g + SubmitterGroupSuffix, Gate: QuantumGate} + dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate} ops = append(ops, dep.applyOps(pod)...) 
- // Same interceptor as the submitter, but FAUX mode so the gang pod never - // resubmits; it receives the real task id via FLUENCE_QUANTUM_JOB_ID. ops = append(ops, interceptorOps(pod)...) ops = append(ops, fauxSubmitEnvOps(pod)...) - log.Printf("[fluence-webhook] quantum gang member %s/%s — group %s minCount=%d, gated+faux", - pod.Namespace, pod.Name, g, n) + log.Printf("[fluence-webhook] quantum consumer %s/%s — group %s minCount=%d, gated+faux", + pod.Namespace, pod.Name, group, n-1) return ops } -// mutateSubmitter wires the Fluence-created submitter pod: its own PodGroup of -// one, the real interceptor (tag mode), RBAC, and the sidecar container told -// which gang group to ungate (FLUENCE_GANG_GROUP). The submitter is never gated. -func (h *quantumHandler) mutateSubmitter(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) []spec.Op { - sg := webhook.GroupName(pod) // the submitter's own group: -submitter - gang := spec.Annotation(pod, GangGroupAnnotation) - if sg != "" { - m.EnsurePodGroup(ctx, pod.Namespace, sg, pod.Name, 1) +// coordinationMode reads the coordination annotation; default independent. +func coordinationMode(pod *corev1.Pod) string { + if v := spec.Annotation(pod, CoordinationAnnotation); v != "" { + return v } - sc := sidecarFor(m) - ops := sc.InterceptorOps(pod) - sc.EnsureRBAC(ctx, pod.Namespace) - extra := []corev1.EnvVar{{Name: GangGroupEnv, Value: gang}} - ops = append(ops, sc.ContainerOps(pod, false, extra)...) - log.Printf("[fluence-webhook] quantum submitter %s/%s — group %s (ungates gang %q)", - pod.Namespace, pod.Name, sg, gang) - return ops + return CoordinationIndependent +} + +// isProducer decides whether THIS pod is the gang's single producer. Indexed Job +// (recommended): completion index 0 is the producer — deterministic, race-free, +// no recorded state. 
Otherwise: first arrival claims the producer slot by the +// absence of the producer PodGroup (best-effort under concurrent admission; +// prefer an indexed Job for determinism). Indexing a nil annotations map yields +// ok=false, so the indexed branch is nil-safe. +func isProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) bool { + if idx, ok := pod.Annotations[CompletionIndexAnnotation]; ok { + return idx == ProducerIndex + } + c := m.Client() + if c == nil { + return true // tests / no client: treat as producer + } + pg := group + ProducerGroupSuffix + if _, err := c.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(ctx, pg, metav1.GetOptions{}); err == nil { + return false // already claimed by an earlier arrival + } + return true } // resolveGroup returns the gang group identity: the explicit group label, else @@ -234,68 +296,7 @@ func countGroupPods(ctx context.Context, m webhook.MutatorAPI, namespace, group return int32(len(list.Items)) } -// SubmitterPodSuffix names the Fluence-created submitter for a group: -// -submitter. It also serves as the submitter's own PodGroup name. -const SubmitterPodSuffix = SubmitterGroupSuffix - -// ensureSubmitterPod creates the one-off quantum submitter pod for a group -// (idempotent create-if-absent — a client side-effect of admission, like -// EnsurePodGroup/EnsureSidecarRBAC; NOT a separate controller). It is built from -// the admitted gang pod so it runs the SAME application + credentials, is its own -// group-of-one (-submitter), is marked the submitter (so its admission -// gets the real sidecar and is not gated), and records the gang group it must -// ungate. An ownerReference to the gang's PodGroup cascades GC: when the gang -// PodGroup is deleted (gang completed/deleted), the submitter is collected too. 
-func ensureSubmitterPod(ctx context.Context, m webhook.MutatorAPI, gangPod *corev1.Pod, group string) { - c := m.Client() - if c == nil { - return - } - name := group + SubmitterGroupSuffix - if _, err := c.CoreV1().Pods(gangPod.Namespace).Get(ctx, name, metav1.GetOptions{}); err == nil { - return // already created (idempotent) - } - // Clean copy of the user's application: same containers (image, env, creds, - // the quantum resource request) and app volumes — none of the gang's gating - // or faux wiring. - src := gangPod.DeepCopy() - submitter := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: gangPod.Namespace, - Labels: map[string]string{webhook.GroupLabel: name}, - Annotations: map[string]string{ - SubmitterAnnotation: "true", - GangGroupAnnotation: group, - }, - }, - Spec: corev1.PodSpec{ - SchedulerName: webhook.SchedulerName, - RestartPolicy: corev1.RestartPolicyNever, - Containers: src.Spec.Containers, - Volumes: src.Spec.Volumes, - }, - } - // Cascade GC: own the submitter by the gang's PodGroup (created moments ago by - // the caller). Best-effort — only set when the PodGroup UID is known (it is on - // a real cluster; the fake client in tests may leave it empty, in which case - // we skip the ref rather than emit an invalid one). 
- if pg, err := c.SchedulingV1alpha2().PodGroups(gangPod.Namespace).Get(ctx, group, metav1.GetOptions{}); err == nil && pg.UID != "" { - submitter.OwnerReferences = []metav1.OwnerReference{{ - APIVersion: "scheduling.k8s.io/v1alpha2", - Kind: "PodGroup", - Name: group, - UID: pg.UID, - }} - } - if _, err := c.CoreV1().Pods(gangPod.Namespace).Create(ctx, submitter, metav1.CreateOptions{}); err != nil { - log.Printf("[fluence-webhook] submitter pod %s/%s: %v", gangPod.Namespace, name, err) - } else { - log.Printf("[fluence-webhook] created submitter pod %s/%s for gang %s", gangPod.Namespace, name, group) - } -} - -// linkGroupOps ensures the gang pod carries the group label (so the submitter's +// linkGroupOps ensures the gang pod carries the group label (so the producer's // sidecar can list it) and is linked to the gang PodGroup via // spec.schedulingGroup.podGroupName. Idempotent. func linkGroupOps(pod *corev1.Pod, group string) []spec.Op { diff --git a/pkg/webhook/handlers/quantum_test.go b/pkg/webhook/handlers/quantum_test.go index 613724d..7330a2a 100644 --- a/pkg/webhook/handlers/quantum_test.go +++ b/pkg/webhook/handlers/quantum_test.go @@ -4,8 +4,9 @@ Copyright 2024 Lawrence Livermore National Security, LLC SPDX-License-Identifier: Apache-2.0 */ -// quantum_test.go — all tests for the quantum handler: the gang + submitter -// model, faux-submit, the sidecar wiring, the Dependency primitive, and the +// quantum_test.go — all tests for the quantum handler: the producer/consumer +// shared-coordination split (no separate submitter pod), independent mode, +// faux-submit, the sidecar wiring, the Dependency primitive, and the // standalone/observe paths. Shared fixtures (qpuPod, cpuPod, op helpers) live in // handlers_test.go. 
package handlers @@ -56,11 +57,28 @@ func TestObserveLabelInjectsSidecar(t *testing.T) { } } -// ── gang + submitter ──────────────────────────────────────────────────────────── +// ── shared coordination: producer / consumer split ────────────────────────────── -// gangQPUPod is a quantum workload pod (requests the resource) in a group, -// owned by a Job of parallelism N — the common real shape (a MiniCluster / -// indexed Job). No role annotation: the new model has no leader/worker. +// sharedQPUPod is a quantum workload pod (requests the resource) in a group, +// owned by a Job of parallelism N, with coordination=shared and a completion +// index. Index "0" is the producer; any other index is a consumer. This is the +// real shape: an indexed Job whose identical template yields differentiated +// roles purely from the completion index. +func sharedQPUPod(ns, group, name, job, index string) *corev1.Pod { + p := qpuPod("fluence") + p.Name = name + p.Namespace = ns + p.Labels = map[string]string{webhook.GroupLabel: group} + p.Annotations = map[string]string{ + CoordinationAnnotation: CoordinationShared, + CompletionIndexAnnotation: index, + } + p.OwnerReferences = []metav1.OwnerReference{{Kind: "Job", Name: job}} + return p +} + +// gangQPUPod is a quantum workload pod in a group owned by a Job, with NO +// coordination annotation — i.e. the default (independent) mode. func gangQPUPod(ns, group, name, job string) *corev1.Pod { p := qpuPod("fluence") p.Name = name @@ -80,10 +98,11 @@ func mincount(t *testing.T, cs *fake.Clientset, ns, group string) (int32, bool) return pg.Spec.SchedulingPolicy.Gang.MinCount, true } -// A quantum gang member (owned by Job parallelism=3) is gated + faux, its gang -// PodGroup is minCount 3 (full N — no N-1 split), and Fluence creates the -// separate -submitter pod. It gets NO sidecar (it is gated). 
-func TestQuantumGangGatedFauxAndSubmitterCreated(t *testing.T) { +// A shared-mode CONSUMER (completion index != 0, owned by Job parallelism=3) is +// gated + faux, joins the consumer gang at minCount N-1 (the split), and +// gets NO sidecar (it is gated). No separate submitter pod is ever created — the +// producer is one of the N members. +func TestSharedConsumerGatedFauxAndSplit(t *testing.T) { ns, group, job := "default", "qg", "qg-job" par := int32(3) cs := fake.NewSimpleClientset(&batchv1.Job{ @@ -91,44 +110,33 @@ func TestQuantumGangGatedFauxAndSubmitterCreated(t *testing.T) { Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) m := &webhook.Mutator{Clientset: cs} - ops := m.Mutate(context.Background(), gangQPUPod(ns, group, "qg-0", job)) + ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "qg-1", job, "1")) if !hasGateOp(ops) { - t.Error("gang member must be gated") + t.Error("consumer must be gated") } if hasSidecarOp(ops) { - t.Error("gang member (gated) must NOT get a sidecar") + t.Error("consumer (gated) must NOT get a sidecar") } if e, ok := envOp(ops, FauxSubmitEnv); !ok || e.Value != "true" { - t.Errorf("gang member must get %s=true", FauxSubmitEnv) - } - if mc, ok := mincount(t, cs, ns, group); !ok || mc != 3 { - t.Errorf("gang PodGroup minCount=%d (ok=%v), want 3 (full N, no split)", mc, ok) + t.Errorf("consumer must get %s=true", FauxSubmitEnv) } - // No -workers subgroup in the new model. - if _, ok := mincount(t, cs, ns, group+"-workers"); ok { - t.Error("there must be no -workers subgroup in the gang+submitter model") + // Consumer gang is minCount N-1 (the producer/consumer split). + if mc, ok := mincount(t, cs, ns, group); !ok || mc != 2 { + t.Errorf("consumer PodGroup minCount=%d (ok=%v), want 2 (N-1 split)", mc, ok) } - // Fluence created the submitter. 
- sub, err := cs.CoreV1().Pods(ns).Get(context.Background(), group+SubmitterGroupSuffix, metav1.GetOptions{}) - if err != nil { - t.Fatalf("submitter pod not created: %v", err) - } - if sub.Annotations[SubmitterAnnotation] != "true" { - t.Error("submitter must carry the submitter marker") - } - if sub.Annotations[GangGroupAnnotation] != group { - t.Errorf("submitter gang-group=%q, want %q", sub.Annotations[GangGroupAnnotation], group) - } - if len(sub.Spec.SchedulingGates) != 0 { - t.Error("submitter must NOT be gated") + // No separate submitter pod is created. + pods, _ := cs.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{}) + if len(pods.Items) != 0 { + t.Errorf("shared mode must NOT spawn a separate submitter pod; found %d pods", len(pods.Items)) } } -// The submitter pod, on its own admission, is wired as the real coordinator: its -// own PodGroup minCount 1, the real sidecar (not faux), not gated, and told which -// gang to ungate via FLUENCE_GANG_GROUP. -func TestSubmitterWiredAsRealSidecar(t *testing.T) { +// The shared-mode PRODUCER (completion index 0) is wired as the real coordinator: +// its own group-of-one -producer at minCount 1, the real sidecar (not +// faux), not gated, and told which consumer group to ungate via +// FLUENCE_GANG_GROUP. It is one of the N members — no extra pod is created. +func TestSharedProducerWiredAsRealSidecar(t *testing.T) { ns, group, job := "default", "qg2", "qg2-job" par := int32(2) cs := fake.NewSimpleClientset(&batchv1.Job{ @@ -136,24 +144,18 @@ func TestSubmitterWiredAsRealSidecar(t *testing.T) { Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) m := &webhook.Mutator{Clientset: cs} - // First a gang member, which creates the submitter. 
- m.Mutate(context.Background(), gangQPUPod(ns, group, "qg2-0", job)) - sub, err := cs.CoreV1().Pods(ns).Get(context.Background(), group+SubmitterGroupSuffix, metav1.GetOptions{}) - if err != nil { - t.Fatalf("submitter not created: %v", err) - } + ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "qg2-0", job, "0")) - ops := m.Mutate(context.Background(), sub) if !hasSidecarOp(ops) { - t.Error("submitter must get the real sidecar") + t.Error("producer must get the real sidecar") } if hasGateOp(ops) { - t.Error("submitter must NOT be gated") + t.Error("producer must NOT be gated") } if _, ok := envOp(ops, FauxSubmitEnv); ok { - t.Error("submitter must NOT be in faux mode") + t.Error("producer must NOT be in faux mode") } - // FLUENCE_GANG_GROUP is on the sidecar container itself. + // FLUENCE_GANG_GROUP (the consumer group to ungate) is on the sidecar. var sidecar *corev1.Container for _, op := range ops { if c, ok := op.Value.(corev1.Container); ok && c.Name == SidecarContainerName { @@ -162,7 +164,7 @@ func TestSubmitterWiredAsRealSidecar(t *testing.T) { } } if sidecar == nil { - t.Fatal("no sidecar container on submitter") + t.Fatal("no sidecar container on producer") } var gotGang bool for _, e := range sidecar.Env { @@ -171,16 +173,79 @@ func TestSubmitterWiredAsRealSidecar(t *testing.T) { } } if !gotGang { - t.Errorf("submitter sidecar must get %s=%q", GangGroupEnv, group) + t.Errorf("producer sidecar must get %s=%q", GangGroupEnv, group) + } + // Producer is its own group-of-one (minCount 1). + if mc, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); !ok || mc != 1 { + t.Errorf("producer PodGroup %s minCount=%d (ok=%v), want 1", group+ProducerGroupSuffix, mc, ok) + } + // No separate submitter pod. 
+ pods, _ := cs.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{}) + if len(pods.Items) != 0 { + t.Errorf("producer is a member, not a spawned pod; found %d pods", len(pods.Items)) + } +} + +// Shared mode never creates an extra pod: a full gang (producer index 0 + +// consumers) is N members, so the application runs exactly N times (not N+1 as +// the old submitter-pod model did). +func TestSharedGangNoSeparateSubmitterPod(t *testing.T) { + ns, group, job := "default", "qauto", "qauto-job" + par := int32(2) + cs := fake.NewSimpleClientset(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) + m := &webhook.Mutator{Clientset: cs} + + m.Mutate(context.Background(), sharedQPUPod(ns, group, "qauto-0", job, "0")) // producer + m.Mutate(context.Background(), sharedQPUPod(ns, group, "qauto-1", job, "1")) // consumer + + pods, _ := cs.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{}) + if len(pods.Items) != 0 { + t.Errorf("shared mode must not create any pods (no submitter); found %d", len(pods.Items)) + } + // Both groups exist with the right minCounts. + if mc, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); !ok || mc != 1 { + t.Errorf("producer group minCount=%d (ok=%v), want 1", mc, ok) + } + if mc, ok := mincount(t, cs, ns, group); !ok || mc != 1 { + t.Errorf("consumer group minCount=%d (ok=%v), want N-1=1", mc, ok) + } +} + +// ── independent mode (default) ────────────────────────────────────────────────── + +// A grouped quantum pod with no coordination annotation is INDEPENDENT (default): +// it does its own real submit, is not gated, not faux, and triggers no group +// split and no submitter pod. 
+func TestIndependentGroupedQuantumIsStandalone(t *testing.T) { + ns, group, job := "default", "indep", "indep-job" + par := int32(3) + cs := fake.NewSimpleClientset(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) + m := &webhook.Mutator{Clientset: cs} + + ops := m.Mutate(context.Background(), gangQPUPod(ns, group, "indep-0", job)) + + if hasGateOp(ops) { + t.Error("independent member must not be gated") } - if mc, ok := mincount(t, cs, ns, group+SubmitterGroupSuffix); !ok || mc != 1 { - t.Errorf("submitter PodGroup minCount=%d (ok=%v), want 1", mc, ok) + if _, ok := envOp(ops, FauxSubmitEnv); ok { + t.Error("independent member must not be faux") + } + if _, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); ok { + t.Error("independent mode must not create a producer group") + } + pods, _ := cs.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{}) + if len(pods.Items) != 0 { + t.Error("independent mode must not spawn a submitter pod") } } // A standalone quantum pod (no group, no owner → group of one) does its own real // submit: interceptor staged, but no gating, no faux, and no separate submitter. -func TestStandaloneQuantumIsRealNoSubmitter(t *testing.T) { +func TestStandaloneQuantumIsReal(t *testing.T) { ns := "default" cs := fake.NewSimpleClientset() m := &webhook.Mutator{Clientset: cs} @@ -202,10 +267,29 @@ func TestStandaloneQuantumIsRealNoSubmitter(t *testing.T) { } } +// Even with coordination=shared, a group of one (Job parallelism 1) has no +// consumers to coordinate, so it falls through to the standalone real-submit path. 
+func TestSharedGroupOfOneIsStandalone(t *testing.T) { + ns, group, job := "default", "one", "one-job" + par := int32(1) + cs := fake.NewSimpleClientset(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) + m := &webhook.Mutator{Clientset: cs} + + ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "one-0", job, "0")) + if hasGateOp(ops) { + t.Error("shared group-of-one must not be gated") + } + if _, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); ok { + t.Error("shared group-of-one must not create a producer group") + } +} + // ── faux-submit + dependency ──────────────────────────────────────────────────── -// envValueFrom returns the env var op with the given name, if present (covers -// both single-EnvVar and []EnvVar op shapes). +// envOp returns the env var op with the given name, if present (covers both +// single-EnvVar and []EnvVar op shapes). func envOp(ops []spec.Op, name string) (corev1.EnvVar, bool) { for _, op := range ops { switch v := op.Value.(type) { @@ -271,10 +355,10 @@ func unescapeJSONPointer(s string) string { return out } -// A quantum worker (no group-size of its own) is expressed as a general -// Dependency: gated, stamped with depends-on-{kind,producer,gate}, and the -// producer is the base group. -func TestQuantumWorkerIsGeneralDependency(t *testing.T) { +// A shared-mode consumer is expressed as a general Dependency: gated, stamped +// with depends-on-{kind,producer,gate}, and the producer is the -producer +// group. 
+func TestQuantumConsumerIsGeneralDependency(t *testing.T) { ns, group, job := "default", "depq", "depq-job" par := int32(3) cs := fake.NewSimpleClientset(&batchv1.Job{ @@ -282,17 +366,17 @@ func TestQuantumWorkerIsGeneralDependency(t *testing.T) { Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) m := &webhook.Mutator{Clientset: cs} - ops := m.Mutate(context.Background(), gangQPUPod(ns, group, "depq-0", job)) + ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "depq-1", job, "1")) if !hasGateOp(ops) { - t.Errorf("worker not gated by the dependency (ops: %+v)", ops) + t.Errorf("consumer not gated by the dependency (ops: %+v)", ops) } ann := annotationOps(ops) if ann[DependsOnKindAnnotation] != DependencyKindQuantumSubmit { t.Errorf("depends-on-kind=%q, want %q", ann[DependsOnKindAnnotation], DependencyKindQuantumSubmit) } - if ann[DependsOnProducerAnnotation] != group+SubmitterGroupSuffix { - t.Errorf("depends-on-producer=%q, want %q (the submitter group)", ann[DependsOnProducerAnnotation], group+SubmitterGroupSuffix) + if ann[DependsOnProducerAnnotation] != group+ProducerGroupSuffix { + t.Errorf("depends-on-producer=%q, want %q (the producer group)", ann[DependsOnProducerAnnotation], group+ProducerGroupSuffix) } if ann[DependsOnGateAnnotation] != QuantumGate { t.Errorf("depends-on-gate=%q, want %q", ann[DependsOnGateAnnotation], QuantumGate) @@ -316,11 +400,11 @@ func TestDependencyOfRoundTrip(t *testing.T) { } } -// The worker is staged with the SAME interceptor as the submitter (PYTHONPATH + +// The consumer is staged with the SAME interceptor as the producer (PYTHONPATH + // FLUENCE_POD_UID), put into faux mode (FLUENCE_FAUX_SUBMIT=true), and handed the // existing task id via the FLUENCE_QUANTUM_JOB_ID downward-API env. One // mechanism, two modes — no separate ConfigMap shim. The user sets nothing. 
-func TestQuantumWorkerStagedWithFauxSubmit(t *testing.T) { +func TestQuantumConsumerStagedWithFauxSubmit(t *testing.T) { ns, group, job := "default", "fauxq", "fauxq-job" par := int32(2) cs := fake.NewSimpleClientset(&batchv1.Job{ @@ -328,22 +412,22 @@ func TestQuantumWorkerStagedWithFauxSubmit(t *testing.T) { Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) m := &webhook.Mutator{Clientset: cs} - ops := m.Mutate(context.Background(), gangQPUPod(ns, group, "fauxq-0", job)) + ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "fauxq-1", job, "1")) - // Same interceptor staging as the submitter (PYTHONPATH set on the worker). + // Same interceptor staging as the producer (PYTHONPATH set on the consumer). if _, ok := envOp(ops, "PYTHONPATH"); !ok { - t.Errorf("worker not staged with the interceptor (no PYTHONPATH); ops: %+v", ops) + t.Errorf("consumer not staged with the interceptor (no PYTHONPATH); ops: %+v", ops) } // Faux mode selected. if e, ok := envOp(ops, FauxSubmitEnv); !ok || e.Value != "true" { - t.Errorf("worker missing %s=true (got %+v, ok=%v)", FauxSubmitEnv, e, ok) + t.Errorf("consumer missing %s=true (got %+v, ok=%v)", FauxSubmitEnv, e, ok) } // Existing task id sourced from the annotation the ungating sidecar stamps. 
e, ok := envOp(ops, QuantumJobIDEnv) if !ok { - t.Fatalf("worker missing %s env", QuantumJobIDEnv) + t.Fatalf("consumer missing %s env", QuantumJobIDEnv) } if e.ValueFrom == nil || e.ValueFrom.FieldRef == nil || e.ValueFrom.FieldRef.FieldPath != "metadata.annotations['"+QuantumJobIDAnnotation+"']" { @@ -379,9 +463,9 @@ func TestSidecarInheritsWorkloadSecretEnv(t *testing.T) { pod := &corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Name: "gang", + Name: "app", Env: []corev1.EnvVar{ - {Name: "GANG_ROLE", Value: "leader"}, // plain value: NOT copied + {Name: "PLAIN_VALUE", Value: "x"}, // plain value: NOT copied {Name: "AWS_ACCESS_KEY_ID", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{Name: "aws-braket-credentials"}, @@ -406,7 +490,7 @@ func TestSidecarInheritsWorkloadSecretEnv(t *testing.T) { if e.Name == "AWS_ACCESS_KEY_ID" && e.ValueFrom != nil && e.ValueFrom.SecretKeyRef != nil { gotSecret = true } - if e.Name == "GANG_ROLE" { + if e.Name == "PLAIN_VALUE" { gotPlain = true } } @@ -414,35 +498,15 @@ func TestSidecarInheritsWorkloadSecretEnv(t *testing.T) { t.Error("sidecar should inherit the workload's secret-sourced AWS creds") } if gotPlain { - t.Error("sidecar should NOT copy plain-value workload env like GANG_ROLE") + t.Error("sidecar should NOT copy plain-value workload env") } } -// A plain quantum workload pod (no role, owned by a Job of N>1) is gated as a -// faux gang member AND triggers creation of the one-off submitter. The user -// authors no submitter and no roles. 
-func TestGangMemberTriggersSubmitter(t *testing.T) { - ns, group, job := "default", "qauto", "qauto-job" - par := int32(2) - cs := fake.NewSimpleClientset(&batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, - Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) - m := &webhook.Mutator{Clientset: cs} - - workload := gangQPUPod(ns, group, "qauto-0", job) - ops := m.Mutate(context.Background(), workload) - - if !hasGateOp(ops) { - t.Error("gang member must be gated") - } - if _, ok := envOp(ops, FauxSubmitEnv); !ok { - t.Error("gang member must get FLUENCE_FAUX_SUBMIT") - } - sub, err := cs.CoreV1().Pods(ns).Get(context.Background(), group+SubmitterGroupSuffix, metav1.GetOptions{}) - if err != nil { - t.Fatalf("submitter pod not created: %v", err) - } - if !spec.PodRequestsResource(sub, QuantumResource) { - t.Error("submitter must request the quantum resource (it runs the real submit)") +// The producer member of a shared gang requests the quantum resource (it runs the +// real submit). Sanity check that the helper builds a quantum pod. +func TestSharedProducerRequestsQuantumResource(t *testing.T) { + p := sharedQPUPod("default", "g", "g-0", "g-job", "0") + if !spec.PodRequestsResource(p, QuantumResource) { + t.Error("producer must request the quantum resource (it runs the real submit)") } } diff --git a/test/e2e/quantum/02-sidecar-ungate.sh b/test/e2e/quantum/02-sidecar-ungate.sh index 88f047b..a4ae79f 100755 --- a/test/e2e/quantum/02-sidecar-ungate.sh +++ b/test/e2e/quantum/02-sidecar-ungate.sh @@ -1,14 +1,15 @@ #!/usr/bin/env bash -# Gang + submitter webhook test (no leader/worker). +# Shared-coordination webhook test (producer/consumer, no submitter pod). # -# When a quantum workload (a gang of N pods all requesting QPU, no roles) is +# When a shared quantum gang (coordination=shared, N pods all requesting QPU) is # submitted, the webhook must: # 1. create the fluence-sidecar RBAC in the namespace automatically -# 2. 
gate every gang pod with quantum.braket/ready -# 3. raise every gang pod to the fluence-quantum-classical priority class -# 4. ADDITIONALLY create the one-off submitter pod -submitter +# 2. gate every CONSUMER pod with quantum.braket/ready +# 3. raise every CONSUMER pod to the fluence-quantum-classical priority class +# 4. leave the PRODUCER (completion index 0) UNGATED, as a real member (NOT a +# separate spawned pod) # 5. inject the fluence-stage init container + the sidecar container into the -# submitter (Model C staging + the real coordinator) +# producer (Model C staging + the real coordinator) # # Does NOT test the sidecar runtime (task discovery, interceptor, queue polling) # — that needs real AWS creds (sidecars/providers/braket/test/integration.sh). @@ -16,9 +17,10 @@ set -euo pipefail HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE%/test/e2e/*}/test/e2e/lib.sh" GROUP=qgang -SUBMITTER=${GROUP}-submitter +PRODUCER=${GROUP}-0 # completion index 0 +CONSUMER=${GROUP}-1 # completion index 1 -log "TEST 4: gang+submitter webhook — RBAC, gating, priority, submitter creation" +log "TEST 4: shared-gang webhook — RBAC, consumer gating, priority, producer wiring" kubectl apply -f examples/test/e2e/quantum/quantum-gang-pods.yaml sleep 3 @@ -37,46 +39,38 @@ kubectl get role fluence-sidecar -n default || fail "no fluence-sidec kubectl get rolebinding fluence-sidecar -n default || fail "no fluence-sidecar RoleBinding" log " RBAC present" -# 2 + 3. Every gang pod is gated and at the preempting priority class. 
-for p in ${GROUP}-0 ${GROUP}-1; do - gate="$(kubectl get pod "$p" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" - [ "$gate" = "quantum.braket/ready" ] || fail "$p not gated (gate=$gate)" - pc="$(kubectl get pod "$p" -o jsonpath='{.spec.priorityClassName}' 2>/dev/null || true)" - [ "$pc" = "fluence-quantum-classical" ] || fail "$p priorityClass=$pc, want fluence-quantum-classical" -done -log " gang pods gated + fluence-quantum-classical priority" +# 2 + 3. The CONSUMER is gated and at the preempting priority class. +gate="$(kubectl get pod "$CONSUMER" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" +[ "$gate" = "quantum.braket/ready" ] || fail "$CONSUMER not gated (gate=$gate)" +pc="$(kubectl get pod "$CONSUMER" -o jsonpath='{.spec.priorityClassName}' 2>/dev/null || true)" +[ "$pc" = "fluence-quantum-classical" ] || fail "$CONSUMER priorityClass=$pc, want fluence-quantum-classical" +log " consumer gated + fluence-quantum-classical priority" -# 4. Fluence created the submitter pod. -log "checking webhook created the submitter pod $SUBMITTER..." -for i in $(seq 1 30); do - kubectl get pod "$SUBMITTER" -n default >/dev/null 2>&1 && break - sleep 2 -done -kubectl get pod "$SUBMITTER" -n default || fail "webhook did not create submitter pod $SUBMITTER" -sub_marker="$(kubectl get pod "$SUBMITTER" -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/submitter}' 2>/dev/null || true)" -[ "$sub_marker" = "true" ] || fail "submitter missing the submitter marker" -log " submitter pod created" +# 4. The PRODUCER is NOT a separate spawned pod and is NOT gated. No -submitter. 
+if kubectl get pod "${GROUP}-submitter" -n default >/dev/null 2>&1; then + fail "found ${GROUP}-submitter pod — the obsolete separate-submitter model must not exist" +fi +pgate="$(kubectl get pod "$PRODUCER" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" +[ -z "$pgate" ] || fail "producer must NOT be gated (gate=$pgate)" +log " producer is a real member, not gated; no separate submitter pod" -# 5. Submitter has the staging init container + the sidecar container, and is NOT gated. -wait_pod_phase "$SUBMITTER" Running 120 \ - || { kubectl describe pod "$SUBMITTER"; fail "$SUBMITTER did not reach Running"; } -initc="$(kubectl get pod "$SUBMITTER" -o jsonpath='{.spec.initContainers[*].name}')" +# 5. Producer has the staging init container + the sidecar container. +wait_pod_phase "$PRODUCER" Running 120 \ + || { kubectl describe pod "$PRODUCER"; fail "$PRODUCER did not reach Running"; } +initc="$(kubectl get pod "$PRODUCER" -o jsonpath='{.spec.initContainers[*].name}')" echo "$initc" | grep -q fluence-stage || fail "fluence-stage init container not injected (init: $initc)" -conts="$(kubectl get pod "$SUBMITTER" -o jsonpath='{.spec.containers[*].name}')" +conts="$(kubectl get pod "$PRODUCER" -o jsonpath='{.spec.containers[*].name}')" echo "$conts" | grep -q fluence-sidecar || fail "fluence-sidecar container not injected (containers: $conts)" -sgate="$(kubectl get pod "$SUBMITTER" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" -[ -z "$sgate" ] || fail "submitter must NOT be gated (gate=$sgate)" -log " submitter has fluence-stage + fluence-sidecar, not gated" +log " producer has fluence-stage + fluence-sidecar" -log "PASS: webhook gated the gang, set priority, created RBAC + the submitter" +log "PASS: webhook gated the consumers, set priority, created RBAC + wired the producer" log "NOTE: priority is set at admission (immutable post-creation)" log "NOTE: braket sidecar runtime (SDK intercept, tag discovery, queue polling)" log " 
is in sidecars/providers/braket/test/integration.sh" # Clean up pods + PodGroups; RBAC is namespace infra and persists. kubectl delete -f examples/test/e2e/quantum/quantum-gang-pods.yaml --wait=false || true -kubectl delete pod "$SUBMITTER" --wait=false 2>/dev/null || true -for g in "$GROUP" "$SUBMITTER"; do +for g in "$GROUP" "${GROUP}-producer"; do kubectl patch podgroup "$g" --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true done kubectl wait --for=delete pod -l app="$GROUP" --timeout=60s 2>/dev/null || true diff --git a/test/e2e/quantum/03-gang-producer.sh b/test/e2e/quantum/03-gang-producer.sh new file mode 100644 index 0000000..fce4248 --- /dev/null +++ b/test/e2e/quantum/03-gang-producer.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# Producer/consumer structure (replaces the old leader/worker and submitter-pod +# models). +# +# The structural guarantee the ungate path depends on: a shared quantum gang of +# size N is split, by completion index, into the CONSUMER gang <group> +# (minCount N-1, gated) and the PRODUCER's group-of-one <group>-producer +# (minCount 1, not gated). The producer is a real member of the user's workload — +# there is NO separate <group>-submitter pod, NO <group>-workers subgroup, and no +# leader among the user's pods. (The runtime ungate is covered by the braket +# integration test; here we prove the shape.) +set -euo pipefail +HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE%/test/e2e/*}/test/e2e/lib.sh" + +GROUP=qgang +PRODUCER_GROUP=${GROUP}-producer +PRODUCER=${GROUP}-0 # completion index 0 +CONSUMER=${GROUP}-1 # completion index 1 + +log "TEST 7: consumer gang(N-1, gated) + producer(1, member) structure" +kubectl apply -f examples/test/e2e/quantum/quantum-gang-pods.yaml + +# Consumer PodGroup exists with minCount N-1 = 1 (the split).
+log "checking consumer group '$GROUP' minCount == 1 (N-1)" +for i in $(seq 1 30); do + gc="$(kubectl get podgroup "$GROUP" -o jsonpath='{.spec.schedulingPolicy.gang.minCount}' 2>/dev/null || true)" + [ -n "$gc" ] && break; sleep 2 +done +[ "$gc" = "1" ] || fail "consumer group $GROUP minCount=$gc, want 1 (N-1)" + +# There must be NO <group>-workers subgroup and NO <group>-submitter pod. +if kubectl get podgroup "${GROUP}-workers" >/dev/null 2>&1; then + fail "found ${GROUP}-workers PodGroup — the obsolete leader/worker split must not exist" +fi +if kubectl get pod "${GROUP}-submitter" >/dev/null 2>&1; then + fail "found ${GROUP}-submitter pod — the obsolete separate-submitter model must not exist" +fi +log " consumer group minCount=1, no -workers subgroup, no -submitter pod" + +# Producer PodGroup <group>-producer exists with minCount 1 (schedules alone). +log "checking producer group '$PRODUCER_GROUP' minCount == 1" +for i in $(seq 1 30); do + sc="$(kubectl get podgroup "$PRODUCER_GROUP" -o jsonpath='{.spec.schedulingPolicy.gang.minCount}' 2>/dev/null || true)" + [ -n "$sc" ] && break; sleep 2 +done +[ "$sc" = "1" ] || fail "producer group $PRODUCER_GROUP minCount=$sc, want 1" + +# Producer pod (index 0) is relinked into its own group-of-one and is NOT gated. +pl="$(kubectl get pod "$PRODUCER" -o jsonpath='{.metadata.labels.fluence\.flux-framework\.org/group}' 2>/dev/null || true)" +[ "$pl" = "$PRODUCER_GROUP" ] || fail "producer group label=$pl, want $PRODUCER_GROUP" +pgate="$(kubectl get pod "$PRODUCER" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" +[ -z "$pgate" ] || fail "producer must NOT be gated (gate=$pgate)" +log " producer in '$PRODUCER_GROUP' (minCount 1), not gated" + +# Consumer pod (index 1+) stays in <group> and is gated.
+g="$(kubectl get pod "$CONSUMER" -o jsonpath='{.metadata.labels.fluence\.flux-framework\.org/group}' 2>/dev/null || true)" +[ "$g" = "$GROUP" ] || fail "$CONSUMER group label=$g, want $GROUP" +gate="$(kubectl get pod "$CONSUMER" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" +[ "$gate" = "quantum.braket/ready" ] || fail "$CONSUMER not gated (gate=$gate)" +# The consumer's dependency points at the producer group. +dp="$(kubectl get pod "$CONSUMER" -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/depends-on-producer}' 2>/dev/null || true)" +[ "$dp" = "$PRODUCER_GROUP" ] || fail "consumer depends-on-producer=$dp, want $PRODUCER_GROUP" +log " consumer in '$GROUP', gated, depends on '$PRODUCER_GROUP'" + +log "PASS 7: consumer gang(N-1, gated) + producer(1, member, ungates gang), no submitter/leader/worker" +kubectl delete -f examples/test/e2e/quantum/quantum-gang-pods.yaml --wait=false || true +for g in "$GROUP" "$PRODUCER_GROUP"; do + kubectl patch podgroup "$g" --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true +done +kubectl wait --for=delete pod -l app="$GROUP" --timeout=60s 2>/dev/null || true diff --git a/test/e2e/quantum/03-gang-submitter.sh b/test/e2e/quantum/03-gang-submitter.sh deleted file mode 100644 index 46905ca..0000000 --- a/test/e2e/quantum/03-gang-submitter.sh +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env bash -# Gang + submitter structure (replaces the old leader/worker split). -# -# The structural guarantee the ungate path depends on: a quantum gang of size N -# is ONE fully-gated PodGroup (minCount N), and Fluence creates a -# SEPARATE submitter pod in its OWN group-of-one -submitter (minCount 1, -# not gated) that does the real submit and ungates the gang. There is no -# -workers subgroup and no leader among the user's pods. (The runtime -# ungate is covered by the braket integration test; here we prove the shape.) -set -euo pipefail -HERE="$(cd "$(dirname "$0")" && pwd)"; . 
"${HERE%/test/e2e/*}/test/e2e/lib.sh" - -GROUP=qgang -SUBMITTER=${GROUP}-submitter - -log "TEST 7: gang(N, gated) + separate submitter(1) structure" -kubectl apply -f examples/test/e2e/quantum/quantum-gang-pods.yaml - -# Gang PodGroup exists with minCount N=2 (full gang, no split). -log "checking gang group '$GROUP' minCount == 2 (full N)" -for i in $(seq 1 30); do - gc="$(kubectl get podgroup "$GROUP" -o jsonpath='{.spec.schedulingPolicy.gang.minCount}' 2>/dev/null || true)" - [ -n "$gc" ] && break; sleep 2 -done -[ "$gc" = "2" ] || fail "gang group $GROUP minCount=$gc, want 2 (full N)" - -# There must be NO -workers subgroup (the old split is gone). -if kubectl get podgroup "${GROUP}-workers" >/dev/null 2>&1; then - fail "found ${GROUP}-workers PodGroup — the obsolete leader/worker split must not exist" -fi -log " gang group minCount=2, no -workers subgroup" - -# Submitter PodGroup -submitter exists with minCount 1 (schedules alone). -log "checking submitter group '$SUBMITTER' minCount == 1" -for i in $(seq 1 30); do - sc="$(kubectl get podgroup "$SUBMITTER" -o jsonpath='{.spec.schedulingPolicy.gang.minCount}' 2>/dev/null || true)" - [ -n "$sc" ] && break; sleep 2 -done -[ "$sc" = "1" ] || fail "submitter group $SUBMITTER minCount=$sc, want 1" - -# Submitter pod records the gang group it ungates, and is its own group. -gg="$(kubectl get pod "$SUBMITTER" -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/gang-group}' 2>/dev/null || true)" -[ "$gg" = "$GROUP" ] || fail "submitter gang-group annotation=$gg, want $GROUP" -sl="$(kubectl get pod "$SUBMITTER" -o jsonpath='{.metadata.labels.fluence\.flux-framework\.org/group}' 2>/dev/null || true)" -[ "$sl" = "$SUBMITTER" ] || fail "submitter group label=$sl, want $SUBMITTER" -log " submitter group minCount=1, ungates gang '$GROUP'" - -# Gang pods stay in (NOT relinked) and are gated. 
-for p in ${GROUP}-0 ${GROUP}-1; do - g="$(kubectl get pod "$p" -o jsonpath='{.metadata.labels.fluence\.flux-framework\.org/group}' 2>/dev/null || true)" - [ "$g" = "$GROUP" ] || fail "$p group label=$g, want $GROUP (gang pods must not be relinked)" - gate="$(kubectl get pod "$p" -o jsonpath='{.spec.schedulingGates[0].name}' 2>/dev/null || true)" - [ "$gate" = "quantum.braket/ready" ] || fail "$p not gated (gate=$gate)" -done -log " gang pods remain in '$GROUP' and are gated" - -log "PASS 7: gang(N=2, gated) + submitter(1, ungates gang), no leader/worker split" -kubectl delete -f examples/test/e2e/quantum/quantum-gang-pods.yaml --wait=false || true -kubectl delete pod "$SUBMITTER" --wait=false 2>/dev/null || true -for g in "$GROUP" "$SUBMITTER"; do - kubectl patch podgroup "$g" --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true -done -kubectl wait --for=delete pod -l app="$GROUP" --timeout=60s 2>/dev/null || true diff --git a/test/e2e/quantum/04-gang-env-contract.sh b/test/e2e/quantum/04-gang-env-contract.sh index 19f2439..f887448 100755 --- a/test/e2e/quantum/04-gang-env-contract.sh +++ b/test/e2e/quantum/04-gang-env-contract.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Env-contract e2e (gang + submitter): verify the webhook injects, at admission, +# Env-contract e2e (producer/consumer): verify the webhook injects, at admission, # the env the runtime depends on — IN-CLUSTER, on the real pod specs, with no # Braket/AWS and WITHOUT requiring scheduling. Guards the seam that, if broken, # makes a gang schedule then hang or double-submit. @@ -7,15 +7,16 @@ # Spec layer only (these are downward-API valueFrom refs whose VALUES resolve at # placement, but whose PRESENCE is deterministic at admission), so no scheduling, # no qpu capacity, no logs — it cannot flake on capacity. 
Contract: -# gang pod (faux): FLUENCE_FAUX_SUBMIT, FLUENCE_QUANTUM_JOB_ID, PYTHONPATH, FLUXION_BACKEND -# submitter: FLUENCE_GANG_GROUP on the sidecar (real submit, ungates the gang) +# consumer (faux): FLUENCE_FAUX_SUBMIT, FLUENCE_QUANTUM_JOB_ID, PYTHONPATH, FLUXION_BACKEND +# producer: FLUENCE_GANG_GROUP on the sidecar (real submit, ungates the consumers) set -euo pipefail HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE%/test/e2e/*}/test/e2e/lib.sh" GROUP=qgang -SUBMITTER=${GROUP}-submitter +PRODUCER=${GROUP}-0 # completion index 0 +CONSUMER=${GROUP}-1 # completion index 1 -log "TEST 8: gang+submitter env contract — spec layer" +log "TEST 8: producer/consumer env contract — spec layer" kubectl apply -f examples/test/e2e/quantum/quantum-gang-pods.yaml # does container $2 of pod $1 have an env entry named $3 ? (spec-level only) @@ -24,35 +25,34 @@ has_env() { 2>/dev/null | tr ' ' '\n' | grep -qx "$3" } -log "checking the webhook wired the faux contract onto a gang pod" -for i in $(seq 1 15); do has_env ${GROUP}-0 app FLUENCE_FAUX_SUBMIT && break; sleep 2; done +log "checking the webhook wired the faux contract onto the consumer" +for i in $(seq 1 15); do has_env "$CONSUMER" app FLUENCE_FAUX_SUBMIT && break; sleep 2; done for v in FLUENCE_FAUX_SUBMIT FLUENCE_QUANTUM_JOB_ID PYTHONPATH FLUXION_BACKEND; do - has_env ${GROUP}-0 app "$v" \ - || { kubectl get pod ${GROUP}-0 -o yaml | sed -n '/containers:/,/status:/p'; \ - fail "gang pod 'app' container missing env '$v'"; } - log " gang pod has env: $v" + has_env "$CONSUMER" app "$v" \ + || { kubectl get pod "$CONSUMER" -o yaml | sed -n '/containers:/,/status:/p'; \ + fail "consumer 'app' container missing env '$v'"; } + log " consumer has env: $v" done -# The submitter's sidecar must know which gang to ungate. 
-log "checking the submitter sidecar has FLUENCE_GANG_GROUP=$GROUP" -for i in $(seq 1 30); do kubectl get pod "$SUBMITTER" >/dev/null 2>&1 && break; sleep 2; done -gg="$(kubectl get pod "$SUBMITTER" \ +# The producer's sidecar must know which consumer group to ungate. +log "checking the producer sidecar has FLUENCE_GANG_GROUP=$GROUP" +for i in $(seq 1 30); do kubectl get pod "$PRODUCER" >/dev/null 2>&1 && break; sleep 2; done +gg="$(kubectl get pod "$PRODUCER" \ -o jsonpath="{.spec.containers[?(@.name=='fluence-sidecar')].env[?(@.name=='FLUENCE_GANG_GROUP')].value}" \ 2>/dev/null || true)" -[ "$gg" = "$GROUP" ] || fail "submitter sidecar FLUENCE_GANG_GROUP=$gg, want $GROUP" -log " submitter sidecar has FLUENCE_GANG_GROUP=$gg" +[ "$gg" = "$GROUP" ] || fail "producer sidecar FLUENCE_GANG_GROUP=$gg, want $GROUP" +log " producer sidecar has FLUENCE_GANG_GROUP=$gg" -# And the submitter must NOT be in faux mode (it does the real submit). -if has_env "$SUBMITTER" app FLUENCE_FAUX_SUBMIT; then - fail "submitter must NOT carry FLUENCE_FAUX_SUBMIT (it submits for real)" +# And the producer must NOT be in faux mode (it does the real submit). 
+if has_env "$PRODUCER" app FLUENCE_FAUX_SUBMIT; then + fail "producer must NOT carry FLUENCE_FAUX_SUBMIT (it submits for real)" fi -log " submitter is not faux" +log " producer is not faux" -log "PASS 8: webhook injects the gang(faux) + submitter(real) env contract at admission" +log "PASS 8: webhook injects the consumer(faux) + producer(real) env contract at admission" kubectl delete -f examples/test/e2e/quantum/quantum-gang-pods.yaml --wait=false || true -kubectl delete pod "$SUBMITTER" --wait=false 2>/dev/null || true -for g in "$GROUP" "$SUBMITTER"; do +for g in "$GROUP" "${GROUP}-producer"; do kubectl patch podgroup "$g" --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true done kubectl wait --for=delete pod -l app="$GROUP" --timeout=60s 2>/dev/null || true diff --git a/test/e2e/quantum/setup.sh b/test/e2e/quantum/setup.sh index cf35020..57f375a 100644 --- a/test/e2e/quantum/setup.sh +++ b/test/e2e/quantum/setup.sh @@ -9,9 +9,9 @@ # # Also points the webhook-injected sidecar/stage image at the CI-loaded image: # the default sidecar image (ghcr.io/.../fluence-sidecar:latest) is not loaded in -# kind, so the submitter's containers could not pull. The fluence-stage init is +# kind, so the producer's containers could not pull. The fluence-stage init is # fail-soft (no python in this image -> it logs and exits 0), which is fine for -# the structural assertions; the submitter still schedules and runs. +# the structural assertions; the producer still schedules and runs. set -euo pipefail HERE="$(cd "$(dirname "$0")" && pwd)"; . 
"${HERE%/test/e2e/*}/test/e2e/lib.sh" IMAGE="${IMAGE:-vanessa/fluence:test}" From 3f997d4713c267029e5942d770626c553e821023 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sat, 27 Jun 2026 14:00:52 -0700 Subject: [PATCH 2/2] test: remove submit role, fall back to producer/consumer Signed-off-by: vsoch --- Makefile | 4 +- deploy/fluence-pull-test.yaml | 31 ++-- docs/coordination-handler-design.md | 57 ++++--- docs/quantum-scheduling.md | 25 +-- .../test/e2e/quantum/quantum-gang-pods.yaml | 2 +- pkg/webhook/handlers/handlers_test.go | 14 ++ pkg/webhook/handlers/quantum.go | 159 +++++++++++------- pkg/webhook/handlers/quantum_test.go | 75 +++++---- python/Dockerfile | 19 +-- python/fluence/providers/braket.py | 23 +-- test/e2e/quantum/04-gang-env-contract.sh | 36 ++-- 11 files changed, 255 insertions(+), 190 deletions(-) diff --git a/Makefile b/Makefile index 5912c5a..5e2c050 100644 --- a/Makefile +++ b/Makefile @@ -27,8 +27,8 @@ build: ## Build all binaries (scheduler needs flux-sched; helpers are pure Go) .PHONY: python python: - docker build -f python/Dockerfile -t ghcr.io/converged-computing/fluence-sidecar:latest ./python - docker push ghcr.io/converged-computing/fluence-sidecar:latest + docker build -f python/Dockerfile -t vanessa/fluence-sidecar:latest ./python + docker push vanessa/fluence-sidecar:latest # kind load docker-image ghcr.io/converged-computing/fluence-sidecar:latest .PHONY: test diff --git a/deploy/fluence-pull-test.yaml b/deploy/fluence-pull-test.yaml index 94c2425..8e42158 100644 --- a/deploy/fluence-pull-test.yaml +++ b/deploy/fluence-pull-test.yaml @@ -138,8 +138,7 @@ spec: containers: - name: fluence image: vanessa/fluence:test - # Allows for kind load - imagePullPolicy: Always + imagePullPolicy: Always command: - /bin/fluence - --config=/etc/fluence/scheduler-config.yaml @@ -148,13 +147,6 @@ spec: # Without these its PodGroup/GangScheduling plugin is inactive, pods # schedule with no gang semantics, and PodGroup status stays Pending. 
- --feature-gates=GenericWorkload=true,GangScheduling=true - # Re-attempt unschedulable pods more often than the 5m default. In the - # contention experiment a gang that loses the initial race for nodes is - # marked Unschedulable; this is how soon it is re-tried after capacity - # frees (the event-driven QueueingHint is best-effort; this is the - # backstop that bounds worst-case requeue latency). 30s keeps contended - # gangs draining promptly without thrashing the queue. - - --pod-max-in-unschedulable-pods-duration=30s - --v=4 env: # Path to the resources config (e.g. quantum backends). Unset/empty @@ -194,19 +186,29 @@ spec: containers: - name: webhook image: vanessa/fluence:test - # Allows for kind load imagePullPolicy: Always command: ["/bin/fluence-webhook"] + # The webhook derives the FLUXION_* env contract (FLUXION_VENDOR, + # FLUXION_QRMI_TYPE, ...) from the resource graph's attribute keys, so + # it needs the same graph the scheduler and device plugin read. Without + # this it injects only FLUXION_BACKEND, and the sidecar can't route to + # a provider (which keys on qrmi_type). env: - # Use busybox as sidecar image in tests — avoids pulling the real - # sidecar image which is large and not cached in CI. - - name: FLUENCE_SIDECAR_IMAGE - value: "busybox:latest" + - name: FLUENCE_RESOURCES + value: /etc/fluence/resources.yaml ports: - containerPort: 8443 readinessProbe: httpGet: {path: /healthz, port: 8443, scheme: HTTPS} initialDelaySeconds: 2 + volumeMounts: + - name: config + mountPath: /etc/fluence + volumes: + - name: config + projected: + sources: + - configMap: {name: fluence-resources, optional: true} --- apiVersion: v1 kind: Service @@ -247,7 +249,6 @@ webhooks: - key: kubernetes.io/metadata.name operator: NotIn values: ["kube-system"] ---- # fluence-sidecar.yaml # # RBAC and supporting resources for the Fluence quantum sidecar. 
diff --git a/docs/coordination-handler-design.md b/docs/coordination-handler-design.md index c155fef..cdcfd38 100644 --- a/docs/coordination-handler-design.md +++ b/docs/coordination-handler-design.md @@ -6,10 +6,11 @@ > pods to the quantum handler), and `pkg/fluence/fluence.go` (reconcile reaps the > `-producer` PodGroup, never the producer pod — it is a real member). > Unit tests are in `pkg/webhook/handlers/quantum_test.go`; structural e2e in -> `test/e2e/quantum/02–04`. Both shared-mode user contracts are served by the same -> wiring: the explicit-role script (consumers never call submit; the faux -> interceptor is unused) and the identical-script (every member calls submit; the -> faux interceptor dedups consumers to the producer's cached task). +> `test/e2e/quantum/02–04`. Coordination is **role-aware**: the webhook stamps +> `FLUENCE_COORDINATION_ROLE` (producer/consumer) and hands consumers the +> producer's task id (`FLUENCE_QUANTUM_JOB_ID`); the workload branches on the role +> (producer submits, consumer fetches the shared result by id). No submit +> interception, no faux flag — that earlier mechanism has been removed. ## Why this replaces the submitter-pod model @@ -77,7 +78,7 @@ ask for, and never dedup tasks that were meant to be distinct. result), so this is not a Fluence deficiency; it is the physics of "N independent tasks," and it is the user's explicit choice. The only way to reclaim even the producer's node in either mode is a resumable `.result()` - (replay), which reuses the faux mechanism and is deliberately **out of scope + (replay), and is deliberately **out of scope for v1** (one idle node is cheap; replay imposes a replay-safe-code contract). 
## Producer election @@ -103,9 +104,9 @@ recommended workload and best-effort otherwise: | PodGroup | `-producer`, `minCount=1` | ``, `minCount=N−1` | | schedules | immediately, alone | atomically as a gang, **after ungate** | | gate | none | `quantum.braket/ready` + preempting priority | -| interceptor | staged, **real (tag) mode** | staged, **faux mode** | +| interceptor | staged (tags the real submit) | **not staged** (a consumer never submits) | | sidecar | yes — polls the task, ungates `` at position==1 | no | -| app run | full; its `.run()` is the one real submit | full; its `.run()` is a faux no-op returning the producer's task | +| app run | full; submits the one real task | full; reads role=consumer and fetches the shared result by id (no submit) | `minCount=1` on the producer group is what removes the deadlock that forced a separate submitter: a single-member group schedules alone, so the producer runs @@ -114,19 +115,19 @@ groups have independent minCounts; neither blocks the other. The consumer group keeps a real gang `minCount` (N−1), so **gang scheduling is preserved and demonstrable** (experiment requirement 1). -The faux path is retained verbatim, but its meaning is now honest: it is the -shared-result dedup. A consumer runs the same image and calls `.run()`; the faux -interceptor returns the producer's existing task (handed over as -`FLUENCE_QUANTUM_JOB_ID`, stamped by the sidecar at ungate) instead of -submitting. One real task, N consumers, each app run once, in full. +Coordination is role-aware rather than interception-based: the consumer is told +`FLUENCE_COORDINATION_ROLE=consumer` and handed the producer's task id +(`FLUENCE_QUANTUM_JOB_ID`, stamped by the sidecar at ungate), and the workload +fetches the shared result by that id instead of submitting. One real task, N +consumers, each app run once, in full — and no SDK submit-interception. ## Gate / ungate flow (shared mode) ``` 1. 
Producer (index 0) admitted -> own group-of-one, ungated, sidecar attached (FLUENCE_GANG_GROUP=), interceptor in REAL mode. - Consumers (1..N-1) admitted -> group (minCount N-1), GATED, faux mode, - depends-on producer=-producer. + Consumers (1..N-1) admitted -> group (minCount N-1), GATED, + role=consumer, depends-on producer=-producer. 2. Scheduler places the producer immediately (minCount=1). It runs the user app, .run() submits the ONE real task (tagged fluence-pod-uid). @@ -138,8 +139,9 @@ submitting. One real task, N consumers, each app run once, in full. remove the quantum.braket/ready gate (priority already set at admission) 5. Consumer group (now ungated, minCount N-1) gang-schedules atomically and - starts as the quantum result arrives. Each consumer's .run() is faux: returns - the producer's task; .result() returns the shared result; app post-processes. + starts as the quantum result arrives. Each consumer reads role=consumer and + fetches the producer's task by FLUENCE_QUANTUM_JOB_ID (.result() returns the + shared result); app post-processes. No consumer submits. ``` `independent` mode skips all of this: each pod is its own group-of-one, ungated, @@ -210,7 +212,7 @@ constants and helpers: `SubmitterAnnotation`, `GangGroupAnnotation`, `SubmitterGroupSuffix`, `SubmitterPodSuffix`, and the functions `mutateSubmitter` and `ensureSubmitterPod`. Everything else in the file (`resolveGroup`, `resolveGangSize`, `ownerReplicaSetN`, `countGroupPods`, -`linkGroupOps`, the faux-submit section, the sidecar section) is reused unchanged. +`linkGroupOps`, the role/job-id env section, the sidecar section) is reused unchanged. **Replace** `Mutate` with the coordination router plus two small role functions: @@ -252,7 +254,8 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP pg := group + ProducerGroupSuffix m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1) ops := linkGroupOps(pod, pg) - ops = append(ops, interceptorOps(pod)...) 
// real (tag) mode — no fauxSubmitEnvOps + ops = append(ops, interceptorOps(pod)...) // tags the real submit + ops = append(ops, roleEnvOps(pod, RoleProducer)...) // FLUENCE_COORDINATION_ROLE=producer sc := sidecarFor(m) sc.EnsureRBAC(ctx, pod.Namespace) // Tell the sidecar which consumer group (the base group) to list + ungate. @@ -263,17 +266,17 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP } // mutateConsumer: a non-producer member. Joins the consumer gang -// (minCount N-1), is gated until the producer's task is ready, and runs the same -// image with the interceptor in FAUX mode so its .run() returns the producer's -// task instead of resubmitting (the shared-result dedup). +// (minCount N-1), is gated until the producer's task is ready, and is told its +// role (FLUENCE_COORDINATION_ROLE=consumer) + the producer's task id +// (FLUENCE_QUANTUM_JOB_ID). A consumer fetches the shared result by id; it never +// submits, so it gets neither the interceptor nor a faux flag. func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op { m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1) ops := linkGroupOps(pod, group) dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate} ops = append(ops, dep.applyOps(pod)...) // gate + preempting priority + depends-on - ops = append(ops, interceptorOps(pod)...) - ops = append(ops, fauxSubmitEnvOps(pod)...) - log.Printf("[fluence-webhook] quantum consumer %s/%s — group %s minCount=%d, gated+faux", + ops = append(ops, consumerEnvOps(pod)...) // role=consumer + FLUENCE_QUANTUM_JOB_ID + log.Printf("[fluence-webhook] quantum consumer %s/%s — group %s minCount=%d, gated (role=consumer)", pod.Namespace, pod.Name, group, n-1) return ops } @@ -322,7 +325,7 @@ submitter pod. 
## Experiments -Two requirements, both demonstrable on a kind cluster with the mock/faux path and +Two requirements, both demonstrable on a kind cluster with the mock path and on a real cluster with Braket. ### Requirement 1 — Fluence still gang-schedules @@ -373,8 +376,8 @@ traditional and independent rise ~linearly in N; shared stays ~flat at one node. ### Build/run notes - The producer/consumer split needs no new image: producers and consumers run the - same sampler image; faux vs real is selected by `FLUENCE_FAUX_SUBMIT` (set on - consumers by `fauxSubmitEnvOps`), exactly as today. + same role-aware sampler; the branch is `FLUENCE_COORDINATION_ROLE` + (producer submits; consumer fetches the shared result by `FLUENCE_QUANTUM_JOB_ID`). - Use an **indexed** Job (`completionMode: Indexed`, `parallelism == completions == N`) so producer election is deterministic (index 0) and `resolveGangSize` reads N from the owner. Stamp `fluence.flux-framework.org/coordination` in the pod diff --git a/docs/quantum-scheduling.md b/docs/quantum-scheduling.md index e1a98c4..de32220 100644 --- a/docs/quantum-scheduling.md +++ b/docs/quantum-scheduling.md @@ -67,8 +67,8 @@ queue wait — which is worse than the original problem. The design combines four mechanisms: -1. **SDK interceptor** — tags every QPU task with the pod UID (real mode), or - returns the producer's task instead of submitting (faux mode) +1. **SDK interceptor** — tags every submitted QPU task with the pod UID so the + sidecar can find it (staged only on pods that submit) 2. **Fluence webhook** — splits a shared quantum gang into one producer and N-1 gated consumers; injects the sidecar into the producer 3. **Sidecar controller** — discovers the QPU task, polls queue position, @@ -95,7 +95,7 @@ mode every member does its own quantum work. 
|--------------|------------------------|--------------------------------------------------------------------------------| | **not gang** | group of 1 (nothing) | inject provider interceptor + env; **sidecar only in observe-only mode if telemetry requested** (nothing to ungate) | | **gang** (independent) | gang-schedule only | every member is a standalone producer: interceptor + env, real submit, no gate | -| **gang** (shared) | — | producer (index 0): interceptor + env + sidecar, real submit, not gated, group-of-one `-producer`; consumers: gate + faux interceptor, gang `` (minCount N-1) | +| **gang** (shared) | — | producer (index 0): interceptor + env + sidecar, real submit, not gated, group-of-one `-producer`, role=producer; consumers: gate + role=consumer + producer's task id, gang `` (minCount N-1) | The crucial rule: **sidecar/interceptor injection is triggered by the quantum resource request, not the group label.** The group label only controls gang @@ -186,9 +186,11 @@ submit. The handler therefore routes each quantum pod to one of three roles: - **consumer** (in `shared` mode, the other N-1 members): joins the `` gang (minCount N-1), gets the `quantum.braket/ready` scheduling gate (entering `SchedulingGated` — invisible to Fluxion, consuming no resources — until the - producer's sidecar ungates it), and is staged with the interceptor in **faux** - mode so its submit returns the producer's task (handed over as - `FLUENCE_QUANTUM_JOB_ID` at ungate) instead of resubmitting. + producer's sidecar ungates it), and is told its role + (`FLUENCE_COORDINATION_ROLE=consumer`) and the producer's task id + (`FLUENCE_QUANTUM_JOB_ID`, stamped at ungate). A consumer does **not** submit — + it fetches the shared result by that id — so it gets neither the interceptor nor + any faux flag. Role is decided by the **completion index**, not resource request or admission order. 
In an indexed Job every pod is identical — same group label, same image, @@ -201,11 +203,12 @@ groups carry independent minCounts (producer=1, consumers=N-1), which is what le the producer schedule and submit while the consumers stay gated — no deadlock, and no separate submitter pod. -Shared mode serves two user contracts with the *same* wiring: an explicit-role -script where consumers never call submit (the faux interceptor is simply unused), -and an identical script where every member calls submit (the faux interceptor -dedups the consumers to the producer's cached task). Fluence does not need to know -which contract the user wrote. +The workload is **role-aware**: every shared-mode pod is told its role positively +via `FLUENCE_COORDINATION_ROLE` (the webhook's election is the single source of +truth), and the application branches on it — the producer submits, a consumer +fetches the shared result by `FLUENCE_QUANTUM_JOB_ID`. The same image plays both +roles with one cheap branch; there is no submit-interception magic and no faux +flag. ### 3.3 Interceptor and Model C delivery diff --git a/examples/test/e2e/quantum/quantum-gang-pods.yaml b/examples/test/e2e/quantum/quantum-gang-pods.yaml index af11e6f..aacce44 100644 --- a/examples/test/e2e/quantum/quantum-gang-pods.yaml +++ b/examples/test/e2e/quantum/quantum-gang-pods.yaml @@ -8,7 +8,7 @@ # real member, so the app runs N times, never N+1. # qgang-1 (index 1+) -> CONSUMER: the "qgang" gang (minCount N-1=1), gated on # quantum.braket/ready + preempting priority, interceptor -# in faux mode (its submit returns the producer's task). +# told role=consumer; it fetches the producer's result by id. 
# # These are raw pods (not a Job) so the e2e can reference stable names; the # completion-index annotation is set manually to make producer election diff --git a/pkg/webhook/handlers/handlers_test.go b/pkg/webhook/handlers/handlers_test.go index 4931a8a..dee0746 100644 --- a/pkg/webhook/handlers/handlers_test.go +++ b/pkg/webhook/handlers/handlers_test.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "strings" "testing" "github.com/converged-computing/fluence/pkg/placement" @@ -80,6 +81,19 @@ func hasGateOp(ops []spec.Op) bool { return false } +// hasDropQuantumResourceOp reports whether ops remove the Fluxion quantum +// resource from a container's requests or limits (the consumer qpu strip). +func hasDropQuantumResourceOp(ops []spec.Op) bool { + for _, op := range ops { + if op.Op == "remove" && strings.HasSuffix(op.Path, "qpu") && + (strings.Contains(op.Path, "/resources/requests/") || + strings.Contains(op.Path, "/resources/limits/")) { + return true + } + } + return false +} + func hasSidecarOp(ops []spec.Op) bool { for _, op := range ops { switch v := op.Value.(type) { diff --git a/pkg/webhook/handlers/quantum.go b/pkg/webhook/handlers/quantum.go index 2990f85..26a09b0 100644 --- a/pkg/webhook/handlers/quantum.go +++ b/pkg/webhook/handlers/quantum.go @@ -37,10 +37,10 @@ func init() { // position==1. NOT gated. The producer is one of the N members, so the // application is run exactly N times — never N+1. // - CONSUMERS (the other N-1 members): the gang (minCount N-1), each -// gated on quantum.braket/ready + preempting priority, staged with the -// interceptor in FAUX mode so its submit returns the producer's task instead -// of resubmitting (the shared-result dedup). Ungated together when the -// producer's task is ready. +// gated on quantum.braket/ready + preempting priority, told its role via +// FLUENCE_COORDINATION_ROLE=consumer and handed the producer's task id via +// FLUENCE_QUANTUM_JOB_ID. 
A consumer does NOT submit; it fetches the shared +// result by that id. Ungated together when the producer's task is ready. // // In CoordinationIndependent mode (the default) there is no shared result to // coordinate: every member is its own standalone producer (real submit, no gate), @@ -71,10 +71,9 @@ const ( CoordinationAnnotation = "fluence.flux-framework.org/coordination" // CoordinationShared: one real task; the producer (index 0) submits and the - // other members are gated consumers that dedup to the producer's task. Serves - // BOTH the explicit-role contract (workers never call submit) and the - // identical-script contract (workers call submit; the faux interceptor returns - // the producer's cached task) — the handler is the same for both. + // other members are gated consumers that fetch the producer's result. Each + // member is told its role via FLUENCE_COORDINATION_ROLE; a role-aware workload + // branches on it (producer submits, consumer fetches by FLUENCE_QUANTUM_JOB_ID). CoordinationShared = "shared" // CoordinationIndependent (default): every member does its own quantum work; @@ -101,7 +100,7 @@ const ( ) // quantumHandler splits a shared quantum gang into a single producer (real -// submit + sidecar) and N-1 gated faux consumers, or runs every member +// submit + sidecar) and N-1 gated, role-aware consumers, or runs every member // standalone in independent mode (see the package-level model comment). It is the // only place in the webhook that knows about quantum resources, gates, // coordination, or observe semantics. @@ -154,7 +153,8 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP pg := group + ProducerGroupSuffix m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1) ops := linkGroupOps(pod, pg) - ops = append(ops, interceptorOps(pod)...) // real (tag) mode — no fauxSubmitEnvOps + ops = append(ops, interceptorOps(pod)...) 
// tag mode: the producer submits for real + ops = append(ops, roleEnvOps(pod, RoleProducer)...) // FLUENCE_COORDINATION_ROLE=producer sc := sidecarFor(m) sc.EnsureRBAC(ctx, pod.Namespace) extra := []corev1.EnvVar{{Name: GangGroupEnv, Value: group}} @@ -165,11 +165,12 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP } // mutateConsumer wires a non-producer member: it joins the consumer gang -// (minCount N-1), is gated until the producer's task is ready, and runs the same -// image with the interceptor in FAUX mode so its submit returns the producer's -// task (handed over as FLUENCE_QUANTUM_JOB_ID at ungate) instead of resubmitting. -// In the explicit-role contract the consumer's script never calls submit, so the -// faux interceptor is simply unused — harmless either way. +// (minCount N-1) and is gated until the producer's task is ready. It is told its +// role (FLUENCE_COORDINATION_ROLE=consumer) and handed the producer's task id +// (FLUENCE_QUANTUM_JOB_ID, stamped on the pod by the sidecar at ungate). A +// role-aware consumer reads those and fetches the shared result instead of +// submitting — so the consumer never calls the vendor submit, and needs neither +// the interceptor nor a faux flag. func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op { m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1) ops := linkGroupOps(pod, group) @@ -178,13 +179,46 @@ func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAP // gate. applyOps gates the pod, raises priority, and stamps depends-on-*. dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate} ops = append(ops, dep.applyOps(pod)...) - ops = append(ops, interceptorOps(pod)...) - ops = append(ops, fauxSubmitEnvOps(pod)...) 
- log.Printf("[fluence-webhook] quantum consumer %s/%s — group %s minCount=%d, gated+faux", + ops = append(ops, consumerEnvOps(pod)...) + // A gated consumer never runs the QPU task — it only fetches the producer's + // shared result — so it must not hold the Fluxion quantum resource. Leaving it + // would make Fluxion allocate a qpu per consumer, capping the gang at the + // backend's graph qpu count and, on a single-slot real QPU, leaving the + // consumers unschedulable. Applies() already routed this pod on the request, so + // stripping it here is safe. + ops = append(ops, dropQuantumResourceOps(pod)...) + log.Printf("[fluence-webhook] quantum consumer %s/%s — group %s minCount=%d, gated (role=consumer, qpu stripped)", pod.Namespace, pod.Name, group, n-1) return ops } +// dropQuantumResourceOps removes the Fluxion quantum resource from a consumer's +// containers (requests and limits), returning the patch ops and mutating pod in +// place. Only entries that are present are removed (a JSON-patch remove on a +// missing path would fail). The sidecar container is never a consumer concern. +func dropQuantumResourceOps(pod *corev1.Pod) []spec.Op { + rn := corev1.ResourceName(QuantumResource) + // JSON Pointer escaping for the resource key: '~' -> '~0', '/' -> '~1'. 
+ key := strings.ReplaceAll(strings.ReplaceAll(QuantumResource, "~", "~0"), "/", "~1") + var ops []spec.Op + for i, c := range pod.Spec.Containers { + if c.Name == SidecarContainerName { + continue + } + if _, ok := c.Resources.Requests[rn]; ok { + ops = append(ops, spec.Op{Op: "remove", + Path: fmt.Sprintf("/spec/containers/%d/resources/requests/%s", i, key)}) + delete(pod.Spec.Containers[i].Resources.Requests, rn) + } + if _, ok := c.Resources.Limits[rn]; ok { + ops = append(ops, spec.Op{Op: "remove", + Path: fmt.Sprintf("/spec/containers/%d/resources/limits/%s", i, key)}) + delete(pod.Spec.Containers[i].Resources.Limits, rn) + } + } + return ops +} + // coordinationMode reads the coordination annotation; default independent. func coordinationMode(pod *corev1.Pod) string { if v := spec.Annotation(pod, CoordinationAnnotation); v != "" { @@ -328,57 +362,66 @@ func escapeJSONPointer(s string) string { const QuantumClassicalPriorityClass = "fluence-quantum-classical" -// ── faux-submit (worker submit dedup) ─────────────────────────────────────────── +// ── coordination role (producer / consumer) ───────────────────────────────────── // -// Quantum-specific, and delivered through the SAME Python interceptor as the -// submitter — not a second mechanism. The submitter's interceptor tags the -// submit; the worker's interceptor (same staged code) no-ops the submit. Which -// behavior runs is selected at runtime by FLUENCE_FAUX_SUBMIT, set here on the -// worker. Workers run the submitter's image and may call submit, but by ungate -// time the task already exists, so resubmitting would duplicate it N times. +// In a shared gang each member is told its role positively, so the application +// branches on it instead of relying on any submit-interception magic: +// producer submits the one real task (and is tagged so the sidecar finds it); +// consumer does NOT submit — it reads the producer's task id and fetches the +// shared result (e.g. 
via the vendor's S3-backed result API). +// The role is decided at admission by isProducer (completion index 0, else the +// producer-group claim) and surfaced as FLUENCE_COORDINATION_ROLE. Because the +// election is the webhook's, this env is the single source of truth — the +// container never re-derives its role from the Job index (which loose, non-Job +// pods don't even have). const ( - // FauxSubmitEnv selects the interceptor's no-op (faux) mode on workers. - // install_interceptor (see python/fluence/providers/braket.py) reads it and - // patches the vendor submit to return the existing task instead of submitting. - FauxSubmitEnv = "FLUENCE_FAUX_SUBMIT" + // CoordinationRoleEnv carries the pod's role in a shared gang. A role-aware + // workload branches on it: RoleProducer submits, RoleConsumer fetches the + // shared result by id. Unset for standalone/independent pods (they all submit). + CoordinationRoleEnv = "FLUENCE_COORDINATION_ROLE" + RoleProducer = "producer" + RoleConsumer = "consumer" // QuantumJobIDAnnotation is the vendor-neutral task id the ungating sidecar - // stamps on each worker (mirrors python/fluence/ungate.py JOB_ID_ANNOTATION), + // stamps on each consumer (mirrors python/fluence/ungate.py JOB_ID_ANNOTATION), // BEFORE removing the gate. Surfaced into FLUENCE_QUANTUM_JOB_ID via the - // downward API so the faux interceptor can return a handle to that task. + // downward API so a consumer can fetch the producer's result by id. QuantumJobIDAnnotation = "fluence.flux-framework.org/quantum-job-id" - // QuantumJobIDEnv is the env the faux interceptor reads for the existing - // task's id. + // QuantumJobIDEnv is the env a consumer reads for the producer's task id. QuantumJobIDEnv = "FLUENCE_QUANTUM_JOB_ID" ) -// fauxSubmitEnvOps sets, on each non-sidecar worker container, the faux-mode -// marker (FLUENCE_FAUX_SUBMIT=true) and the existing task's id +// roleEnvOps sets FLUENCE_COORDINATION_ROLE= on each non-sidecar container. 
+func roleEnvOps(pod *corev1.Pod, role string) []spec.Op { + return setContainerEnvOps(pod, corev1.EnvVar{Name: CoordinationRoleEnv, Value: role}) +} + +// consumerEnvOps tells a consumer its role and hands it the producer's task id // (FLUENCE_QUANTUM_JOB_ID, downward API from the annotation the ungating sidecar -// stamps). The interceptor is staged separately via the shared sidecar -// InterceptorOps path — these env vars only switch its mode and hand it the id. -func fauxSubmitEnvOps(pod *corev1.Pod) []spec.Op { - faux := corev1.EnvVar{Name: FauxSubmitEnv, Value: "true"} - jobID := spec.AnnotationEnv(QuantumJobIDEnv, QuantumJobIDAnnotation) +// stamps). A consumer never submits, so it gets neither the interceptor nor any +// faux flag — just its role and the id to fetch the shared result with. +func consumerEnvOps(pod *corev1.Pod) []spec.Op { + ops := roleEnvOps(pod, RoleConsumer) + ops = append(ops, setContainerEnvOps(pod, spec.AnnotationEnv(QuantumJobIDEnv, QuantumJobIDAnnotation))...) + return ops +} + +// setContainerEnvOps appends env var e to every non-sidecar container that does +// not already define it, returning the patch ops and mutating pod in place. 
+func setContainerEnvOps(pod *corev1.Pod, e corev1.EnvVar) []spec.Op { var ops []spec.Op for i, c := range pod.Spec.Containers { - if c.Name == SidecarContainerName { + if c.Name == SidecarContainerName || spec.HasEnv(c, e.Name) { continue } - if !spec.HasEnv(c, FauxSubmitEnv) { - if len(c.Env) == 0 { - ops = append(ops, spec.Op{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env", i), Value: []corev1.EnvVar{faux}}) - pod.Spec.Containers[i].Env = []corev1.EnvVar{faux} - } else { - ops = append(ops, spec.Op{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), Value: faux}) - pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, faux) - } - } - if !spec.HasEnv(c, QuantumJobIDEnv) { - ops = append(ops, spec.Op{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), Value: jobID}) - pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, jobID) + if len(c.Env) == 0 { + ops = append(ops, spec.Op{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env", i), Value: []corev1.EnvVar{e}}) + pod.Spec.Containers[i].Env = []corev1.EnvVar{e} + } else { + ops = append(ops, spec.Op{Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), Value: e}) + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, e) } } return ops @@ -410,7 +453,7 @@ const ( // defaultSidecarImage is used when FLUENCE_SIDECAR_IMAGE is not set. Owned by // the quantum integration; the deployment may override it via the env var. 
- defaultSidecarImage = "ghcr.io/converged-computing/fluence-sidecar:latest" + defaultSidecarImage = "vanessa/fluence-sidecar:latest" // StageVolumeName / StageMountPath: the shared emptyDir the init container // stages the fluence Python package into, mounted into workload containers @@ -471,9 +514,9 @@ func ensureSidecarRBAC(ctx context.Context, m webhook.MutatorAPI, namespace stri // interceptorOps stages the fluence Python package (Model C): an init container // copies it into a shared emptyDir, mounted into every workload container // (skipping the sidecar) with PYTHONPATH + FLUENCE_POD_UID, so Python auto-imports -// the interceptor via sitecustomize. Broad mounting is safe (fail-soft when the -// vendor SDK is absent) and is required so a quantum WORKER — which runs the same -// image but does not request the resource — also gets the (faux-mode) interceptor. +// the interceptor via sitecustomize, which tags the vendor submit so the sidecar +// can find the task. Added to producers and standalone/independent pods (the ones +// that actually submit); consumers don't submit, so they don't get it. 
func interceptorOps(pod *corev1.Pod) []spec.Op { var ops []spec.Op @@ -489,7 +532,7 @@ func interceptorOps(pod *corev1.Pod) []spec.Op { Image: sidecarImage(), ImagePullPolicy: corev1.PullAlways, Command: []string{"sh", "-c", - fmt.Sprintf("python -m fluence.stage %s || echo '[fluence] staging skipped (interceptor unavailable)'", StageMountPath)}, + fmt.Sprintf("python3 -m fluence.stage %s || echo '[fluence] staging skipped (interceptor unavailable)'", StageMountPath)}, VolumeMounts: []corev1.VolumeMount{{Name: StageVolumeName, MountPath: StageMountPath}}, } if len(pod.Spec.InitContainers) == 0 { diff --git a/pkg/webhook/handlers/quantum_test.go b/pkg/webhook/handlers/quantum_test.go index 7330a2a..10a000d 100644 --- a/pkg/webhook/handlers/quantum_test.go +++ b/pkg/webhook/handlers/quantum_test.go @@ -6,7 +6,7 @@ SPDX-License-Identifier: Apache-2.0 // quantum_test.go — all tests for the quantum handler: the producer/consumer // shared-coordination split (no separate submitter pod), independent mode, -// faux-submit, the sidecar wiring, the Dependency primitive, and the +// the coordination role + job-id handoff, the sidecar wiring, the Dependency primitive, and the // standalone/observe paths. Shared fixtures (qpuPod, cpuPod, op helpers) live in // handlers_test.go. package handlers @@ -99,10 +99,10 @@ func mincount(t *testing.T, cs *fake.Clientset, ns, group string) (int32, bool) } // A shared-mode CONSUMER (completion index != 0, owned by Job parallelism=3) is -// gated + faux, joins the consumer gang at minCount N-1 (the split), and -// gets NO sidecar (it is gated). No separate submitter pod is ever created — the -// producer is one of the N members. -func TestSharedConsumerGatedFauxAndSplit(t *testing.T) { +// gated, told its role (FLUENCE_COORDINATION_ROLE=consumer), joins the +// consumer gang at minCount N-1 (the split), and gets NO sidecar (it is gated). +// No separate submitter pod is ever created — the producer is one of the N members. 
+func TestSharedConsumerGatedRoleAndSplit(t *testing.T) { ns, group, job := "default", "qg", "qg-job" par := int32(3) cs := fake.NewSimpleClientset(&batchv1.Job{ @@ -118,8 +118,11 @@ func TestSharedConsumerGatedFauxAndSplit(t *testing.T) { if hasSidecarOp(ops) { t.Error("consumer (gated) must NOT get a sidecar") } - if e, ok := envOp(ops, FauxSubmitEnv); !ok || e.Value != "true" { - t.Errorf("consumer must get %s=true", FauxSubmitEnv) + if !hasDropQuantumResourceOp(ops) { + t.Error("consumer (gated, never runs the QPU) must have its qpu resource stripped") + } + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleConsumer { + t.Errorf("consumer must get %s=%s", CoordinationRoleEnv, RoleConsumer) } // Consumer gang is minCount N-1 (the producer/consumer split). if mc, ok := mincount(t, cs, ns, group); !ok || mc != 2 { @@ -133,8 +136,8 @@ func TestSharedConsumerGatedFauxAndSplit(t *testing.T) { } // The shared-mode PRODUCER (completion index 0) is wired as the real coordinator: -// its own group-of-one -producer at minCount 1, the real sidecar (not -// faux), not gated, and told which consumer group to ungate via +// its own group-of-one -producer at minCount 1, the real sidecar, not +// gated, role=producer, and told which consumer group to ungate via // FLUENCE_GANG_GROUP. It is one of the N members — no extra pod is created. 
func TestSharedProducerWiredAsRealSidecar(t *testing.T) { ns, group, job := "default", "qg2", "qg2-job" @@ -152,8 +155,14 @@ func TestSharedProducerWiredAsRealSidecar(t *testing.T) { if hasGateOp(ops) { t.Error("producer must NOT be gated") } - if _, ok := envOp(ops, FauxSubmitEnv); ok { - t.Error("producer must NOT be in faux mode") + if hasDropQuantumResourceOp(ops) { + t.Error("producer must KEEP its qpu resource (it runs the real submit)") + } + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer { + t.Errorf("producer must get %s=%s", CoordinationRoleEnv, RoleProducer) + } + if _, ok := envOp(ops, QuantumJobIDEnv); ok { + t.Error("producer must NOT get FLUENCE_QUANTUM_JOB_ID (it submits its own task)") } // FLUENCE_GANG_GROUP (the consumer group to ungate) is on the sidecar. var sidecar *corev1.Container @@ -216,8 +225,8 @@ func TestSharedGangNoSeparateSubmitterPod(t *testing.T) { // ── independent mode (default) ────────────────────────────────────────────────── // A grouped quantum pod with no coordination annotation is INDEPENDENT (default): -// it does its own real submit, is not gated, not faux, and triggers no group -// split and no submitter pod. +// it does its own real submit, is not gated, carries no coordination role, and +// triggers no group split and no submitter pod. 
func TestIndependentGroupedQuantumIsStandalone(t *testing.T) { ns, group, job := "default", "indep", "indep-job" par := int32(3) @@ -231,8 +240,8 @@ func TestIndependentGroupedQuantumIsStandalone(t *testing.T) { if hasGateOp(ops) { t.Error("independent member must not be gated") } - if _, ok := envOp(ops, FauxSubmitEnv); ok { - t.Error("independent member must not be faux") + if _, ok := envOp(ops, CoordinationRoleEnv); ok { + t.Error("independent member must not get a coordination role env") } if _, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); ok { t.Error("independent mode must not create a producer group") @@ -244,7 +253,7 @@ func TestIndependentGroupedQuantumIsStandalone(t *testing.T) { } // A standalone quantum pod (no group, no owner → group of one) does its own real -// submit: interceptor staged, but no gating, no faux, and no separate submitter. +// submit: interceptor staged, but no gating, no coordination role, no submitter. func TestStandaloneQuantumIsReal(t *testing.T) { ns := "default" cs := fake.NewSimpleClientset() @@ -258,8 +267,8 @@ func TestStandaloneQuantumIsReal(t *testing.T) { if hasGateOp(ops) { t.Error("standalone quantum pod must not be gated") } - if _, ok := envOp(ops, FauxSubmitEnv); ok { - t.Error("standalone quantum pod must not be faux") + if _, ok := envOp(ops, CoordinationRoleEnv); ok { + t.Error("standalone quantum pod must not get a coordination role env") } pods, _ := cs.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{}) if len(pods.Items) != 0 { @@ -286,7 +295,7 @@ func TestSharedGroupOfOneIsStandalone(t *testing.T) { } } -// ── faux-submit + dependency ──────────────────────────────────────────────────── +// ── role + dependency ──────────────────────────────────────────────────── // envOp returns the env var op with the given name, if present (covers both // single-EnvVar and []EnvVar op shapes). 
@@ -400,31 +409,31 @@ func TestDependencyOfRoundTrip(t *testing.T) { } } -// The consumer is staged with the SAME interceptor as the producer (PYTHONPATH + -// FLUENCE_POD_UID), put into faux mode (FLUENCE_FAUX_SUBMIT=true), and handed the -// existing task id via the FLUENCE_QUANTUM_JOB_ID downward-API env. One -// mechanism, two modes — no separate ConfigMap shim. The user sets nothing. -func TestQuantumConsumerStagedWithFauxSubmit(t *testing.T) { - ns, group, job := "default", "fauxq", "fauxq-job" +// The consumer is role-aware: it gets FLUENCE_COORDINATION_ROLE=consumer and the +// producer's task id via the FLUENCE_QUANTUM_JOB_ID downward-API env, and it is +// NOT staged with the interceptor (a consumer never submits, so it needs neither +// the interceptor nor any faux flag). The user's script branches on the role. +func TestQuantumConsumerStagedWithRole(t *testing.T) { + ns, group, job := "default", "roleq", "roleq-job" par := int32(2) cs := fake.NewSimpleClientset(&batchv1.Job{ ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, Spec: batchv1.JobSpec{Parallelism: &par, Completions: &par}}) m := &webhook.Mutator{Clientset: cs} - ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "fauxq-1", job, "1")) + ops := m.Mutate(context.Background(), sharedQPUPod(ns, group, "roleq-1", job, "1")) - // Same interceptor staging as the producer (PYTHONPATH set on the consumer). - if _, ok := envOp(ops, "PYTHONPATH"); !ok { - t.Errorf("consumer not staged with the interceptor (no PYTHONPATH); ops: %+v", ops) + // Role surfaced to the container. + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleConsumer { + t.Errorf("consumer missing %s=%s (got %+v, ok=%v)", CoordinationRoleEnv, RoleConsumer, e, ok) } - // Faux mode selected. 
- if e, ok := envOp(ops, FauxSubmitEnv); !ok || e.Value != "true" { - t.Errorf("consumer missing %s=true (got %+v, ok=%v)", FauxSubmitEnv, e, ok) + // A consumer never submits, so it is NOT staged with the interceptor. + if _, ok := envOp(ops, "PYTHONPATH"); ok { + t.Error("consumer must NOT be staged with the interceptor (it does not submit)") } - // Existing task id sourced from the annotation the ungating sidecar stamps. + // Producer's task id sourced from the annotation the ungating sidecar stamps. e, ok := envOp(ops, QuantumJobIDEnv) if !ok { t.Fatalf("consumer missing %s env", QuantumJobIDEnv) diff --git a/python/Dockerfile b/python/Dockerfile index 5cff209..03bf153 100644 --- a/python/Dockerfile +++ b/python/Dockerfile @@ -1,14 +1,6 @@ # Fluence quantum coordination sidecar image. -# -# Bakes the `fluence` Python package in, so the SAME image serves three roles -# (versions locked together — they are built from this one source tree): -# 1. sidecar container — runs `fluence-sidecar` (the coordination loop) -# 2. init container — runs `python -m fluence.stage ` to copy the -# pure-Python package + sitecustomize into a shared -# volume that the webhook mounts onto the user -# container's PYTHONPATH (Model C delivery) -# 3. (the staged copy) — the user container imports the staged package via -# sitecustomize; no install required in the user image +# TODO organize into subdirectories when we have >1 image +#sitecustomize; no install required in the user image FROM python:3.11-slim LABEL org.opencontainers.image.source="https://github.com/converged-computing/fluence" @@ -27,11 +19,8 @@ COPY . /app # Install the package with the vendor SDKs the SIDECAR needs for its own API # calls (task discovery / queue polling). The interceptor staged into the user # container carries NONE of these — it patches whatever SDK the user already has. 
-RUN pip install --no-cache-dir ".[all]" +RUN pip install --no-cache-dir ".[all]" && ln -s $(which python3) /usr/bin/python -ENV FLUENCE_TASK_DISCOVERY_TIMEOUT=300 +ENV FLUENCE_TASK_DISCOVERY_TIMEOUT=300000 ENV FLUENCE_POLL_INTERVAL=30 - -# Default entrypoint is the sidecar loop; the init container overrides the -# command with `python -m fluence.stage `. CMD ["fluence-sidecar"] diff --git a/python/fluence/providers/braket.py b/python/fluence/providers/braket.py index 33f1683..d6e6ea9 100644 --- a/python/fluence/providers/braket.py +++ b/python/fluence/providers/braket.py @@ -49,26 +49,13 @@ def install_interceptor(self, pod_uid: str) -> bool: return False # braket SDK not in this container — fail-soft original_run = AwsDevice.run - faux = os.environ.get("FLUENCE_FAUX_SUBMIT", "").lower() == "true" def patched_run(self, task_specification, *args, **kwargs): - # Two modes of the ONE interceptor: - # faux (worker): the one-off submitter already submitted this task - # before the worker was ungated, so submitting again would - # duplicate it N times. Return a handle to the EXISTING task (by - # ARN, handed over via FLUENCE_QUANTUM_JOB_ID) without submitting. - # tag (submitter): stamp the pod-uid tag so the sidecar can find the - # task in the queue, then submit for real. - if faux: - arn = os.environ.get("FLUENCE_QUANTUM_JOB_ID", "") - if arn: - from braket.aws import AwsQuantumTask - log(f"faux-submit: returning existing task {arn} " - f"(no resubmission)") - return AwsQuantumTask(arn=arn) - log("faux-submit: no job id; suppressing submit " - "(worker consumes results by id)") - return None + # Tag the submission with the pod-uid so the sidecar can find this task + # in the queue. The interceptor is staged only on pods that actually + # submit (producers and standalone/independent pods); consumers are + # role-aware (FLUENCE_COORDINATION_ROLE=consumer) and never call run(), + # so there is no submit to intercept and no faux mode to select. 
if pod_uid: tags = kwargs.get("tags", {}) tags[TAG_KEY] = pod_uid diff --git a/test/e2e/quantum/04-gang-env-contract.sh b/test/e2e/quantum/04-gang-env-contract.sh index f887448..157f78b 100755 --- a/test/e2e/quantum/04-gang-env-contract.sh +++ b/test/e2e/quantum/04-gang-env-contract.sh @@ -7,8 +7,9 @@ # Spec layer only (these are downward-API valueFrom refs whose VALUES resolve at # placement, but whose PRESENCE is deterministic at admission), so no scheduling, # no qpu capacity, no logs — it cannot flake on capacity. Contract: -# consumer (faux): FLUENCE_FAUX_SUBMIT, FLUENCE_QUANTUM_JOB_ID, PYTHONPATH, FLUXION_BACKEND -# producer: FLUENCE_GANG_GROUP on the sidecar (real submit, ungates the consumers) +# consumer (role): FLUENCE_COORDINATION_ROLE=consumer, FLUENCE_QUANTUM_JOB_ID, FLUXION_BACKEND +# (NO interceptor/PYTHONPATH — a consumer never submits) +# producer (role): FLUENCE_COORDINATION_ROLE=producer + FLUENCE_GANG_GROUP on the sidecar set -euo pipefail HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE%/test/e2e/*}/test/e2e/lib.sh" @@ -24,15 +25,28 @@ has_env() { kubectl get pod "$1" -o jsonpath="{.spec.containers[?(@.name=='$2')].env[*].name}" \ 2>/dev/null | tr ' ' '\n' | grep -qx "$3" } +# value of env $3 in container $2 of pod $1 (empty if absent) +env_val() { + kubectl get pod "$1" -o jsonpath="{.spec.containers[?(@.name=='$2')].env[?(@.name=='$3')].value}" \ + 2>/dev/null || true +} -log "checking the webhook wired the faux contract onto the consumer" -for i in $(seq 1 15); do has_env "$CONSUMER" app FLUENCE_FAUX_SUBMIT && break; sleep 2; done -for v in FLUENCE_FAUX_SUBMIT FLUENCE_QUANTUM_JOB_ID PYTHONPATH FLUXION_BACKEND; do +log "checking the webhook wired the consumer role contract" +for i in $(seq 1 15); do has_env "$CONSUMER" app FLUENCE_COORDINATION_ROLE && break; sleep 2; done +# Present: the role (=consumer), the producer's task id, and the backend. 
+for v in FLUENCE_COORDINATION_ROLE FLUENCE_QUANTUM_JOB_ID FLUXION_BACKEND; do has_env "$CONSUMER" app "$v" \ || { kubectl get pod "$CONSUMER" -o yaml | sed -n '/containers:/,/status:/p'; \ fail "consumer 'app' container missing env '$v'"; } log " consumer has env: $v" done +role="$(env_val "$CONSUMER" app FLUENCE_COORDINATION_ROLE)" +[ "$role" = "consumer" ] || fail "consumer role=$role, want consumer" +# Absent: a consumer never submits, so no interceptor staging and no faux flag. +for v in PYTHONPATH FLUENCE_FAUX_SUBMIT; do + ! has_env "$CONSUMER" app "$v" || fail "consumer must NOT carry '$v' (it does not submit)" +done +log " consumer role=consumer, no interceptor/faux" # The producer's sidecar must know which consumer group to ungate. log "checking the producer sidecar has FLUENCE_GANG_GROUP=$GROUP" @@ -43,13 +57,15 @@ gg="$(kubectl get pod "$PRODUCER" \ [ "$gg" = "$GROUP" ] || fail "producer sidecar FLUENCE_GANG_GROUP=$gg, want $GROUP" log " producer sidecar has FLUENCE_GANG_GROUP=$gg" -# And the producer must NOT be in faux mode (it does the real submit). -if has_env "$PRODUCER" app FLUENCE_FAUX_SUBMIT; then - fail "producer must NOT carry FLUENCE_FAUX_SUBMIT (it submits for real)" +# The producer carries role=producer and is the real submitter (no consumer id). +prole="$(env_val "$PRODUCER" app FLUENCE_COORDINATION_ROLE)" +[ "$prole" = "producer" ] || fail "producer role=$prole, want producer" +if has_env "$PRODUCER" app FLUENCE_QUANTUM_JOB_ID; then + fail "producer must NOT carry FLUENCE_QUANTUM_JOB_ID (it submits its own task)" fi -log " producer is not faux" +log " producer role=producer, submits its own task" -log "PASS 8: webhook injects the consumer(faux) + producer(real) env contract at admission" +log "PASS 8: webhook injects the consumer(role) + producer(role) env contract at admission" kubectl delete -f examples/test/e2e/quantum/quantum-gang-pods.yaml --wait=false || true for g in "$GROUP" "${GROUP}-producer"; do