From 09f90904ce8a7adeab07c9f0e8714d11948ed691 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 22 May 2026 11:45:27 +0000 Subject: [PATCH 1/6] bundle: extract DeploymentLock interface + workspace filesystem impl Pure code-movement refactor. Wraps the existing workspace-filesystem lock behavior behind a DeploymentLock interface so a follow-up PR can introduce an alternative metadata-service-backed lock implementation without touching deploy/destroy/bind callers again. What changed: - New bundle/deploy/lock/lock.go: DeploymentLock interface, Goal enum (moved from release.go), DeploymentStatus enum, and a NewDeploymentLock factory that unconditionally returns the workspace filesystem implementation. - New bundle/deploy/lock/workspace_filesystem.go: workspaceFilesystemLock struct that implements DeploymentLock. Preserves the historical behavior of the deleted acquire.go / release.go mutators: lock-disabled short-circuit, locker.CreateLocker initialization, the permissions.ReportPossiblePermissionDenied branch on fs.ErrPermission / fs.ErrNotExist, and the destroy-mode locker.AllowLockFileNotExist unlock quirk. - Deleted bundle/deploy/lock/acquire.go and bundle/deploy/lock/release.go. - Updated bundle/phases/{deploy,destroy,bind}.go to construct the lock once via NewDeploymentLock and call Acquire / Release directly instead of through bundle.ApplyContext. The deferred Release now reports DeploymentSuccess / DeploymentFailure based on logdiag.HasError so a future DMS-backed implementation can record the outcome. Behavior is preserved end-to-end: lock-related acceptance goldens (pipelines/{deploy,destroy}/force-lock, bundle/help/bundle-{deploy, destroy}) all pass unchanged. 
--- bundle/deploy/lock/acquire.go | 69 ------------------ bundle/deploy/lock/lock.go | 41 +++++++++++ bundle/deploy/lock/release.go | 58 --------------- bundle/deploy/lock/workspace_filesystem.go | 84 ++++++++++++++++++++++ bundle/phases/bind.go | 26 +++++-- bundle/phases/deploy.go | 20 ++++-- bundle/phases/destroy.go | 13 +++- 7 files changed, 169 insertions(+), 142 deletions(-) delete mode 100644 bundle/deploy/lock/acquire.go create mode 100644 bundle/deploy/lock/lock.go delete mode 100644 bundle/deploy/lock/release.go create mode 100644 bundle/deploy/lock/workspace_filesystem.go diff --git a/bundle/deploy/lock/acquire.go b/bundle/deploy/lock/acquire.go deleted file mode 100644 index 6e4844ca5ff..00000000000 --- a/bundle/deploy/lock/acquire.go +++ /dev/null @@ -1,69 +0,0 @@ -package lock - -import ( - "context" - "errors" - "io/fs" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/permissions" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type acquire struct{} - -func Acquire() bundle.Mutator { - return &acquire{} -} - -func (m *acquire) Name() string { - return "lock:acquire" -} - -func (m *acquire) init(ctx context.Context, b *bundle.Bundle) error { - user := b.Config.Workspace.CurrentUser.UserName - dir := b.Config.Workspace.StatePath - l, err := locker.CreateLocker(user, dir, b.WorkspaceClient(ctx)) - if err != nil { - return err - } - - b.Locker = l - return nil -} - -func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. 
- if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - err := m.init(ctx, b) - if err != nil { - return diag.FromErr(err) - } - - force := b.Config.Bundle.Deployment.Lock.Force - log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) - err = b.Locker.Lock(ctx, force) - if err != nil { - log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) - - if errors.Is(err, fs.ErrPermission) { - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - if errors.Is(err, fs.ErrNotExist) { - // If we get a "doesn't exist" error from the API this indicates - // we either don't have permissions or the path is invalid. - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - return diag.FromErr(err) - } - - return nil -} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go new file mode 100644 index 00000000000..6e3339d6fdb --- /dev/null +++ b/bundle/deploy/lock/lock.go @@ -0,0 +1,41 @@ +package lock + +import ( + "context" + + "github.com/databricks/cli/bundle" +) + +// Goal describes the purpose of a deployment operation. +type Goal string + +const ( + GoalBind = Goal("bind") + GoalUnbind = Goal("unbind") + GoalDeploy = Goal("deploy") + GoalDestroy = Goal("destroy") +) + +// DeploymentStatus indicates whether the deployment operation succeeded or failed. +type DeploymentStatus int + +const ( + DeploymentSuccess DeploymentStatus = iota + DeploymentFailure +) + +// DeploymentLock manages the deployment lock lifecycle. +type DeploymentLock interface { + // Acquire acquires the deployment lock. + Acquire(ctx context.Context) error + + // Release releases the deployment lock with the given deployment status. + Release(ctx context.Context, status DeploymentStatus) error +} + +// NewDeploymentLock returns a DeploymentLock backed by the workspace +// filesystem. 
This factory exists so a future change can swap in alternative +// lock implementations without touching callers. +func NewDeploymentLock(b *bundle.Bundle, goal Goal) DeploymentLock { + return newWorkspaceFilesystemLock(b, goal) +} diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go deleted file mode 100644 index 26f95edfc95..00000000000 --- a/bundle/deploy/lock/release.go +++ /dev/null @@ -1,58 +0,0 @@ -package lock - -import ( - "context" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type Goal string - -const ( - GoalBind = Goal("bind") - GoalUnbind = Goal("unbind") - GoalDeploy = Goal("deploy") - GoalDestroy = Goal("destroy") -) - -type release struct { - goal Goal -} - -func Release(goal Goal) bundle.Mutator { - return &release{goal} -} - -func (m *release) Name() string { - return "lock:release" -} - -func (m *release) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - // Return early if the locker is not set. - // It is likely an error occurred prior to initialization of the locker instance. 
- if b.Locker == nil { - log.Warnf(ctx, "Unable to release lock if locker is not configured") - return nil - } - - log.Infof(ctx, "Releasing deployment lock") - switch m.goal { - case GoalDeploy: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalBind, GoalUnbind: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalDestroy: - return diag.FromErr(b.Locker.Unlock(ctx, locker.AllowLockFileNotExist)) - default: - return diag.Errorf("unknown goal for lock release: %s", m.goal) - } -} diff --git a/bundle/deploy/lock/workspace_filesystem.go b/bundle/deploy/lock/workspace_filesystem.go new file mode 100644 index 00000000000..55da52d6a2e --- /dev/null +++ b/bundle/deploy/lock/workspace_filesystem.go @@ -0,0 +1,84 @@ +package lock + +import ( + "context" + "errors" + "io/fs" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/permissions" + "github.com/databricks/cli/libs/locker" + "github.com/databricks/cli/libs/log" +) + +// workspaceFilesystemLock implements DeploymentLock using a lock file in the +// bundle's workspace state path. This preserves the historical behavior of +// the previous lock.Acquire / lock.Release mutators. +type workspaceFilesystemLock struct { + b *bundle.Bundle + goal Goal +} + +func newWorkspaceFilesystemLock(b *bundle.Bundle, goal Goal) *workspaceFilesystemLock { + return &workspaceFilesystemLock{b: b, goal: goal} +} + +func (l *workspaceFilesystemLock) Acquire(ctx context.Context) error { + b := l.b + + // Return early if locking is disabled. 
+ if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + user := b.Config.Workspace.CurrentUser.UserName + dir := b.Config.Workspace.StatePath + lk, err := locker.CreateLocker(user, dir, b.WorkspaceClient(ctx)) + if err != nil { + return err + } + + b.Locker = lk + + force := b.Config.Bundle.Deployment.Lock.Force + log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) + err = lk.Lock(ctx, force) + if err != nil { + log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) + + // If we get a permission or "doesn't exist" error from the API this + // indicates we either don't have permissions or the path is invalid. + if errors.Is(err, fs.ErrPermission) || errors.Is(err, fs.ErrNotExist) { + diags := permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) + return diags.Error() + } + + return err + } + + return nil +} + +func (l *workspaceFilesystemLock) Release(ctx context.Context, _ DeploymentStatus) error { + b := l.b + + // Return early if locking is disabled. + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + // Return early if the locker is not set. + // It is likely an error occurred prior to initialization of the locker instance. 
+ if b.Locker == nil { + log.Warnf(ctx, "Unable to release lock if locker is not configured") + return nil + } + + log.Infof(ctx, "Releasing deployment lock") + if l.goal == GoalDestroy { + return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist) + } + return b.Locker.Unlock(ctx) +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index 48ba7755714..7b3ce12df64 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -23,13 +23,20 @@ import ( func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions, engine engine.EngineType) { log.Info(ctx, "Phase: bind") - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(b, lock.GoalBind) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalBind)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { @@ -119,13 +126,20 @@ func jsonDump(ctx context.Context, v any, field string) string { func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, resourceKey string, engine engine.EngineType) { log.Info(ctx, "Phase: unbind") - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(b, lock.GoalUnbind) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalUnbind)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 1db4b2e02da..d23b9ac2c91 100644 
--- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -125,19 +125,27 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand // Core mutators that CRUD resources and modify deployment state. These // mutators need informed consent if they are potentially destructive. - bundle.ApplySeqContext(ctx, b, - scripts.Execute(config.ScriptPreDeploy), - lock.Acquire(), - ) - + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) if logdiag.HasError(ctx) { // lock is not acquired here return } + dl := lock.NewDeploymentLock(b, lock.GoalDeploy) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } + // lock is acquired here defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() uploadLibraries(ctx, b, libs) diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 98e6f7fee2a..741a30c99c0 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -120,13 +120,20 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(b, lock.GoalDestroy) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if !engine.IsDirect() { From 204b40a14bd2b5c7dfbcb68afa85241dfcca6fe8 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 26 May 2026 11:07:12 +0000 Subject: [PATCH 2/6] bundle: 
add DATABRICKS_BUNDLE_MANAGED_STATE gate + DMS-backed deployment lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires up a new env-var gate (DATABRICKS_BUNDLE_MANAGED_STATE) that switches the deployment lock from the workspace filesystem to the bundle deployment metadata service (DMS). The env var accepts the usual boolean spellings (true/false, 1/0, yes/no, on/off); the historical filesystem lock is the default. The DMS-backed lock uses the SDK's databricks-sdk-go/service/bundle client (merged via #5311, available since v0.135.0) — no hand-rolled DMS client. Acquire calls CreateDeployment / CreateVersion and starts a background heartbeat goroutine; Release stops the heartbeat and calls CompleteVersion (plus DeleteDeployment on successful destroy). Bind and Unbind are not yet supported under DMS and return an error at lock construction. Deployment ID persistence lives inside the lock package for now (managed_service.json in the workspace state dir). In step 4 of the DMS split it will move to bundle/statemgmt so it can be shared with the state-from-DMS path. 
Co-authored-by: Isaac --- .../dms/release-lock-error/databricks.yml | 11 + .../dms/release-lock-error/out.test.toml | 4 + .../bundle/dms/release-lock-error/output.txt | 38 ++ .../bundle/dms/release-lock-error/script | 8 + .../bundle/dms/release-lock-error/test.toml | 4 + acceptance/bundle/dms/test.toml | 7 + .../lock/deployment_metadata_service.go | 302 +++++++++++++ .../lock/deployment_metadata_service_test.go | 58 +++ bundle/deploy/lock/lock.go | 19 +- bundle/env/deployment_metadata.go | 30 ++ bundle/phases/bind.go | 12 +- bundle/phases/deploy.go | 6 +- bundle/phases/destroy.go | 6 +- libs/testserver/deployment_metadata.go | 405 ++++++++++++++++++ libs/testserver/fake_workspace.go | 5 + libs/testserver/handlers.go | 29 ++ 16 files changed, 935 insertions(+), 9 deletions(-) create mode 100644 acceptance/bundle/dms/release-lock-error/databricks.yml create mode 100644 acceptance/bundle/dms/release-lock-error/out.test.toml create mode 100644 acceptance/bundle/dms/release-lock-error/output.txt create mode 100755 acceptance/bundle/dms/release-lock-error/script create mode 100644 acceptance/bundle/dms/release-lock-error/test.toml create mode 100644 acceptance/bundle/dms/test.toml create mode 100644 bundle/deploy/lock/deployment_metadata_service.go create mode 100644 bundle/deploy/lock/deployment_metadata_service_test.go create mode 100644 bundle/env/deployment_metadata.go create mode 100644 libs/testserver/deployment_metadata.go diff --git a/acceptance/bundle/dms/release-lock-error/databricks.yml b/acceptance/bundle/dms/release-lock-error/databricks.yml new file mode 100644 index 00000000000..94323b84d93 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: dms-release-lock-error + +targets: + fail-complete: + default: true + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/release-lock-error/out.test.toml b/acceptance/bundle/dms/release-lock-error/out.test.toml new file mode 
100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt new file mode 100644 index 00000000000..476ba422ef5 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -0,0 +1,38 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... +Deploying resources... +Updating deployment state... +Deployment complete! +Warn: Failed to release deployment lock: simulated complete version failure + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "fail-complete" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "fail-complete", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/release-lock-error/script b/acceptance/bundle/dms/release-lock-error/script new file mode 100755 index 00000000000..86220e4be0a --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/script @@ -0,0 +1,8 @@ +# Deploy with the deployment metadata service enabled. +# The target name "fail-complete" instructs the fake DMS server to fail the +# CompleteVersion call so the CLI exercises the "lock release failed" branch. +trace $CLI bundle deploy + +# Print the DMS requests to verify the lock release was attempted. 
+trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/release-lock-error/test.toml b/acceptance/bundle/dms/release-lock-error/test.toml new file mode 100644 index 00000000000..58ed9f5172c --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/test.toml @@ -0,0 +1,4 @@ +Ignore = [".databricks"] + +# The default target in databricks.yml is named "fail-complete", which makes the fake DMS server's +# CompleteVersion endpoint return an error, simulating a release failure. diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..1b258d2613a --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,7 @@ +Badness = "Uses local test server; enable on cloud once the deployment metadata service is in production" +Local = true +Cloud = false +RecordRequests = true + +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..afb91ef5f79 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,302 @@ +package lock + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/google/uuid" +) + +// defaultHeartbeatInterval is how often the background heartbeat goroutine +// renews the DMS-side lock lease while a deployment is in progress. 
+const defaultHeartbeatInterval = 30 * time.Second + +// managedServiceFileName is the workspace state file where the lock package +// persists the DMS deployment_id across CLI invocations. It is intentionally +// scoped to this package for now; once the state-from-DMS path lands the +// file (and accompanying struct) will move to bundle/statemgmt so both the +// lock and state managers can share it. +const managedServiceFileName = "managed_service.json" + +// managedServiceJSON is the on-disk shape of managedServiceFileName. +type managedServiceJSON struct { + DeploymentID string `json:"deployment_id"` +} + +// metadataServiceLock implements DeploymentLock against the bundle deployment +// metadata service (DMS). The lock is acquired by creating a new Version +// under the deployment; a background goroutine renews the lock lease via +// Heartbeat calls; the lock is released by CompleteVersion. +type metadataServiceLock struct { + b *bundle.Bundle + versionType sdkbundle.VersionType + + svc sdkbundle.BundleInterface + deploymentID string + versionID string + + stopHeartbeat context.CancelFunc +} + +func newMetadataServiceLock(b *bundle.Bundle, goal Goal) (*metadataServiceLock, error) { + versionType, err := goalToVersionType(goal) + if err != nil { + return nil, err + } + return &metadataServiceLock{b: b, versionType: versionType}, nil +} + +// goalToVersionType maps a deployment Goal onto the DMS VersionType enum. +// Bind and Unbind are not yet supported under DMS — they will gain dedicated +// DMS operations in a later change. 
+func goalToVersionType(goal Goal) (sdkbundle.VersionType, error) { + switch goal { + case GoalDeploy: + return sdkbundle.VersionTypeVersionTypeDeploy, nil + case GoalDestroy: + return sdkbundle.VersionTypeVersionTypeDestroy, nil + case GoalBind, GoalUnbind: + return "", fmt.Errorf("%s is not supported with the deployment metadata service", goal) + default: + return "", fmt.Errorf("unknown deployment goal: %s", goal) + } +} + +func (l *metadataServiceLock) Acquire(ctx context.Context) error { + if l.b.Config.Bundle.Deployment.Lock.Force { + return errors.New("force lock is not supported with the deployment metadata service") + } + + l.svc = l.b.WorkspaceClient(ctx).Bundle + + deploymentID, versionID, err := acquireLock(ctx, l.b, l.svc, l.versionType) + if err != nil { + return err + } + + l.deploymentID = deploymentID + l.versionID = versionID + l.stopHeartbeat = startHeartbeat(ctx, l.svc, deploymentID, versionID) + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, versionID) + return nil +} + +func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStatus) error { + // Stop the heartbeat first so its in-flight request doesn't race with + // CompleteVersion below. + if l.stopHeartbeat != nil { + l.stopHeartbeat() + } + + // If Acquire failed before reaching CreateVersion there is nothing to release. 
+ if l.svc == nil || l.deploymentID == "" || l.versionID == "" { + return nil + } + + reason := sdkbundle.VersionCompleteVersionCompleteSuccess + if status == DeploymentFailure { + reason = sdkbundle.VersionCompleteVersionCompleteFailure + } + + versionName := fmt.Sprintf("deployments/%s/versions/%s", l.deploymentID, l.versionID) + if _, err := l.svc.CompleteVersion(ctx, sdkbundle.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }); err != nil { + return err + } + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%s", + l.deploymentID, l.versionID, reason) + + // On successful destroy, delete the deployment record. Surface failures + // to the caller — they are deploy-correctness issues, not best-effort + // cleanup. + if status == DeploymentSuccess && l.versionType == sdkbundle.VersionTypeVersionTypeDestroy { + if err := l.svc.DeleteDeployment(ctx, sdkbundle.DeleteDeploymentRequest{ + Name: "deployments/" + l.deploymentID, + }); err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + return nil +} + +// acquireLock implements the lock acquisition protocol: +// 1. Resolve the deployment ID from managed_service.json (or generate a new one). +// 2. CreateDeployment for fresh IDs; GetDeployment otherwise to learn the +// next version number. +// 3. CreateVersion to acquire the lock. +func acquireLock(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, versionType sdkbundle.VersionType) (deploymentID, versionID string, err error) { + deploymentID, isNew, err := resolveDeploymentID(ctx, b) + if err != nil { + return "", "", err + } + + if isNew { + // Fresh deployment: create the record at version 1. 
+ _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ + DeploymentId: deploymentID, + Deployment: sdkbundle.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", "", fmt.Errorf("failed to create deployment: %w", createErr) + } + // Persist the deployment ID only after the server-side record exists, + // so a failed CreateDeployment doesn't leave a dangling ID on disk. + if err := writeDeploymentID(ctx, b, deploymentID); err != nil { + return "", "", err + } + versionID = "1" + } else { + // Existing deployment: ask the server for the last version ID. + dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + deploymentID, + }) + if getErr != nil { + return "", "", fmt.Errorf("failed to get deployment: %w", getErr) + } + next, parseErr := nextVersionID(dep.LastVersionId) + if parseErr != nil { + return "", "", parseErr + } + versionID = next + } + + if _, err := svc.CreateVersion(ctx, sdkbundle.CreateVersionRequest{ + Parent: "deployments/" + deploymentID, + VersionId: versionID, + Version: sdkbundle.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }); err != nil { + return "", "", fmt.Errorf("failed to acquire deployment lock: %w", err) + } + + return deploymentID, versionID, nil +} + +// nextVersionID returns the next monotonic version ID following lastVersionID. +// An empty lastVersionID means "no prior versions" so the next ID is "1". +func nextVersionID(lastVersionID string) (string, error) { + if lastVersionID == "" { + return "1", nil + } + n, err := strconv.ParseInt(lastVersionID, 10, 64) + if err != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", lastVersionID, err) + } + return strconv.FormatInt(n+1, 10), nil +} + +// resolveDeploymentID returns the deployment ID for this bundle. 
If +// managed_service.json exists in the workspace state directory and contains a +// deployment ID, it is reused. Otherwise a new UUID is generated and the +// caller must write it to disk after CreateDeployment succeeds. +func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, bool, error) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return "", false, fmt.Errorf("failed to create state filer: %w", err) + } + + reader, readErr := f.Read(ctx, managedServiceFileName) + if readErr == nil { + defer reader.Close() + data, err := io.ReadAll(reader) + if err != nil { + return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, err) + } + var sj managedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + return "", false, fmt.Errorf("failed to parse %s: %w", managedServiceFileName, err) + } + if sj.DeploymentID != "" { + return sj.DeploymentID, false, nil + } + // File exists but has no deployment_id — treat as fresh. + } else if !errors.Is(readErr, fs.ErrNotExist) { + return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, readErr) + } + + return uuid.New().String(), true, nil +} + +func writeDeploymentID(ctx context.Context, b *bundle.Bundle, deploymentID string) error { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return fmt.Errorf("failed to create state filer: %w", err) + } + data, err := json.Marshal(managedServiceJSON{DeploymentID: deploymentID}) + if err != nil { + return fmt.Errorf("failed to marshal %s: %w", managedServiceFileName, err) + } + if err := f.Write(ctx, managedServiceFileName, bytes.NewReader(data), + filer.CreateParentDirectories, filer.OverwriteIfExists); err != nil { + return fmt.Errorf("failed to write %s: %w", managedServiceFileName, err) + } + return nil +} + +// startHeartbeat spawns a goroutine that renews the DMS lock lease at +// defaultHeartbeatInterval. The returned cancel func stops the goroutine. 
+// Heartbeat errors that indicate the version was already completed (HTTP 409 +// ABORTED) are treated as benign termination; all other errors are logged +// and the goroutine continues so a transient network blip doesn't tear down +// the deploy. +func startHeartbeat(parent context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(parent) + versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, err := svc.Heartbeat(ctx, sdkbundle.HeartbeatRequest{Name: versionName}); err != nil { + if isAborted(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + continue + } + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", + deploymentID, versionID) + } + } + }() + + return cancel +} + +// isAborted reports whether err is the DMS-specific "409 ABORTED" response +// the server emits when the heartbeat target version is no longer active. 
+func isAborted(err error) bool { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" { + return true + } + return false +} diff --git a/bundle/deploy/lock/deployment_metadata_service_test.go b/bundle/deploy/lock/deployment_metadata_service_test.go new file mode 100644 index 00000000000..76b09d6532c --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service_test.go @@ -0,0 +1,58 @@ +package lock + +import ( + "testing" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" +) + +func TestGoalToVersionType(t *testing.T) { + tests := []struct { + goal Goal + want sdkbundle.VersionType + wantErr bool + }{ + {goal: GoalDeploy, want: sdkbundle.VersionTypeVersionTypeDeploy}, + {goal: GoalDestroy, want: sdkbundle.VersionTypeVersionTypeDestroy}, + {goal: GoalBind, wantErr: true}, + {goal: GoalUnbind, wantErr: true}, + {goal: Goal("garbage"), wantErr: true}, + } + for _, tt := range tests { + t.Run(string(tt.goal), func(t *testing.T) { + got, err := goalToVersionType(tt.goal) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestNextVersionID(t *testing.T) { + tests := []struct { + name string + last string + want string + wantErr bool + }{ + {name: "empty starts at 1", last: "", want: "1"}, + {name: "increments numeric", last: "1", want: "2"}, + {name: "increments larger numeric", last: "42", want: "43"}, + {name: "rejects non-numeric", last: "v1", wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := nextVersionID(tt.last) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go index 6e3339d6fdb..58162aba946 100644 --- a/bundle/deploy/lock/lock.go +++ 
b/bundle/deploy/lock/lock.go @@ -4,6 +4,7 @@ import ( "context" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/env" ) // Goal describes the purpose of a deployment operation. @@ -33,9 +34,17 @@ type DeploymentLock interface { Release(ctx context.Context, status DeploymentStatus) error } -// NewDeploymentLock returns a DeploymentLock backed by the workspace -// filesystem. This factory exists so a future change can swap in alternative -// lock implementations without touching callers. -func NewDeploymentLock(b *bundle.Bundle, goal Goal) DeploymentLock { - return newWorkspaceFilesystemLock(b, goal) +// NewDeploymentLock returns a DeploymentLock implementation chosen based on +// the DATABRICKS_BUNDLE_MANAGED_STATE environment variable: when set to a +// truthy value the deployment metadata service backs the lock, otherwise the +// historical workspace-filesystem lock is used. +// +// Note: today the env var alone gates the DMS path. Once the broader managed- +// state feature lands the gate will move behind a richer predicate (e.g. +// statemgmt.IsDmsActive) that also checks server-side opt-in. +func NewDeploymentLock(ctx context.Context, b *bundle.Bundle, goal Goal) (DeploymentLock, error) { + if env.IsManagedState(ctx) { + return newMetadataServiceLock(b, goal) + } + return newWorkspaceFilesystemLock(b, goal), nil } diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 00000000000..004f22c495b --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,30 @@ +package env + +import ( + "context" + + envlib "github.com/databricks/cli/libs/env" +) + +// managedStateVariable names the environment variable that controls whether +// server-managed state (via the deployment metadata service) is used for +// locking and, in a future change, resource state management. 
+// +// The variable is treated as a boolean and accepts the usual spellings: +// "true"/"false", "1"/"0", "yes"/"no", "on"/"off" (case-insensitive). An +// empty or absent value falls back to the historical filesystem-based +// behavior. +const managedStateVariable = "DATABRICKS_BUNDLE_MANAGED_STATE" + +// ManagedState returns the raw value of DATABRICKS_BUNDLE_MANAGED_STATE if +// set. Callers that only need a bool should use IsManagedState. +func ManagedState(ctx context.Context) (string, bool) { + return get(ctx, []string{managedStateVariable}) +} + +// IsManagedState reports whether the DATABRICKS_BUNDLE_MANAGED_STATE +// environment variable is set to a truthy value. +func IsManagedState(ctx context.Context) bool { + v, ok := envlib.GetBool(ctx, managedStateVariable) + return ok && v +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index 7b3ce12df64..943d4902cd1 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -23,7 +23,11 @@ import ( func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions, engine engine.EngineType) { log.Info(ctx, "Phase: bind") - dl := lock.NewDeploymentLock(b, lock.GoalBind) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalBind) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return @@ -126,7 +130,11 @@ func jsonDump(ctx context.Context, v any, field string) string { func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, resourceKey string, engine engine.EngineType) { log.Info(ctx, "Phase: unbind") - dl := lock.NewDeploymentLock(b, lock.GoalUnbind) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalUnbind) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index d23b9ac2c91..f0cceda3c92 100644 --- a/bundle/phases/deploy.go +++ 
b/bundle/phases/deploy.go @@ -131,7 +131,11 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - dl := lock.NewDeploymentLock(b, lock.GoalDeploy) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDeploy) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 741a30c99c0..27ddc2cb725 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -120,7 +120,11 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } - dl := lock.NewDeploymentLock(b, lock.GoalDestroy) + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDestroy) + if err != nil { + logdiag.LogError(ctx, err) + return + } if err := dl.Acquire(ctx); err != nil { logdiag.LogError(ctx, err) return diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 00000000000..277dd1e112e --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,405 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + sdktime "github.com/databricks/databricks-sdk-go/common/types/time" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// deploymentMetadata holds in-memory state for the deployment metadata +// service. One instance lives inside each FakeWorkspace so tests can drive +// CRUD against the DMS routes the same way they drive jobs/apps/etc. +type deploymentMetadata struct { + // deployments keyed by deployment_id. + deployments map[string]sdkbundle.Deployment + + // versions keyed by "deploymentId/versionId". + versions map[string]sdkbundle.Version + + // operations keyed by "deploymentId/versionId/resourceKey". + operations map[string]sdkbundle.Operation + + // resources keyed by "deploymentId/resourceKey". 
+ resources map[string]sdkbundle.Resource + + // lockHolder maps deploymentId -> the full version name that holds the + // lock (e.g. "deployments/{id}/versions/{vid}"). Absent when no lock is + // held. + lockHolder map[string]string + + // lockExpiry maps deploymentId -> when the lock expires; checked against + // time.Now() when a version is created and extended by each heartbeat. + lockExpiry map[string]time.Time +} + +func newDeploymentMetadata() *deploymentMetadata { + return &deploymentMetadata{ + deployments: map[string]sdkbundle.Deployment{}, + versions: map[string]sdkbundle.Version{}, + operations: map[string]sdkbundle.Operation{}, + resources: map[string]sdkbundle.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +// lockDuration matches the real service's default lease so heartbeat-renewal +// tests have a comfortable margin. +const lockDuration = 2 * time.Minute + +func nowPtr() *sdktime.Time { + return sdktime.New(time.Now().UTC()) +} + +func toSDKTime(t time.Time) *sdktime.Time { + return sdktime.New(t.UTC()) +} + +func badRequest(msg string) Response { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": msg}, + } +} + +func notFound(msg string) Response { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": msg}, + } +} + +func aborted(msg string) Response { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": msg}, + } +} + +// DeploymentMetadataCreateDeployment is mounted at +// POST /api/2.0/bundle/deployments. The SDK sends the inner Deployment as the +// body and passes deployment_id as a query parameter.
+func (s *FakeWorkspace) DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + deploymentID := req.URL.Query().Get("deployment_id") + if deploymentID == "" { + return badRequest("deployment_id is required") + } + + var body sdkbundle.Deployment + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ALREADY_EXISTS", + "message": fmt.Sprintf("deployment %s already exists", deploymentID), + }, + } + } + + now := nowPtr() + dep := sdkbundle.Deployment{ + Name: "deployments/" + deploymentID, + DisplayName: deploymentID, + TargetName: body.TargetName, + Status: sdkbundle.DeploymentStatusDeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: now, + UpdateTime: now, + } + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataGetDeployment is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.deploymentMetadata.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + return Response{Body: dep} +} + +// DeploymentMetadataDeleteDeployment is mounted at +// DELETE /api/2.0/bundle/deployments/{deployment_id}. 
+func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + now := nowPtr() + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusDeleted + dep.DestroyTime = now + dep.DestroyedBy = s.CurrentUser().UserName + dep.UpdateTime = now + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataCreateVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions. Body = Version, +// query = version_id. Validates monotonic version IDs and enforces the +// deployment-level lock. +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + versionID := req.URL.Query().Get("version_id") + if versionID == "" { + return badRequest("version_id is required") + } + + var body sdkbundle.Version + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + // Enforce monotonic versions: version_id must equal last_version_id + 1. 
+ expected := "1" + if dep.LastVersionId != "" { + n, err := strconv.ParseInt(dep.LastVersionId, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "stored last_version_id is not a valid number: " + dep.LastVersionId, + }, + } + } + expected = strconv.FormatInt(n+1, 10) + } + if versionID != expected { + return aborted(fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", + expected, versionID)) + } + + // Enforce lock: if a lock is held and not expired, reject. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if exp, ok := state.lockExpiry[deploymentID]; ok && exp.After(now) { + return aborted(fmt.Sprintf("deployment is locked by %s until %s", + holder, exp.Format(time.RFC3339))) + } + } + + versionKey := deploymentID + "/" + versionID + createTime := toSDKTime(now) + version := sdkbundle.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionId: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: createTime, + Status: sdkbundle.VersionStatusVersionStatusInProgress, + CliVersion: body.CliVersion, + VersionType: body.VersionType, + TargetName: body.TargetName, + } + state.versions[versionKey] = version + + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = now.Add(lockDuration) + + dep.LastVersionId = versionID + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusInProgress + dep.UpdateTime = createTime + state.deployments[deploymentID] = dep + + return Response{Body: version} +} + +// DeploymentMetadataGetVersion is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}. 
+func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response { + defer s.LockUnlock()() + + versionKey := deploymentID + "/" + versionID + version, ok := s.deploymentMetadata.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + return Response{Body: version} +} + +// DeploymentMetadataHeartbeat is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat. +// Validates the version is in-progress and holds the lock, then resets the +// lock expiry. +func (s *FakeWorkspace) DeploymentMetadataHeartbeat(_ Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is no longer in progress") + } + + expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + if state.lockHolder[deploymentID] != expectedHolder { + return aborted("lock is not held by this version") + } + + now := time.Now().UTC() + expiry := now.Add(lockDuration) + state.lockExpiry[deploymentID] = expiry + return Response{Body: sdkbundle.HeartbeatResponse{ExpireTime: toSDKTime(expiry)}} +} + +// DeploymentMetadataCompleteVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete. +// Tests can inject a simulated failure by setting the deployment's target_name +// to "fail-complete": the endpoint returns a 500 so the caller exercises its +// "lock release failed" path. 
+func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + if dep, ok := state.deployments[deploymentID]; ok && dep.TargetName == "fail-complete" { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "simulated complete version failure", + }, + } + } + + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is already completed") + } + + var body sdkbundle.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + + now := nowPtr() + version.Status = sdkbundle.VersionStatusVersionStatusCompleted + version.CompleteTime = now + version.CompletionReason = body.CompletionReason + version.CompletedBy = s.CurrentUser().UserName + state.versions[versionKey] = version + + delete(state.lockHolder, deploymentID) + delete(state.lockExpiry, deploymentID) + + if dep, ok := state.deployments[deploymentID]; ok { + switch body.CompletionReason { + case sdkbundle.VersionCompleteVersionCompleteSuccess: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusActive + case sdkbundle.VersionCompleteVersionCompleteFailure, + sdkbundle.VersionCompleteVersionCompleteForceAbort, + sdkbundle.VersionCompleteVersionCompleteLeaseExpired: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusFailed + } + dep.UpdateTime = now + state.deployments[deploymentID] = dep + } + + return Response{Body: version} +} + +// DeploymentMetadataCreateOperation is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations. 
+// Records the operation and upserts the deployment-level Resource so a +// follow-up ListResources sees the merged view. +func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + resourceKey := req.URL.Query().Get("resource_key") + if resourceKey == "" { + return badRequest("resource_key is required") + } + + var body sdkbundle.Operation + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + now := nowPtr() + opKey := deploymentID + "/" + versionID + "/" + resourceKey + op := sdkbundle.Operation{ + Name: fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey), + ResourceKey: resourceKey, + CreateTime: now, + ActionType: body.ActionType, + State: body.State, + ResourceId: body.ResourceId, + Status: body.Status, + ErrorMessage: body.ErrorMessage, + } + state.operations[opKey] = op + + resKey := deploymentID + "/" + resourceKey + state.resources[resKey] = sdkbundle.Resource{ + Name: fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey), + ResourceKey: resourceKey, + State: body.State, + ResourceId: body.ResourceId, + LastActionType: body.ActionType, + LastVersionId: versionID, + } + + return Response{Body: op} +} + +// DeploymentMetadataListResources is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/resources. 
+func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + prefix := deploymentID + "/" + var resources []sdkbundle.Resource + for key, r := range state.resources { + if strings.HasPrefix(key, prefix) { + resources = append(resources, r) + } + } + if resources == nil { + resources = []sdkbundle.Resource{} + } + return Response{Body: sdkbundle.ListResourcesResponse{Resources: resources}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index 4870e25e07c..07876994219 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -185,6 +185,10 @@ type FakeWorkspace struct { // clusterVenvs caches Python venvs per existing cluster ID, // matching cloud behavior where libraries are cached on running clusters. clusterVenvs map[string]*clusterEnv + + // deploymentMetadata is the in-memory bundle deployment metadata service + // state. Accessed via the DeploymentMetadata* methods. + deploymentMetadata *deploymentMetadata } func (s *FakeWorkspace) LockUnlock() func() { @@ -314,6 +318,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadata(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index ed405470f20..0225f47311e 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -993,4 +993,33 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Bundle deployment metadata service. 
+ server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) } From d5b230789748f41d004d05ecd6240974134c7e1a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 28 May 2026 00:45:38 +0000 Subject: 
[PATCH 3/6] bundle: read deployment state from the deployment metadata service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Step 4 of the DMS CLI integration plan. When DATABRICKS_BUNDLE_MANAGED_STATE is set, resource state is now fetched from the deployment metadata service on the server rather than the workspace-side resources.json file: - statemgmt/state_dms.go: new LoadStateFromDMS reads resources via the SDK Bundle.ListResourcesAll and seeds the in-memory state DB via DeploymentState.OpenWithData (no local file is touched). - statemgmt/state_pull.go: when DMS is active, pull only the deployment_id pointer from managed_service.json and short-circuit. The full state is loaded later via LoadStateFromDMS so file-state lineage/serial logic stays off the DMS path entirely. - statemgmt/state_push.go: no-op under DMS — the server owns the state, there is nothing useful to upload. - cmd/bundle/utils/process.go: route through LoadStateFromDMS instead of StateDB.Open when DMS is on; reject --plan under DMS because plan-vs-state lineage checks don't translate to the server's version-based locking. - bundle/bundle.go: add Bundle.DeploymentID, the in-memory carrier for the DMS deployment record id (populated by either state pull or lock acquire). - statemgmt/managed_service_json.go: promote managed_service.json constant and struct out of the lock package so the state-read path can share them. - deploy/lock/deployment_metadata_service.go: switch to the exported names and publish b.DeploymentID after acquiring the lock so subsequent reads in the same process see it without re-reading the file. Operation reporting (writing per-resource operations back to DMS during deploy) is intentionally deferred to step 5; today the resources list returned by ListResources is empty until that lands. The release-lock-error acceptance fixture loses the "Updating deployment state..." line because PushResourcesState is now a no-op under DMS. 
A new plan-and-summary acceptance test exercises the full read path end-to-end and verifies both bundle plan and bundle summary hit ListResources. --- .../dms/plan-and-summary/databricks.yml | 7 ++ .../bundle/dms/plan-and-summary/out.test.toml | 4 ++ .../bundle/dms/plan-and-summary/output.txt | 61 ++++++++++++++++++ acceptance/bundle/dms/plan-and-summary/script | 17 +++++ .../bundle/dms/plan-and-summary/test.toml | 10 +++ .../bundle/dms/release-lock-error/output.txt | 1 - bundle/bundle.go | 7 ++ .../lock/deployment_metadata_service.go | 34 ++++------ bundle/statemgmt/managed_service_json.go | 18 ++++++ bundle/statemgmt/state_dms.go | 64 +++++++++++++++++++ bundle/statemgmt/state_pull.go | 50 +++++++++++++++ bundle/statemgmt/state_push.go | 10 +++ cmd/bundle/utils/process.go | 30 +++++++-- 13 files changed, 287 insertions(+), 26 deletions(-) create mode 100644 acceptance/bundle/dms/plan-and-summary/databricks.yml create mode 100644 acceptance/bundle/dms/plan-and-summary/out.test.toml create mode 100644 acceptance/bundle/dms/plan-and-summary/output.txt create mode 100644 acceptance/bundle/dms/plan-and-summary/script create mode 100644 acceptance/bundle/dms/plan-and-summary/test.toml create mode 100644 bundle/statemgmt/managed_service_json.go create mode 100644 bundle/statemgmt/state_dms.go diff --git a/acceptance/bundle/dms/plan-and-summary/databricks.yml b/acceptance/bundle/dms/plan-and-summary/databricks.yml new file mode 100644 index 00000000000..9529e5b8c9b --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-plan-and-summary + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/plan-and-summary/out.test.toml b/acceptance/bundle/dms/plan-and-summary/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = 
["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/plan-and-summary/output.txt b/acceptance/bundle/dms/plan-and-summary/output.txt new file mode 100644 index 00000000000..014eaa411ae --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/output.txt @@ -0,0 +1,61 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-plan-and-summary/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle plan +create jobs.test_job + +Plan: 1 to add, 0 to change, 0 to delete, 0 unchanged + +>>> [CLI] bundle summary +Name: dms-plan-and-summary +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/dms-plan-and-summary/default +Resources: + Jobs: + test_job: + Name: test-job + URL: (not deployed) + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} diff --git a/acceptance/bundle/dms/plan-and-summary/script b/acceptance/bundle/dms/plan-and-summary/script new file mode 100644 index 00000000000..8b2d0def20c --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/script @@ -0,0 +1,17 @@ +# Deploy first so managed_service.json exists with a deployment_id. 
+# Step 5 will report individual operations here; for now DMS only knows the +# deployment exists. +trace $CLI bundle deploy + +# bundle plan should hit DMS via ListResources. With no operations reported +# yet, the resource list is empty and the job is planned as a create. +trace $CLI bundle plan + +# bundle summary should also read from DMS. ListResources is empty, so the +# job renders as (not deployed). +trace $CLI bundle summary + +# Verify the metadata-service calls. Two ListResources lines confirm that +# both plan and summary went through the DMS read path. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/plan-and-summary/test.toml b/acceptance/bundle/dms/plan-and-summary/test.toml new file mode 100644 index 00000000000..3ca0272e034 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/test.toml @@ -0,0 +1,10 @@ +Ignore = [".databricks"] + +# Verify the state-read path against the deployment metadata service: +# - bundle deploy populates managed_service.json with a deployment_id. +# - bundle plan / summary read state from DMS via ListResources rather than +# from the workspace resources.json file. +# +# Step 4 does not yet ship operation reporting (that is step 5), so the DMS +# resources list is empty after deploy. The test still exercises every +# state-read code path end-to-end. diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt index 476ba422ef5..076bbd86f12 100644 --- a/acceptance/bundle/dms/release-lock-error/output.txt +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -2,7 +2,6 @@ >>> [CLI] bundle deploy Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... Deploying resources... -Updating deployment state... Deployment complete! 
Warn: Failed to release deployment lock: simulated complete version failure diff --git a/bundle/bundle.go b/bundle/bundle.go index e7eef14b907..ec012b2a031 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -141,6 +141,13 @@ type Bundle struct { // (direct only) deployment implementation and state DeploymentBundle direct.DeploymentBundle + // DeploymentID identifies the DMS-side deployment record for this bundle. + // Populated from the workspace managed_service.json during state pull when + // DATABRICKS_BUNDLE_MANAGED_STATE is set, and by the lock package once the + // deployment lock is acquired. Empty when the deployment metadata service is + // not in use, or when DMS is enabled but no prior deployment exists. + DeploymentID string + // if true, we skip approval checks for deploy, destroy resources and delete // files AutoApprove bool diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go index afb91ef5f79..052c3310cca 100644 --- a/bundle/deploy/lock/deployment_metadata_service.go +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -14,6 +14,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/internal/build" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -26,18 +27,6 @@ import ( // renews the DMS-side lock lease while a deployment is in progress. const defaultHeartbeatInterval = 30 * time.Second -// managedServiceFileName is the workspace state file where the lock package -// persists the DMS deployment_id across CLI invocations. It is intentionally -// scoped to this package for now; once the state-from-DMS path lands the -// file (and accompanying struct) will move to bundle/statemgmt so both the -// lock and state managers can share it.
-const managedServiceFileName = "managed_service.json" - -// managedServiceJSON is the on-disk shape of managedServiceFileName. -type managedServiceJSON struct { - DeploymentID string `json:"deployment_id"` -} - // metadataServiceLock implements DeploymentLock against the bundle deployment // metadata service (DMS). The lock is acquired by creating a new Version // under the deployment; a background goroutine renews the lock lease via @@ -91,6 +80,9 @@ func (l *metadataServiceLock) Acquire(ctx context.Context) error { l.deploymentID = deploymentID l.versionID = versionID + // Publish the deployment ID on the bundle so downstream code (e.g. + // statemgmt.LoadStateFromDMS) can address the right server-side record. + l.b.DeploymentID = deploymentID l.stopHeartbeat = startHeartbeat(ctx, l.svc, deploymentID, versionID) log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, versionID) @@ -218,23 +210,23 @@ func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, bool, e return "", false, fmt.Errorf("failed to create state filer: %w", err) } - reader, readErr := f.Read(ctx, managedServiceFileName) + reader, readErr := f.Read(ctx, statemgmt.ManagedServiceFileName) if readErr == nil { defer reader.Close() data, err := io.ReadAll(reader) if err != nil { - return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, err) + return "", false, fmt.Errorf("failed to read %s: %w", statemgmt.ManagedServiceFileName, err) } - var sj managedServiceJSON + var sj statemgmt.ManagedServiceJSON if err := json.Unmarshal(data, &sj); err != nil { - return "", false, fmt.Errorf("failed to parse %s: %w", managedServiceFileName, err) + return "", false, fmt.Errorf("failed to parse %s: %w", statemgmt.ManagedServiceFileName, err) } if sj.DeploymentID != "" { return sj.DeploymentID, false, nil } // File exists but has no deployment_id — treat as fresh. 
} else if !errors.Is(readErr, fs.ErrNotExist) { - return "", false, fmt.Errorf("failed to read %s: %w", managedServiceFileName, readErr) + return "", false, fmt.Errorf("failed to read %s: %w", statemgmt.ManagedServiceFileName, readErr) } return uuid.New().String(), true, nil @@ -245,13 +237,13 @@ func writeDeploymentID(ctx context.Context, b *bundle.Bundle, deploymentID strin if err != nil { return fmt.Errorf("failed to create state filer: %w", err) } - data, err := json.Marshal(managedServiceJSON{DeploymentID: deploymentID}) + data, err := json.Marshal(statemgmt.ManagedServiceJSON{DeploymentID: deploymentID}) if err != nil { - return fmt.Errorf("failed to marshal %s: %w", managedServiceFileName, err) + return fmt.Errorf("failed to marshal %s: %w", statemgmt.ManagedServiceFileName, err) } - if err := f.Write(ctx, managedServiceFileName, bytes.NewReader(data), + if err := f.Write(ctx, statemgmt.ManagedServiceFileName, bytes.NewReader(data), filer.CreateParentDirectories, filer.OverwriteIfExists); err != nil { - return fmt.Errorf("failed to write %s: %w", managedServiceFileName, err) + return fmt.Errorf("failed to write %s: %w", statemgmt.ManagedServiceFileName, err) } return nil } diff --git a/bundle/statemgmt/managed_service_json.go b/bundle/statemgmt/managed_service_json.go new file mode 100644 index 00000000000..09d9703d618 --- /dev/null +++ b/bundle/statemgmt/managed_service_json.go @@ -0,0 +1,18 @@ +package statemgmt + +// ManagedServiceFileName is the workspace state-directory file that records the +// deployment metadata service (DMS) deployment_id for the bundle. The file +// pins this bundle to a server-side deployment record across CLI invocations. +// It is created by the lock package after the first CreateDeployment succeeds +// and is read by both the lock package (when re-acquiring the lock) and the +// statemgmt package (when loading resource state from DMS). 
+// +// resources.json continues to be written by the deploy path so that operators +// who turn DATABRICKS_BUNDLE_MANAGED_STATE off again still have a usable local +// state file. The fallback path is intentional, not accidental. +const ManagedServiceFileName = "managed_service.json" + +// ManagedServiceJSON is the on-disk shape of ManagedServiceFileName. +type ManagedServiceJSON struct { + DeploymentID string `json:"deployment_id"` +} diff --git a/bundle/statemgmt/state_dms.go b/bundle/statemgmt/state_dms.go new file mode 100644 index 00000000000..5804e35bcb1 --- /dev/null +++ b/bundle/statemgmt/state_dms.go @@ -0,0 +1,64 @@ +package statemgmt + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/direct/dstate" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// LoadStateFromDMS populates the in-memory DeploymentState DB from the +// deployment metadata service for the deployment identified by +// b.DeploymentID. The state is opened in read mode via OpenWithData; the +// historical local resources.json file is never touched on the DMS path. +// +// The path passed to OpenWithData is the local resources.json path: it is +// only used for diagnostics and as the eventual write target if the deploy +// path later upgrades the state to write mode (step 5 territory; today no +// callers do that under DMS). +// +// When b.DeploymentID is empty no DMS call is made and an empty state DB is +// opened: this is the "DMS enabled but no prior deployment" case, where the +// state is genuinely empty until the lock package creates the deployment. +func LoadStateFromDMS(ctx context.Context, b *bundle.Bundle) error { + if b.DeploymentID == "" { + // Initialize an empty state so subsequent reads (e.g. ExportState) + // don't panic on an unopened DB.
+ _, localPath := b.StateFilenameDirect(ctx) + b.DeploymentBundle.StateDB.OpenWithData(localPath, dstate.NewDatabase("", 0)) + return nil + } + + w := b.WorkspaceClient(ctx) + resources, err := w.Bundle.ListResourcesAll(ctx, sdkbundle.ListResourcesRequest{ + Parent: "deployments/" + b.DeploymentID, + }) + if err != nil { + return fmt.Errorf("failed to list resources from deployment metadata service: %w", err) + } + + data := dstate.NewDatabase("", 0) + for _, r := range resources { + // DMS reports resource keys without the "resources." prefix (e.g. + // "jobs.foo"); the local state DB uses the fully-qualified form + // ("resources.jobs.foo") as its map key, so prepend it here. + stateKey := "resources." + r.ResourceKey + + var stateBytes json.RawMessage + if r.State != nil { + stateBytes = *r.State + } + + data.State[stateKey] = dstate.ResourceEntry{ + ID: r.ResourceId, + State: stateBytes, + } + } + + _, localPath := b.StateFilenameDirect(ctx) + b.DeploymentBundle.StateDB.OpenWithData(localPath, data) + return nil +} diff --git a/bundle/statemgmt/state_pull.go b/bundle/statemgmt/state_pull.go index 7e62bb84967..8fef9ef5957 100644 --- a/bundle/statemgmt/state_pull.go +++ b/bundle/statemgmt/state_pull.go @@ -16,6 +16,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -219,6 +220,23 @@ func readStates(ctx context.Context, b *bundle.Bundle, alwaysPull AlwaysPull) [] directLocalState := localRead(ctx, localPathDirect, engine.EngineDirect) terraformLocalState := localRead(ctx, localPathTerraform, engine.EngineTerraform) + // When the deployment metadata service is enabled, resource state lives on + // the server. 
Pull only the deployment-id pointer from the workspace; the + // state itself is fetched later via LoadStateFromDMS. Returning nil here + // causes PullResourcesState to fall through to its "no states found" path, + // which yields an empty StateDesc — exactly what we want, since DMS state + // is loaded separately and does not participate in the local/remote + // lineage-and-serial winner-picking. + if env.IsManagedState(ctx) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + logdiag.LogError(ctx, err) + return nil + } + b.DeploymentID = readDeploymentID(ctx, f) + return nil + } + if (directLocalState == nil && terraformLocalState == nil) || alwaysPull { f, err := deploy.StateFiler(ctx, b) if err != nil { @@ -294,6 +312,38 @@ func logStatesWarning(ctx context.Context, msg string, states []*StateDesc) { logStatesDiag(ctx, diag.Warning, msg, states) } +// readDeploymentID returns the DMS deployment_id stored in the workspace +// state directory's managed_service.json. An absent file or a missing/empty +// deployment_id is not an error: it means "no prior deployment exists yet" +// and the caller should proceed with an empty state. Read or parse failures +// other than fs.ErrNotExist are logged at debug level and ignored — they are +// recoverable on the next CLI invocation (the lock package will rewrite the +// file after CreateDeployment). 
+func readDeploymentID(ctx context.Context, f filer.Filer) string { + reader, err := f.Read(ctx, ManagedServiceFileName) + if errors.Is(err, fs.ErrNotExist) { + return "" + } + if err != nil { + log.Debugf(ctx, "Failed to read %s for deployment ID: %v", ManagedServiceFileName, err) + return "" + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + log.Debugf(ctx, "Failed to read %s content: %v", ManagedServiceFileName, err) + return "" + } + + var sj ManagedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + log.Debugf(ctx, "Failed to parse %s: %v", ManagedServiceFileName, err) + return "" + } + return sj.DeploymentID +} + func logStatesDiag(ctx context.Context, severity diag.Severity, msg string, states []*StateDesc) { var stateStrs []string for _, state := range states { diff --git a/bundle/statemgmt/state_push.go b/bundle/statemgmt/state_push.go index f098e8a07cc..ce225068d99 100644 --- a/bundle/statemgmt/state_push.go +++ b/bundle/statemgmt/state_push.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -16,7 +17,16 @@ import ( ) // PushResourcesState uploads the local state file to the remote location. +// When the deployment metadata service is enabled the call is a no-op: DMS +// owns resource state on the server, so there is no local state file worth +// uploading. (Step 5 will stop the deploy path from writing local state in +// the first place; until then the local file is written and then ignored.) 
func PushResourcesState(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { + if env.IsManagedState(ctx) { + log.Debugf(ctx, "Skipping state push: DATABRICKS_BUNDLE_MANAGED_STATE is set, DMS owns the state") + return + } + f, err := deploy.StateFiler(ctx, b) if err != nil { logdiag.LogError(ctx, err) diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index 5f43cff6acd..ae9c222eb4f 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" "github.com/databricks/cli/bundle/direct/dstate" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/cmd/root" @@ -188,10 +189,21 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle // Open direct engine state once for all subsequent operations (ExportState, CalculatePlan, Apply, etc.) needDirectState := stateDesc.Engine.IsDirect() && (opts.InitIDs || opts.ErrorOnEmptyState || opts.Deploy || opts.ReadPlanPath != "" || opts.PreDeployChecks || opts.PostStateFunc != nil) if needDirectState { - _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { - logdiag.LogError(ctx, err) - return b, stateDesc, root.ErrAlreadyPrinted + if env.IsManagedState(ctx) { + // Under DMS the StateDB is populated from the server, not from + // the local file. b.DeploymentID may be empty here (no prior + // deployment); LoadStateFromDMS handles that by initialising + // an empty in-memory DB. 
+ if err := statemgmt.LoadStateFromDMS(ctx, b); err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } + } else { + _, localPath := b.StateFilenameDirect(ctx) + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } } } @@ -233,6 +245,16 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle logdiag.LogError(ctx, errors.New("--plan is only supported with direct engine (set bundle.engine to \"direct\" or DATABRICKS_BUNDLE_ENGINE=direct)")) return b, stateDesc, root.ErrAlreadyPrinted } + if env.IsManagedState(ctx) { + // --plan persists a serial/lineage snapshot from the local state + // DB and validates it on apply. Under DMS the server is the + // single source of truth and acquires its own version-based lock + // on CreateVersion, so the local snapshot is meaningless. Reject + // the flag explicitly rather than silently accept it and risk an + // inconsistent deploy. + logdiag.LogError(ctx, errors.New("--plan is not supported with the deployment metadata service")) + return b, stateDesc, root.ErrAlreadyPrinted + } opts.Build = false opts.PreDeployChecks = false From c91ec1bb77897c6b2195ec206a5deb47a8ee5037 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 28 May 2026 00:48:16 +0000 Subject: [PATCH 4/6] bundle/statemgmt: unit-test LoadStateFromDMS with a mock workspace client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds three table-style tests: * NoDeploymentID — empty b.DeploymentID short-circuits without any RPC and leaves the in-memory state DB safe to read (ExportState returns empty). * PopulatesFromList — resources land under "resources." with their ID and State preserved verbatim; the nil-State path is exercised too. 
* ListError — DMS-side failures are wrapped with the package-level prefix so callers can tell them apart from local-filesystem errors. Together these cover the three branches a code reviewer would otherwise have to read the source to verify. --- bundle/statemgmt/state_dms_test.go | 100 +++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 bundle/statemgmt/state_dms_test.go diff --git a/bundle/statemgmt/state_dms_test.go b/bundle/statemgmt/state_dms_test.go new file mode 100644 index 00000000000..e1535bd7edf --- /dev/null +++ b/bundle/statemgmt/state_dms_test.go @@ -0,0 +1,100 @@ +package statemgmt + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// newTestBundle returns a Bundle wired with a mock workspace client, a temp +// BundleRootPath, and the config these tests set (Bundle.Target, StatePath). +func newTestBundle(t *testing.T) (*bundle.Bundle, *mocks.MockWorkspaceClient) { + b := &bundle.Bundle{ + BundleRootPath: t.TempDir(), + Config: config.Root{ + Bundle: config.Bundle{Target: "default"}, + Workspace: config.Workspace{ + StatePath: "/Workspace/Users/me@example.com/.bundle/test/default/state", + }, + }, + } + m := mocks.NewMockWorkspaceClient(t) + b.SetWorkpaceClient(m.WorkspaceClient) + return b, m +} + +// TestLoadStateFromDMS_NoDeploymentID covers the "DMS enabled but the +// workspace has no managed_service.json yet" case: the state DB should be +// initialised empty (no panic on later ExportState) and no API call should be +// made. +func TestLoadStateFromDMS_NoDeploymentID(t *testing.T) { + b, _ := newTestBundle(t) + // b.DeploymentID stays empty.
+ + err := LoadStateFromDMS(t.Context(), b) + require.NoError(t, err) + + // State is initialised but empty — Export should succeed and return no entries. + got := b.DeploymentBundle.ExportState(t.Context()) + assert.Empty(t, got) +} + +// TestLoadStateFromDMS_PopulatesFromList confirms that resources reported by +// ListResources land in the in-memory state DB under the fully-qualified +// "resources.<key>" form, and that per-resource State payloads are preserved +// verbatim. +func TestLoadStateFromDMS_PopulatesFromList(t *testing.T) { + b, m := newTestBundle(t) + b.DeploymentID = "dep-123" + + jobState := json.RawMessage(`{"name":"my-job","max_concurrent_runs":1}`) + mockBundle := m.GetMockBundleAPI() + mockBundle.EXPECT(). + ListResourcesAll(mock.Anything, sdkbundle.ListResourcesRequest{ + Parent: "deployments/dep-123", + }). + Return([]sdkbundle.Resource{ + {ResourceKey: "jobs.foo", ResourceId: "1001", State: &jobState}, + // State omitted — exercises the nil-state path. + {ResourceKey: "pipelines.bar", ResourceId: "p-1"}, + }, nil) + + err := LoadStateFromDMS(t.Context(), b) + require.NoError(t, err) + + job, ok := b.DeploymentBundle.StateDB.GetResourceEntry("resources.jobs.foo") + require.True(t, ok) + assert.Equal(t, "1001", job.ID) + assert.JSONEq(t, string(jobState), string(job.State)) + + pipeline, ok := b.DeploymentBundle.StateDB.GetResourceEntry("resources.pipelines.bar") + require.True(t, ok) + assert.Equal(t, "p-1", pipeline.ID) + assert.Empty(t, pipeline.State) +} + +// TestLoadStateFromDMS_ListError checks that an underlying API failure is +// wrapped and surfaced rather than swallowed (otherwise the deploy would +// proceed against an empty in-memory view and treat everything as a create). +func TestLoadStateFromDMS_ListError(t *testing.T) { + b, m := newTestBundle(t) + b.DeploymentID = "dep-456" + + mockBundle := m.GetMockBundleAPI() + mockBundle.EXPECT(). + ListResourcesAll(mock.Anything, mock.Anything).
+ Return(nil, errors.New("boom")) + + err := LoadStateFromDMS(t.Context(), b) + require.Error(t, err) + assert.ErrorContains(t, err, "boom") + assert.ErrorContains(t, err, "failed to list resources from deployment metadata service") +} From 384a97083f797abae6d3eb007a36b5abe228fe55 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 28 May 2026 01:07:03 +0000 Subject: [PATCH 5/6] bundle: report deploy operations to the deployment metadata service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire per-resource Create/Update/Delete outcomes from the direct engine through to the deployment metadata service (DMS), completing step 5 of the DMS CLI integration plan. * bundle/deploy/lock/operation_reporter.go — new asyncReporter ships operationEvents to DMS from a single sender goroutine fed by a buffered channel (size matches direct.defaultParallelism so apply workers rarely block). planActionToOperationAction maps the deploy-plan action enum onto sdkbundle.OperationActionType. * bundle/deploy/lock/operation_reporter_test.go — exhaustive mapping test covering every ActionType (incl. Skip → empty, Undefined → error). * bundle/deploy/lock/deployment_metadata_service.go — start the reporter on Acquire and Close() it in Release before CompleteVersion so all per-resource events land under the right version_id. * bundle/direct/pkg.go — new OperationReporter type and DeploymentBundle field; left nil when DMS is off so the file-state path is unaffected. * bundle/direct/bundle_apply.go — call the reporter inline after each Create/Update/Delete (success or failure); skip-actions are filtered out at the sender, deletes report resource_id only, failures carry the error message and no state payload. * bundle/deployplan/plan.go, bundle/direct/bundle_plan.go — comment-only notes that Lineage/Serial don't apply under DMS. 
* acceptance/bundle/dms/{add-resources,sequential-deploys,deploy-error} — three new acceptance tests exercising CREATE, DELETE, and FAILED operations end-to-end against the fake DMS server. * acceptance/bundle/dms/plan-and-summary — updated golden output now that the deploy reports a CREATE, so plan/summary see the resource already deployed. * acceptance/bin/print_requests.py — open recorded-requests file with utf-8 to handle non-ASCII bytes in the trace. Co-authored-by: Isaac --- acceptance/bin/print_requests.py | 2 +- .../bundle/dms/add-resources/databricks.yml | 7 + .../bundle/dms/add-resources/out.test.toml | 4 + .../bundle/dms/add-resources/output.txt | 127 ++++++++++++ acceptance/bundle/dms/add-resources/script | 34 ++++ acceptance/bundle/dms/add-resources/test.toml | 6 + .../bundle/dms/deploy-error/databricks.yml | 7 + .../bundle/dms/deploy-error/out.test.toml | 4 + acceptance/bundle/dms/deploy-error/output.txt | 56 ++++++ acceptance/bundle/dms/deploy-error/script | 10 + acceptance/bundle/dms/deploy-error/test.toml | 10 + .../bundle/dms/plan-and-summary/output.txt | 32 ++- .../bundle/dms/plan-and-summary/test.toml | 6 +- .../bundle/dms/release-lock-error/output.txt | 26 +++ .../dms/sequential-deploys/databricks.yml | 7 + .../dms/sequential-deploys/out.test.toml | 4 + .../bundle/dms/sequential-deploys/output.txt | 169 ++++++++++++++++ .../bundle/dms/sequential-deploys/script | 36 ++++ .../bundle/dms/sequential-deploys/test.toml | 6 + .../lock/deployment_metadata_service.go | 15 ++ bundle/deploy/lock/operation_reporter.go | 182 ++++++++++++++++++ bundle/deploy/lock/operation_reporter_test.go | 78 ++++++++ bundle/deployplan/plan.go | 14 +- bundle/direct/bundle_apply.go | 51 +++++ bundle/direct/bundle_plan.go | 4 + bundle/direct/pkg.go | 21 ++ 26 files changed, 905 insertions(+), 13 deletions(-) create mode 100644 acceptance/bundle/dms/add-resources/databricks.yml create mode 100644 acceptance/bundle/dms/add-resources/out.test.toml create mode 100644 
acceptance/bundle/dms/add-resources/output.txt create mode 100644 acceptance/bundle/dms/add-resources/script create mode 100644 acceptance/bundle/dms/add-resources/test.toml create mode 100644 acceptance/bundle/dms/deploy-error/databricks.yml create mode 100644 acceptance/bundle/dms/deploy-error/out.test.toml create mode 100644 acceptance/bundle/dms/deploy-error/output.txt create mode 100644 acceptance/bundle/dms/deploy-error/script create mode 100644 acceptance/bundle/dms/deploy-error/test.toml create mode 100644 acceptance/bundle/dms/sequential-deploys/databricks.yml create mode 100644 acceptance/bundle/dms/sequential-deploys/out.test.toml create mode 100644 acceptance/bundle/dms/sequential-deploys/output.txt create mode 100644 acceptance/bundle/dms/sequential-deploys/script create mode 100644 acceptance/bundle/dms/sequential-deploys/test.toml create mode 100644 bundle/deploy/lock/operation_reporter.go create mode 100644 bundle/deploy/lock/operation_reporter_test.go diff --git a/acceptance/bin/print_requests.py b/acceptance/bin/print_requests.py index c06449a2482..6e75f495156 100755 --- a/acceptance/bin/print_requests.py +++ b/acceptance/bin/print_requests.py @@ -168,7 +168,7 @@ def main(): if not requests_file.exists(): sys.exit(f"File {requests_file} not found") - with open(requests_file) as fobj: + with open(requests_file, encoding="utf-8") as fobj: data = fobj.read() if not data: diff --git a/acceptance/bundle/dms/add-resources/databricks.yml b/acceptance/bundle/dms/add-resources/databricks.yml new file mode 100644 index 00000000000..22873b2e010 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-add-resources + +resources: + jobs: + job_a: + name: job-a diff --git a/acceptance/bundle/dms/add-resources/out.test.toml b/acceptance/bundle/dms/add-resources/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/out.test.toml @@ -0,0 +1,4 @@ 
+Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/add-resources/output.txt b/acceptance/bundle/dms/add-resources/output.txt new file mode 100644 index 00000000000..093756b72ff --- /dev/null +++ b/acceptance/bundle/dms/add-resources/output.txt @@ -0,0 +1,127 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle plan +Plan: 0 to add, 0 to change, 0 to delete, 2 unchanged + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.job_a" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.job_a", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "job-a", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": 
"VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "jobs.job_b" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.job_b", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "job-b", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} diff --git a/acceptance/bundle/dms/add-resources/script b/acceptance/bundle/dms/add-resources/script new file mode 100644 index 00000000000..33d205e9257 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/script @@ -0,0 +1,34 @@ +# Deploy a fresh bundle with one job — DMS records a single CREATE operation. +trace $CLI bundle deploy + +# Drop the local cache so the next deploy re-reads state from DMS via ListResources. +rm -rf .databricks + +# Add a second job and redeploy. The new deploy should: +# - read existing state from DMS, +# - skip job_a (no change), +# - create job_b and report exactly one CREATE operation under version 2. 
+cat > databricks.yml << 'EOF' +bundle: + name: dms-add-resources + +resources: + jobs: + job_a: + name: job-a + job_b: + name: job-b +EOF +trace $CLI bundle deploy + +# Drop the cache one more time and run plan — both resources should show as +# unchanged because their reported state matches the local config. +rm -rf .databricks +trace $CLI bundle plan + +# Inspect the recorded DMS traffic. Expect: +# - Deploy 1: CreateDeployment + CreateVersion(1) + CreateOperation(job_a, CREATE) + CompleteVersion(1) +# - Deploy 2: ListResources + GetDeployment + CreateVersion(2) + CreateOperation(job_b, CREATE) + CompleteVersion(2) +# - Plan: ListResources +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/add-resources/test.toml b/acceptance/bundle/dms/add-resources/test.toml new file mode 100644 index 00000000000..f57dd05b513 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/test.toml @@ -0,0 +1,6 @@ +Ignore = [".databricks"] + +# Exercise DMS-side operation reporting across two deploys: +# - first deploy creates job_a and reports one CREATE operation, +# - second deploy adds job_b without retouching job_a, +# - subsequent plan reads from DMS and sees both jobs already deployed. 
diff --git a/acceptance/bundle/dms/deploy-error/databricks.yml b/acceptance/bundle/dms/deploy-error/databricks.yml new file mode 100644 index 00000000000..b7e4006775a --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-deploy-error + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/deploy-error/out.test.toml b/acceptance/bundle/dms/deploy-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/deploy-error/output.txt b/acceptance/bundle/dms/deploy-error/output.txt new file mode 100644 index 00000000000..cbb7f1d934a --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/output.txt @@ -0,0 +1,56 @@ + +>>> musterr [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-deploy-error/default/files... +Deploying resources... +Error: cannot create resources.jobs.test_job: Invalid job configuration. (400 INVALID_PARAMETER_VALUE) + +Endpoint: POST [DATABRICKS_URL]/api/2.2/jobs/create +HTTP Status: 400 Bad Request +API error_code: INVALID_PARAMETER_VALUE +API message: Invalid job configuration. 
+ + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "error_message": "Invalid job configuration.", + "resource_id": "", + "resource_key": "jobs.test_job", + "status": "OPERATION_STATUS_FAILED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_FAILURE" + } +} diff --git a/acceptance/bundle/dms/deploy-error/script b/acceptance/bundle/dms/deploy-error/script new file mode 100644 index 00000000000..3c2dfc11408 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/script @@ -0,0 +1,10 @@ +# Deploy with a server-side job creation failure injected via test.toml. +# Expect the CLI to: +# - acquire the DMS lock (CreateDeployment + CreateVersion(1)), +# - attempt the job creation and fail, +# - report a FAILED CREATE operation carrying the error message, +# - release the lock with completion_reason=VERSION_COMPLETE_FAILURE. 
+trace musterr $CLI bundle deploy + +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/deploy-error/test.toml b/acceptance/bundle/dms/deploy-error/test.toml new file mode 100644 index 00000000000..d3190822d55 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/test.toml @@ -0,0 +1,10 @@ +Ignore = [".databricks"] + +# Inject a server-side failure on job creation and confirm that: +# - the failed operation is reported to DMS with OPERATION_STATUS_FAILED, +# - the lock is released with VERSION_COMPLETE_FAILURE. + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.StatusCode = 400 +Response.Body = '{"error_code": "INVALID_PARAMETER_VALUE", "message": "Invalid job configuration."}' diff --git a/acceptance/bundle/dms/plan-and-summary/output.txt b/acceptance/bundle/dms/plan-and-summary/output.txt index 014eaa411ae..d4b7a62cfd4 100644 --- a/acceptance/bundle/dms/plan-and-summary/output.txt +++ b/acceptance/bundle/dms/plan-and-summary/output.txt @@ -5,9 +5,7 @@ Deploying resources... Deployment complete! 
>>> [CLI] bundle plan -create jobs.test_job - -Plan: 1 to add, 0 to change, 0 to delete, 0 unchanged +Plan: 0 to add, 0 to change, 0 to delete, 1 unchanged >>> [CLI] bundle summary Name: dms-plan-and-summary @@ -19,7 +17,7 @@ Resources: Jobs: test_job: Name: test-job - URL: (not deployed) + URL: [DATABRICKS_URL]/jobs/[NUMID]?o=[NUMID] >>> print_requests.py --get //bundle ^//workspace-files ^//import-file { @@ -44,6 +42,32 @@ Resources: "version_type": "VERSION_TYPE_DEPLOY" } } +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-plan-and-summary/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} { "method": "POST", "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", diff --git a/acceptance/bundle/dms/plan-and-summary/test.toml b/acceptance/bundle/dms/plan-and-summary/test.toml index 3ca0272e034..55257593f46 100644 --- a/acceptance/bundle/dms/plan-and-summary/test.toml +++ b/acceptance/bundle/dms/plan-and-summary/test.toml @@ -5,6 +5,6 @@ Ignore = [".databricks"] # - bundle plan / summary read state from DMS via ListResources rather than # from the workspace resources.json file. # -# Step 4 does not yet ship operation reporting (that is step 5), so the DMS -# resources list is empty after deploy. The test still exercises every -# state-read code path end-to-end. +# With operation reporting (step 5) in place, the deploy reports a CREATE +# operation so the subsequent plan sees the job as already deployed and the +# summary renders its URL. 
diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt index 076bbd86f12..cc333c6e415 100644 --- a/acceptance/bundle/dms/release-lock-error/output.txt +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -28,6 +28,32 @@ Warn: Failed to release deployment lock: simulated complete version failure "version_type": "VERSION_TYPE_DEPLOY" } } +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} { "method": "POST", "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", diff --git a/acceptance/bundle/dms/sequential-deploys/databricks.yml b/acceptance/bundle/dms/sequential-deploys/databricks.yml new file mode 100644 index 00000000000..7eb80d5965a --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-sequential-deploys + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/sequential-deploys/out.test.toml b/acceptance/bundle/dms/sequential-deploys/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/sequential-deploys/output.txt 
b/acceptance/bundle/dms/sequential-deploys/output.txt new file mode 100644 index 00000000000..0c6041cc81e --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/output.txt @@ -0,0 +1,169 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/files... +Deploying resources... +Deployment complete! 
+ +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "jobs.new_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.new_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "new-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/files... +Deploying resources... +Deployment complete! 
+ +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_DELETE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/sequential-deploys/script b/acceptance/bundle/dms/sequential-deploys/script new file mode 100644 index 00000000000..e11211eaa1f --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/script @@ -0,0 +1,36 @@ +# Deploy 1: create test_job. Expect a single CREATE operation under version 1. +trace $CLI bundle deploy +trace print_requests.py --get //bundle ^//workspace-files ^//import-file + +# Deploy 2: add new_job. Expect: +# - ListResources (state read) + GetDeployment (version bump) +# - one CREATE operation for new_job (test_job is unchanged — no SKIP is reported) +# - CompleteVersion 2. +cat > databricks.yml << 'EOF' +bundle: + name: dms-sequential-deploys + +resources: + jobs: + test_job: + name: test-job + new_job: + name: new-job +EOF +trace $CLI bundle deploy +trace print_requests.py --get //bundle ^//workspace-files ^//import-file + +# Deploy 3: drop test_job. Expect one DELETE operation under version 3 with +# the resource_id of the previously created job, no state payload. 
+cat > databricks.yml << 'EOF' +bundle: + name: dms-sequential-deploys + +resources: + jobs: + new_job: + name: new-job +EOF +trace $CLI bundle deploy +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/sequential-deploys/test.toml b/acceptance/bundle/dms/sequential-deploys/test.toml new file mode 100644 index 00000000000..b55fbcbf6b5 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/test.toml @@ -0,0 +1,6 @@ +Ignore = [".databricks"] + +# Three successive deploys exercise the full operation-reporting taxonomy: +# v1 — CREATE for test_job +# v2 — CREATE for new_job (test_job unchanged, no operation) +# v3 — DELETE for test_job (new_job unchanged) diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go index 052c3310cca..a7af28d87ba 100644 --- a/bundle/deploy/lock/deployment_metadata_service.go +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -40,6 +40,7 @@ type metadataServiceLock struct { versionID string stopHeartbeat context.CancelFunc + reporter *asyncReporter } func newMetadataServiceLock(b *bundle.Bundle, goal Goal) (*metadataServiceLock, error) { @@ -85,6 +86,13 @@ func (l *metadataServiceLock) Acquire(ctx context.Context) error { l.b.DeploymentID = deploymentID l.stopHeartbeat = startHeartbeat(ctx, l.svc, deploymentID, versionID) + // Wire per-resource operation reporting through an async dispatcher so + // apply workers don't synchronously wait on DMS round-trips. The + // reporter is closed in Release before CompleteVersion to ensure all + // operations are recorded under this version. 
+ l.reporter = newAsyncReporter(ctx, makeOperationSender(l.svc, deploymentID, versionID)) + l.b.DeploymentBundle.OperationReporter = l.reporter.Reporter() + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, versionID) return nil } @@ -96,6 +104,13 @@ func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStat l.stopHeartbeat() } + // Drain pending operation reports before completing the version, so all + // per-resource events are recorded under this version_id on the server + // side. Close blocks until the sender goroutine exits. + if l.reporter != nil { + l.reporter.Close() + } + // If Acquire failed before reaching CreateVersion there is nothing to release. if l.svc == nil || l.deploymentID == "" || l.versionID == "" { return nil diff --git a/bundle/deploy/lock/operation_reporter.go b/bundle/deploy/lock/operation_reporter.go new file mode 100644 index 00000000000..d59de5f39ab --- /dev/null +++ b/bundle/deploy/lock/operation_reporter.go @@ -0,0 +1,182 @@ +package lock + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/libs/log" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// asyncReporterBufferSize matches direct.defaultParallelism so that, in the +// worst case, a hard process crash drops at most ~10 unsent operation events. +// The send blocks when the buffer is full, applying backpressure to the apply +// worker pool. +const asyncReporterBufferSize = 10 + +// operationEvent is one resource-apply outcome enqueued for delivery to DMS. +type operationEvent struct { + resourceKey string + resourceID string + action deployplan.ActionType + operationErr error + state json.RawMessage +} + +// asyncReporter ships operationEvents to DMS from a single sender goroutine +// fed by a buffered channel. 
Apply workers push events and continue; the +// sender drains the channel and logs any DMS-side failures without +// interrupting the deploy. Close blocks until the sender drains. +// +// We send asynchronously because operation reporting is on the hot path of +// every Create/Update/Delete and the DMS call is a network round-trip we +// don't want to serialize the deploy on. The buffer size matches the apply +// parallelism so a single batch of operations can be enqueued without +// blocking; once full, sends apply backpressure to the worker pool. +type asyncReporter struct { + ch chan operationEvent + done chan struct{} + sendFn func(ctx context.Context, ev operationEvent) error + ctx context.Context +} + +// newAsyncReporter starts the sender goroutine. The given ctx is used for all +// DMS API calls; it must outlive individual worker contexts (callers pass the +// deploy-level context for this reason). +func newAsyncReporter(ctx context.Context, sendFn func(context.Context, operationEvent) error) *asyncReporter { + r := &asyncReporter{ + ch: make(chan operationEvent, asyncReporterBufferSize), + done: make(chan struct{}), + sendFn: sendFn, + ctx: ctx, + } + go r.run() + return r +} + +func (r *asyncReporter) run() { + defer close(r.done) + for ev := range r.ch { + if err := r.sendFn(r.ctx, ev); err != nil { + // Reporting failures are intentionally non-fatal: the deploy + // already succeeded (or failed independently), and we don't + // want a DMS hiccup to surface as a deploy error. Matches the + // heartbeat behaviour established in step 3. + log.Warnf(r.ctx, "Failed to report %s operation for %s to DMS: %v", ev.action, ev.resourceKey, err) + } + } +} + +// Reporter returns a direct.OperationReporter that enqueues onto the channel. +// The returned function is safe to call from multiple goroutines and returns +// quickly unless the buffer is full. 
+func (r *asyncReporter) Reporter() direct.OperationReporter { + return func( + ctx context.Context, + resourceKey, resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, + ) error { + select { + case r.ch <- operationEvent{ + resourceKey: resourceKey, + resourceID: resourceID, + action: action, + operationErr: operationErr, + state: state, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Close signals end-of-input and waits for the sender to drain. +func (r *asyncReporter) Close() { + close(r.ch) + <-r.done +} + +// makeOperationSender returns the synchronous "send one event to DMS" function +// consumed by asyncReporter. Skip actions short-circuit to nil; mapping errors +// and API errors are returned to the caller (asyncReporter logs and continues). +func makeOperationSender(svc sdkbundle.BundleInterface, deploymentID, versionID string) func(context.Context, operationEvent) error { + return func(ctx context.Context, ev operationEvent) error { + // Internal state uses fully-qualified keys like "resources.jobs.foo". + // DMS-side resource keys omit the "resources." prefix. + apiKey := strings.TrimPrefix(ev.resourceKey, "resources.") + + actionType, err := planActionToOperationAction(ev.action) + if err != nil { + return fmt.Errorf("mapping action for resource %s: %w", ev.resourceKey, err) + } + if actionType == "" { + // Skip actions are not reported — there is nothing for DMS to record. 
+ return nil + } + + status := sdkbundle.OperationStatusOperationStatusSucceeded + var errorMessage string + if ev.operationErr != nil { + status = sdkbundle.OperationStatusOperationStatusFailed + errorMessage = ev.operationErr.Error() + } + + op := sdkbundle.Operation{ + ResourceKey: apiKey, + ResourceId: ev.resourceID, + Status: status, + ActionType: actionType, + ErrorMessage: errorMessage, + } + if len(ev.state) > 0 { + s := ev.state + op.State = &s + } + + _, err = svc.CreateOperation(ctx, sdkbundle.CreateOperationRequest{ + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: apiKey, + Operation: op, + }) + if err != nil { + return fmt.Errorf("reporting operation for resource %s: %w", ev.resourceKey, err) + } + return nil + } +} + +// planActionToOperationAction maps a deploy-plan action onto the DMS +// OperationActionType enum. The Skip action is mapped to the empty string so +// the caller can drop it (DMS has no concept of a no-op operation). +// +// Bind / BindAndUpdate / InitialRegister are not currently produced by the +// direct planner, so they are intentionally not mapped here — adding new plan +// actions in the planner without updating this mapping will fail loud at +// runtime (default branch returns an error). 
+func planActionToOperationAction(action deployplan.ActionType) (sdkbundle.OperationActionType, error) { + switch action { + case deployplan.Skip: + return "", nil + case deployplan.Create: + return sdkbundle.OperationActionTypeOperationActionTypeCreate, nil + case deployplan.Update: + return sdkbundle.OperationActionTypeOperationActionTypeUpdate, nil + case deployplan.UpdateWithID: + return sdkbundle.OperationActionTypeOperationActionTypeUpdateWithId, nil + case deployplan.Delete: + return sdkbundle.OperationActionTypeOperationActionTypeDelete, nil + case deployplan.Recreate: + return sdkbundle.OperationActionTypeOperationActionTypeRecreate, nil + case deployplan.Resize: + return sdkbundle.OperationActionTypeOperationActionTypeResize, nil + default: + return "", fmt.Errorf("unsupported operation action type: %s", action) + } +} diff --git a/bundle/deploy/lock/operation_reporter_test.go b/bundle/deploy/lock/operation_reporter_test.go new file mode 100644 index 00000000000..479b17b0d41 --- /dev/null +++ b/bundle/deploy/lock/operation_reporter_test.go @@ -0,0 +1,78 @@ +package lock + +import ( + "testing" + + "github.com/databricks/cli/bundle/deployplan" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPlanActionToOperationAction(t *testing.T) { + tests := []struct { + name string + action deployplan.ActionType + expected sdkbundle.OperationActionType + wantErr string + }{ + { + name: "skip maps to empty (no DMS operation)", + action: deployplan.Skip, + expected: "", + }, + { + name: "create", + action: deployplan.Create, + expected: sdkbundle.OperationActionTypeOperationActionTypeCreate, + }, + { + name: "update", + action: deployplan.Update, + expected: sdkbundle.OperationActionTypeOperationActionTypeUpdate, + }, + { + name: "update_id", + action: deployplan.UpdateWithID, + expected: sdkbundle.OperationActionTypeOperationActionTypeUpdateWithId, + }, + { 
+ name: "delete", + action: deployplan.Delete, + expected: sdkbundle.OperationActionTypeOperationActionTypeDelete, + }, + { + name: "recreate", + action: deployplan.Recreate, + expected: sdkbundle.OperationActionTypeOperationActionTypeRecreate, + }, + { + name: "resize", + action: deployplan.Resize, + expected: sdkbundle.OperationActionTypeOperationActionTypeResize, + }, + { + name: "undefined returns error", + action: deployplan.Undefined, + wantErr: "unsupported operation action type", + }, + { + name: "unknown returns error", + action: "some_garbage_value", + wantErr: "unsupported operation action type", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := planActionToOperationAction(tt.action) + if tt.wantErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) + return + } + require.NoError(t, err) + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/bundle/deployplan/plan.go b/bundle/deployplan/plan.go index b35357c7c28..cf81398f9a7 100644 --- a/bundle/deployplan/plan.go +++ b/bundle/deployplan/plan.go @@ -16,11 +16,15 @@ import ( const currentPlanVersion = 2 type Plan struct { - PlanVersion int `json:"plan_version,omitempty"` - CLIVersion string `json:"cli_version,omitempty"` - Lineage string `json:"lineage,omitempty"` - Serial int `json:"serial,omitempty"` - Plan map[string]*PlanEntry `json:"plan,omitzero"` + PlanVersion int `json:"plan_version,omitempty"` + CLIVersion string `json:"cli_version,omitempty"` + // Lineage and Serial uniquely identify a file-state snapshot the plan + // was computed against. Under the deployment metadata service (DMS) + // these are unused — server-side versioning replaces them with the + // deployment_id + version_id pair carried by the lock. 
+ Lineage string `json:"lineage,omitempty"` + Serial int `json:"serial,omitempty"` + Plan map[string]*PlanEntry `json:"plan,omitzero"` mutex sync.Mutex `json:"-"` lockmap lockmap `json:"-"` diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index 6bad8091469..c6f0dde9587 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -84,7 +84,19 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa logdiag.LogError(ctx, fmt.Errorf("%s: Unexpected delete action during migration", errorPrefix)) return false } + + // Capture the resource ID before Destroy clears it from state so + // the DMS report carries a usable resource_id. + var deleteResourceID string + if b.OperationReporter != nil { + deleteResourceID = b.StateDB.GetResourceID(resourceKey) + } + err = d.Destroy(ctx, &b.StateDB) + if reportErr := b.reportOperation(ctx, resourceKey, deleteResourceID, action, err, nil); reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false @@ -124,6 +136,28 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa err = d.Deploy(ctx, &b.StateDB, sv.Value, action, entry) } + // Report the operation outcome to DMS (if enabled). On success + // we report the post-deploy resource ID + the new local state; + // on failure we report the action and error message and skip + // the state payload (matches OPERATION_STATUS_FAILED conventions). 
+ if !migrateMode { + var reportState json.RawMessage + var reportID string + if err == nil { + reportID = b.StateDB.GetResourceID(resourceKey) + stateBytes, marshalErr := json.Marshal(sv.Value) + if marshalErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: marshalling state for report: %w", errorPrefix, marshalErr)) + return false + } + reportState = stateBytes + } + if reportErr := b.reportOperation(ctx, resourceKey, reportID, action, err, reportState); reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } + } + if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false @@ -189,6 +223,23 @@ func (b *DeploymentBundle) LookupReferencePostDeploy(ctx context.Context, path * return structaccess.Get(remoteState, fieldPath) } +// reportOperation forwards a single resource operation outcome to the +// configured OperationReporter (if any). Returning nil when the reporter is +// unset lets callers invoke this unconditionally and keeps the DMS-off path +// identical to before. +func (b *DeploymentBundle) reportOperation( + ctx context.Context, + resourceKey, resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, +) error { + if b.OperationReporter == nil { + return nil + } + return b.OperationReporter(ctx, resourceKey, resourceID, action, operationErr, state) +} + func jsonDump(obj any) string { bytes, err := json.MarshalIndent(obj, "", " ") if err != nil { diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index 5c543619b06..0fa8aceda06 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -39,6 +39,10 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error { // ValidatePlanAgainstState validates that a plan's lineage and serial match the given state. // If the plan has no lineage (first deployment), validation is skipped. 
+// +// Serialized plans are not yet supported with the deployment metadata +// service. When that lands, an equivalent check will be needed against the +// server-side deployment_id + version_id pair. func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error { if plan.Lineage == "" { return nil diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index 48a9c5a2ff7..9b11f0e7476 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -2,6 +2,7 @@ package direct import ( "context" + "encoding/json" "fmt" "reflect" "sync" @@ -37,6 +38,20 @@ type DeploymentUnit struct { DependsOn []deployplan.DependsOnEntry } +// OperationReporter is called after each resource Create/Update/Delete to +// report the outcome to the deployment metadata service (DMS). state holds +// the post-operation local config and is nil for deletes and failed +// operations. Returns an error when the reporter cannot deliver the event +// (e.g. context cancelled); a non-nil error fails the deploy. +type OperationReporter func( + ctx context.Context, + resourceKey string, + resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, +) error + // DeploymentBundle holds everything needed to deploy a bundle type DeploymentBundle struct { StateDB dstate.DeploymentState @@ -44,6 +59,12 @@ type DeploymentBundle struct { Plan *deployplan.Plan RemoteStateCache sync.Map StateCache structvar.Cache + + // OperationReporter, when non-nil, is called inline after each successful + // or failed Create/Update/Delete to report the outcome to DMS. Apply + // leaves this nil when the DMS gate is off, so the file-state path is + // unaffected. + OperationReporter OperationReporter } // SetRemoteState updates the remote state with type validation and marks as fresh. 
From d98818d05e50a62425947bc73abc7eab196ca16e Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 28 May 2026 01:08:13 +0000 Subject: [PATCH 6/6] bundle/direct: reject migrate+DMS combination up front Migrate mode rewrites local state in-place and never calls resource adapters, so there are no operations to report. Without this check, a DMS-enabled migrate would silently leave the server-side resource inventory empty. Co-authored-by: Isaac --- bundle/direct/bundle_apply.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index c6f0dde9587..06bb4719162 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -20,6 +20,14 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa panic("Planning is not done") } + // Migrate mode rewrites local state in-place and does not call resource + // adapters, so there are no operations to report. Reject the combination + // rather than silently dropping migrations from the DMS view. + if migrateMode && b.OperationReporter != nil { + logdiag.LogError(ctx, errors.New("migration is not supported with the deployment metadata service")) + return + } + if len(plan.Plan) == 0 { // Avoid creating state file if nothing to deploy return @@ -138,9 +146,11 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa // Report the operation outcome to DMS (if enabled). On success // we report the post-deploy resource ID + the new local state; - // on failure we report the action and error message and skip + // on failure we report the action and error message and omit // the state payload (matches OPERATION_STATUS_FAILED conventions). - if !migrateMode { + // migrateMode is rejected up-front above when DMS is enabled, so + // it's safe to always go through reportOperation here. 
+ if b.OperationReporter != nil { var reportState json.RawMessage var reportID string if err == nil {