diff --git a/acceptance/bin/print_requests.py b/acceptance/bin/print_requests.py index c06449a2482..6e75f495156 100755 --- a/acceptance/bin/print_requests.py +++ b/acceptance/bin/print_requests.py @@ -168,7 +168,7 @@ def main(): if not requests_file.exists(): sys.exit(f"File {requests_file} not found") - with open(requests_file) as fobj: + with open(requests_file, encoding="utf-8") as fobj: data = fobj.read() if not data: diff --git a/acceptance/bundle/dms/add-resources/databricks.yml b/acceptance/bundle/dms/add-resources/databricks.yml new file mode 100644 index 00000000000..22873b2e010 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-add-resources + +resources: + jobs: + job_a: + name: job-a diff --git a/acceptance/bundle/dms/add-resources/out.test.toml b/acceptance/bundle/dms/add-resources/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/add-resources/output.txt b/acceptance/bundle/dms/add-resources/output.txt new file mode 100644 index 00000000000..093756b72ff --- /dev/null +++ b/acceptance/bundle/dms/add-resources/output.txt @@ -0,0 +1,127 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle plan +Plan: 0 to add, 0 to change, 0 to delete, 2 unchanged + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.job_a" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.job_a", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "job-a", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "jobs.job_b" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.job_b", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-add-resources/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "job-b", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} diff --git a/acceptance/bundle/dms/add-resources/script b/acceptance/bundle/dms/add-resources/script new file mode 100644 index 00000000000..33d205e9257 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/script @@ -0,0 +1,34 @@ +# Deploy a fresh bundle with one job — DMS records a single CREATE operation. +trace $CLI bundle deploy + +# Drop the local cache so the next deploy re-reads state from DMS via ListResources. +rm -rf .databricks + +# Add a second job and redeploy. The new deploy should: +# - read existing state from DMS, +# - skip job_a (no change), +# - create job_b and report exactly one CREATE operation under version 2. +cat > databricks.yml << 'EOF' +bundle: + name: dms-add-resources + +resources: + jobs: + job_a: + name: job-a + job_b: + name: job-b +EOF +trace $CLI bundle deploy + +# Drop the cache one more time and run plan — both resources should show as +# unchanged because their reported state matches the local config. +rm -rf .databricks +trace $CLI bundle plan + +# Inspect the recorded DMS traffic. Expect: +# - Deploy 1: CreateDeployment + CreateVersion(1) + CreateOperation(job_a, CREATE) + CompleteVersion(1) +# - Deploy 2: ListResources + GetDeployment + CreateVersion(2) + CreateOperation(job_b, CREATE) + CompleteVersion(2) +# - Plan: ListResources +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/add-resources/test.toml b/acceptance/bundle/dms/add-resources/test.toml new file mode 100644 index 00000000000..f57dd05b513 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/test.toml @@ -0,0 +1,6 @@ +Ignore = [".databricks"] + +# Exercise DMS-side operation reporting across two deploys: +# - first deploy creates job_a and reports one CREATE operation, +# - second deploy adds job_b without retouching job_a, +# - subsequent plan reads from DMS and sees both jobs already deployed. diff --git a/acceptance/bundle/dms/deploy-error/databricks.yml b/acceptance/bundle/dms/deploy-error/databricks.yml new file mode 100644 index 00000000000..b7e4006775a --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-deploy-error + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/deploy-error/out.test.toml b/acceptance/bundle/dms/deploy-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/deploy-error/output.txt b/acceptance/bundle/dms/deploy-error/output.txt new file mode 100644 index 00000000000..cbb7f1d934a --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/output.txt @@ -0,0 +1,56 @@ + +>>> musterr [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-deploy-error/default/files... +Deploying resources... +Error: cannot create resources.jobs.test_job: Invalid job configuration. (400 INVALID_PARAMETER_VALUE) + +Endpoint: POST [DATABRICKS_URL]/api/2.2/jobs/create +HTTP Status: 400 Bad Request +API error_code: INVALID_PARAMETER_VALUE +API message: Invalid job configuration. + + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "error_message": "Invalid job configuration.", + "resource_id": "", + "resource_key": "jobs.test_job", + "status": "OPERATION_STATUS_FAILED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_FAILURE" + } +} diff --git a/acceptance/bundle/dms/deploy-error/script b/acceptance/bundle/dms/deploy-error/script new file mode 100644 index 00000000000..3c2dfc11408 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/script @@ -0,0 +1,10 @@ +# Deploy with a server-side job creation failure injected via test.toml. +# Expect the CLI to: +# - acquire the DMS lock (CreateDeployment + CreateVersion(1)), +# - attempt the job creation and fail, +# - report a FAILED CREATE operation carrying the error message, +# - release the lock with completion_reason=VERSION_COMPLETE_FAILURE. +trace musterr $CLI bundle deploy + +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/deploy-error/test.toml b/acceptance/bundle/dms/deploy-error/test.toml new file mode 100644 index 00000000000..d3190822d55 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/test.toml @@ -0,0 +1,10 @@ +Ignore = [".databricks"] + +# Inject a server-side failure on job creation and confirm that: +# - the failed operation is reported to DMS with OPERATION_STATUS_FAILED, +# - the lock is released with VERSION_COMPLETE_FAILURE. + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.StatusCode = 400 +Response.Body = '{"error_code": "INVALID_PARAMETER_VALUE", "message": "Invalid job configuration."}' diff --git a/acceptance/bundle/dms/plan-and-summary/databricks.yml b/acceptance/bundle/dms/plan-and-summary/databricks.yml new file mode 100644 index 00000000000..9529e5b8c9b --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-plan-and-summary + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/plan-and-summary/out.test.toml b/acceptance/bundle/dms/plan-and-summary/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/plan-and-summary/output.txt b/acceptance/bundle/dms/plan-and-summary/output.txt new file mode 100644 index 00000000000..d4b7a62cfd4 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/output.txt @@ -0,0 +1,85 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-plan-and-summary/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle plan +Plan: 0 to add, 0 to change, 0 to delete, 1 unchanged + +>>> [CLI] bundle summary +Name: dms-plan-and-summary +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/dms-plan-and-summary/default +Resources: + Jobs: + test_job: + Name: test-job + URL: [DATABRICKS_URL]/jobs/[NUMID]?o=[NUMID] + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-plan-and-summary/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} diff --git a/acceptance/bundle/dms/plan-and-summary/script b/acceptance/bundle/dms/plan-and-summary/script new file mode 100644 index 00000000000..8b2d0def20c --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/script @@ -0,0 +1,17 @@ +# Deploy first so managed_service.json exists with a deployment_id. +# Step 5 will report individual operations here; for now DMS only knows the +# deployment exists. +trace $CLI bundle deploy + +# bundle plan should hit DMS via ListResources. With no operations reported +# yet, the resource list is empty and the job is planned as a create. +trace $CLI bundle plan + +# bundle summary should also read from DMS. ListResources is empty, so the +# job renders as (not deployed). +trace $CLI bundle summary + +# Verify the metadata-service calls. Two ListResources lines confirm that +# both plan and summary went through the DMS read path. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/plan-and-summary/test.toml b/acceptance/bundle/dms/plan-and-summary/test.toml new file mode 100644 index 00000000000..55257593f46 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/test.toml @@ -0,0 +1,10 @@ +Ignore = [".databricks"] + +# Verify the state-read path against the deployment metadata service: +# - bundle deploy populates managed_service.json with a deployment_id. +# - bundle plan / summary read state from DMS via ListResources rather than +# from the workspace resources.json file. +# +# With operation reporting (step 5) in place, the deploy reports a CREATE +# operation so the subsequent plan sees the job as already deployed and the +# summary renders its URL. diff --git a/acceptance/bundle/dms/release-lock-error/databricks.yml b/acceptance/bundle/dms/release-lock-error/databricks.yml new file mode 100644 index 00000000000..94323b84d93 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: dms-release-lock-error + +targets: + fail-complete: + default: true + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/release-lock-error/out.test.toml b/acceptance/bundle/dms/release-lock-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt new file mode 100644 index 00000000000..cc333c6e415 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -0,0 +1,63 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... +Deploying resources... +Deployment complete! +Warn: Failed to release deployment lock: simulated complete version failure + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "fail-complete" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "fail-complete", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/release-lock-error/script b/acceptance/bundle/dms/release-lock-error/script new file mode 100755 index 00000000000..86220e4be0a --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/script @@ -0,0 +1,8 @@ +# Deploy with the deployment metadata service enabled. +# The target name "fail-complete" instructs the fake DMS server to fail the +# CompleteVersion call so the CLI exercises the "lock release failed" branch. +trace $CLI bundle deploy + +# Print the DMS requests to verify the lock release was attempted. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/release-lock-error/test.toml b/acceptance/bundle/dms/release-lock-error/test.toml new file mode 100644 index 00000000000..58ed9f5172c --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/test.toml @@ -0,0 +1,4 @@ +Ignore = [".databricks"] + +# Override target to "fail-complete" which makes the fake DMS server's +# CompleteVersion endpoint return an error, simulating a release failure. diff --git a/acceptance/bundle/dms/sequential-deploys/databricks.yml b/acceptance/bundle/dms/sequential-deploys/databricks.yml new file mode 100644 index 00000000000..7eb80d5965a --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-sequential-deploys + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/sequential-deploys/out.test.toml b/acceptance/bundle/dms/sequential-deploys/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/sequential-deploys/output.txt b/acceptance/bundle/dms/sequential-deploys/output.txt new file mode 100644 index 00000000000..0c6041cc81e --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/output.txt @@ -0,0 +1,169 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "jobs.new_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "resource_key": "jobs.new_job", + "state": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/state/metadata.json" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "new-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-sequential-deploys/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "action_type": "OPERATION_ACTION_TYPE_DELETE", + "resource_id": "[NUMID]", + "resource_key": "jobs.test_job", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/sequential-deploys/script b/acceptance/bundle/dms/sequential-deploys/script new file mode 100644 index 00000000000..e11211eaa1f --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/script @@ -0,0 +1,36 @@ +# Deploy 1: create test_job. Expect a single CREATE operation under version 1. +trace $CLI bundle deploy +trace print_requests.py --get //bundle ^//workspace-files ^//import-file + +# Deploy 2: add new_job. Expect: +# - ListResources (state read) + GetDeployment (version bump) +# - one CREATE operation for new_job (test_job is unchanged — no SKIP is reported) +# - CompleteVersion 2. +cat > databricks.yml << 'EOF' +bundle: + name: dms-sequential-deploys + +resources: + jobs: + test_job: + name: test-job + new_job: + name: new-job +EOF +trace $CLI bundle deploy +trace print_requests.py --get //bundle ^//workspace-files ^//import-file + +# Deploy 3: drop test_job. Expect one DELETE operation under version 3 with +# the resource_id of the previously created job, no state payload. +cat > databricks.yml << 'EOF' +bundle: + name: dms-sequential-deploys + +resources: + jobs: + new_job: + name: new-job +EOF +trace $CLI bundle deploy +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/sequential-deploys/test.toml b/acceptance/bundle/dms/sequential-deploys/test.toml new file mode 100644 index 00000000000..b55fbcbf6b5 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/test.toml @@ -0,0 +1,6 @@ +Ignore = [".databricks"] + +# Three successive deploys exercise the full operation-reporting taxonomy: +# v1 — CREATE for test_job +# v2 — CREATE for new_job (test_job unchanged, no operation) +# v3 — DELETE for test_job (new_job unchanged) diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..1b258d2613a --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,7 @@ +Badness = "Uses local test server; enable on cloud once the deployment metadata service is in production" +Local = true +Cloud = false +RecordRequests = true + +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/bundle/bundle.go b/bundle/bundle.go index e7eef14b907..ec012b2a031 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -141,6 +141,13 @@ type Bundle struct { // (direct only) deployment implementation and state DeploymentBundle direct.DeploymentBundle + // DeploymentID identifies the DMS-side deployment record for this bundle. + // Populated from the workspace managed_service.json during state pull when + // DATABRICKS_BUNDLE_MANAGED_STATE is set, and by the lock package after + // CreateDeployment. Empty when the deployment metadata service is not in + // use, or when DMS is enabled but no prior deployment exists. + DeploymentID string + // if true, we skip approval checks for deploy, destroy resources and delete // files AutoApprove bool diff --git a/bundle/deploy/lock/acquire.go b/bundle/deploy/lock/acquire.go deleted file mode 100644 index 6e4844ca5ff..00000000000 --- a/bundle/deploy/lock/acquire.go +++ /dev/null @@ -1,69 +0,0 @@ -package lock - -import ( - "context" - "errors" - "io/fs" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/permissions" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type acquire struct{} - -func Acquire() bundle.Mutator { - return &acquire{} -} - -func (m *acquire) Name() string { - return "lock:acquire" -} - -func (m *acquire) init(ctx context.Context, b *bundle.Bundle) error { - user := b.Config.Workspace.CurrentUser.UserName - dir := b.Config.Workspace.StatePath - l, err := locker.CreateLocker(user, dir, b.WorkspaceClient(ctx)) - if err != nil { - return err - } - - b.Locker = l - return nil -} - -func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - err := m.init(ctx, b) - if err != nil { - return diag.FromErr(err) - } - - force := b.Config.Bundle.Deployment.Lock.Force - log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) - err = b.Locker.Lock(ctx, force) - if err != nil { - log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) - - if errors.Is(err, fs.ErrPermission) { - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - if errors.Is(err, fs.ErrNotExist) { - // If we get a "doesn't exist" error from the API this indicates - // we either don't have permissions or the path is invalid. - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - return diag.FromErr(err) - } - - return nil -} diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..a7af28d87ba --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,309 @@ +package lock + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/statemgmt" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/google/uuid" +) + +// defaultHeartbeatInterval is how often the background heartbeat goroutine +// renews the DMS-side lock lease while a deployment is in progress. +const defaultHeartbeatInterval = 30 * time.Second + +// metadataServiceLock implements DeploymentLock against the bundle deployment +// metadata service (DMS). The lock is acquired by creating a new Version +// under the deployment; a background goroutine renews the lock lease via +// Heartbeat calls; the lock is released by CompleteVersion. +type metadataServiceLock struct { + b *bundle.Bundle + versionType sdkbundle.VersionType + + svc sdkbundle.BundleInterface + deploymentID string + versionID string + + stopHeartbeat context.CancelFunc + reporter *asyncReporter +} + +func newMetadataServiceLock(b *bundle.Bundle, goal Goal) (*metadataServiceLock, error) { + versionType, err := goalToVersionType(goal) + if err != nil { + return nil, err + } + return &metadataServiceLock{b: b, versionType: versionType}, nil +} + +// goalToVersionType maps a deployment Goal onto the DMS VersionType enum. +// Bind and Unbind are not yet supported under DMS — they will gain dedicated +// DMS operations in a later change. +func goalToVersionType(goal Goal) (sdkbundle.VersionType, error) { + switch goal { + case GoalDeploy: + return sdkbundle.VersionTypeVersionTypeDeploy, nil + case GoalDestroy: + return sdkbundle.VersionTypeVersionTypeDestroy, nil + case GoalBind, GoalUnbind: + return "", fmt.Errorf("%s is not supported with the deployment metadata service", goal) + default: + return "", fmt.Errorf("unknown deployment goal: %s", goal) + } +} + +func (l *metadataServiceLock) Acquire(ctx context.Context) error { + if l.b.Config.Bundle.Deployment.Lock.Force { + return errors.New("force lock is not supported with the deployment metadata service") + } + + l.svc = l.b.WorkspaceClient(ctx).Bundle + + deploymentID, versionID, err := acquireLock(ctx, l.b, l.svc, l.versionType) + if err != nil { + return err + } + + l.deploymentID = deploymentID + l.versionID = versionID + // Publish the deployment ID on the bundle so downstream code (e.g. + // statemgmt.LoadStateFromDMS) can address the right server-side record. + l.b.DeploymentID = deploymentID + l.stopHeartbeat = startHeartbeat(ctx, l.svc, deploymentID, versionID) + + // Wire per-resource operation reporting through an async dispatcher so + // apply workers don't synchronously wait on DMS round-trips. The + // reporter is closed in Release before CompleteVersion to ensure all + // operations are recorded under this version. + l.reporter = newAsyncReporter(ctx, makeOperationSender(l.svc, deploymentID, versionID)) + l.b.DeploymentBundle.OperationReporter = l.reporter.Reporter() + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, versionID) + return nil +} + +func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStatus) error { + // Stop the heartbeat first so its in-flight request doesn't race with + // CompleteVersion below. + if l.stopHeartbeat != nil { + l.stopHeartbeat() + } + + // Drain pending operation reports before completing the version, so all + // per-resource events are recorded under this version_id on the server + // side. Close blocks until the sender goroutine exits. + if l.reporter != nil { + l.reporter.Close() + } + + // If Acquire failed before reaching CreateVersion there is nothing to release. + if l.svc == nil || l.deploymentID == "" || l.versionID == "" { + return nil + } + + reason := sdkbundle.VersionCompleteVersionCompleteSuccess + if status == DeploymentFailure { + reason = sdkbundle.VersionCompleteVersionCompleteFailure + } + + versionName := fmt.Sprintf("deployments/%s/versions/%s", l.deploymentID, l.versionID) + if _, err := l.svc.CompleteVersion(ctx, sdkbundle.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }); err != nil { + return err + } + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%s", + l.deploymentID, l.versionID, reason) + + // On successful destroy, delete the deployment record. Surface failures + // to the caller — they are deploy-correctness issues, not best-effort + // cleanup. + if status == DeploymentSuccess && l.versionType == sdkbundle.VersionTypeVersionTypeDestroy { + if err := l.svc.DeleteDeployment(ctx, sdkbundle.DeleteDeploymentRequest{ + Name: "deployments/" + l.deploymentID, + }); err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + return nil +} + +// acquireLock implements the lock acquisition protocol: +// 1. Resolve the deployment ID from managed_service.json (or generate a new one). +// 2. CreateDeployment for fresh IDs; GetDeployment otherwise to learn the +// next version number. +// 3. CreateVersion to acquire the lock. +func acquireLock(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, versionType sdkbundle.VersionType) (deploymentID, versionID string, err error) { + deploymentID, isNew, err := resolveDeploymentID(ctx, b) + if err != nil { + return "", "", err + } + + if isNew { + // Fresh deployment: create the record at version 1. + _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ + DeploymentId: deploymentID, + Deployment: sdkbundle.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", "", fmt.Errorf("failed to create deployment: %w", createErr) + } + // Persist the deployment ID only after the server-side record exists, + // so a failed CreateDeployment doesn't leave a dangling ID on disk. + if err := writeDeploymentID(ctx, b, deploymentID); err != nil { + return "", "", err + } + versionID = "1" + } else { + // Existing deployment: ask the server for the last version ID. + dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + deploymentID, + }) + if getErr != nil { + return "", "", fmt.Errorf("failed to get deployment: %w", getErr) + } + next, parseErr := nextVersionID(dep.LastVersionId) + if parseErr != nil { + return "", "", parseErr + } + versionID = next + } + + if _, err := svc.CreateVersion(ctx, sdkbundle.CreateVersionRequest{ + Parent: "deployments/" + deploymentID, + VersionId: versionID, + Version: sdkbundle.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }); err != nil { + return "", "", fmt.Errorf("failed to acquire deployment lock: %w", err) + } + + return deploymentID, versionID, nil +} + +// nextVersionID returns the next monotonic version ID following lastVersionID. +// An empty lastVersionID means "no prior versions" so the next ID is "1". +func nextVersionID(lastVersionID string) (string, error) { + if lastVersionID == "" { + return "1", nil + } + n, err := strconv.ParseInt(lastVersionID, 10, 64) + if err != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", lastVersionID, err) + } + return strconv.FormatInt(n+1, 10), nil +} + +// resolveDeploymentID returns the deployment ID for this bundle. If +// managed_service.json exists in the workspace state directory and contains a +// deployment ID, it is reused. Otherwise a new UUID is generated and the +// caller must write it to disk after CreateDeployment succeeds. +func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, bool, error) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return "", false, fmt.Errorf("failed to create state filer: %w", err) + } + + reader, readErr := f.Read(ctx, statemgmt.ManagedServiceFileName) + if readErr == nil { + defer reader.Close() + data, err := io.ReadAll(reader) + if err != nil { + return "", false, fmt.Errorf("failed to read %s: %w", statemgmt.ManagedServiceFileName, err) + } + var sj statemgmt.ManagedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + return "", false, fmt.Errorf("failed to parse %s: %w", statemgmt.ManagedServiceFileName, err) + } + if sj.DeploymentID != "" { + return sj.DeploymentID, false, nil + } + // File exists but has no deployment_id — treat as fresh. + } else if !errors.Is(readErr, fs.ErrNotExist) { + return "", false, fmt.Errorf("failed to read %s: %w", statemgmt.ManagedServiceFileName, readErr) + } + + return uuid.New().String(), true, nil +} + +func writeDeploymentID(ctx context.Context, b *bundle.Bundle, deploymentID string) error { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return fmt.Errorf("failed to create state filer: %w", err) + } + data, err := json.Marshal(statemgmt.ManagedServiceJSON{DeploymentID: deploymentID}) + if err != nil { + return fmt.Errorf("failed to marshal %s: %w", statemgmt.ManagedServiceFileName, err) + } + if err := f.Write(ctx, statemgmt.ManagedServiceFileName, bytes.NewReader(data), + filer.CreateParentDirectories, filer.OverwriteIfExists); err != nil { + return fmt.Errorf("failed to write %s: %w", statemgmt.ManagedServiceFileName, err) + } + return nil +} + +// startHeartbeat spawns a goroutine that renews the DMS lock lease at +// defaultHeartbeatInterval. The returned cancel func stops the goroutine. +// Heartbeat errors that indicate the version was already completed (HTTP 409 +// ABORTED) are treated as benign termination; all other errors are logged +// and the goroutine continues so a transient network blip doesn't tear down +// the deploy. +func startHeartbeat(parent context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(parent) + versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if _, err := svc.Heartbeat(ctx, sdkbundle.HeartbeatRequest{Name: versionName}); err != nil { + if isAborted(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + continue + } + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", + deploymentID, versionID) + } + } + }() + + return cancel +} + +// isAborted reports whether err is the DMS-specific "409 ABORTED" response +// the server emits when the heartbeat target version is no longer active. +func isAborted(err error) bool { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" { + return true + } + return false +} diff --git a/bundle/deploy/lock/deployment_metadata_service_test.go b/bundle/deploy/lock/deployment_metadata_service_test.go new file mode 100644 index 00000000000..76b09d6532c --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service_test.go @@ -0,0 +1,58 @@ +package lock + +import ( + "testing" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" +) + +func TestGoalToVersionType(t *testing.T) { + tests := []struct { + goal Goal + want sdkbundle.VersionType + wantErr bool + }{ + {goal: GoalDeploy, want: sdkbundle.VersionTypeVersionTypeDeploy}, + {goal: GoalDestroy, want: sdkbundle.VersionTypeVersionTypeDestroy}, + {goal: GoalBind, wantErr: true}, + {goal: GoalUnbind, wantErr: true}, + {goal: Goal("garbage"), wantErr: true}, + } + for _, tt := range tests { + t.Run(string(tt.goal), func(t *testing.T) { + got, err := goalToVersionType(tt.goal) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestNextVersionID(t *testing.T) { + tests := []struct { + name string + last string + want string + wantErr bool + }{ + {name: "empty starts at 1", last: "", want: "1"}, + {name: "increments numeric", last: "1", want: "2"}, + {name: "increments larger numeric", last: "42", want: "43"}, + {name: "rejects non-numeric", last: "v1", wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := nextVersionID(tt.last) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go new file mode 100644 index 00000000000..58162aba946 --- /dev/null +++ b/bundle/deploy/lock/lock.go @@ -0,0 +1,50 @@ +package lock + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/env" +) + +// Goal describes the purpose of a deployment operation. +type Goal string + +const ( + GoalBind = Goal("bind") + GoalUnbind = Goal("unbind") + GoalDeploy = Goal("deploy") + GoalDestroy = Goal("destroy") +) + +// DeploymentStatus indicates whether the deployment operation succeeded or failed. +type DeploymentStatus int + +const ( + DeploymentSuccess DeploymentStatus = iota + DeploymentFailure +) + +// DeploymentLock manages the deployment lock lifecycle. +type DeploymentLock interface { + // Acquire acquires the deployment lock. + Acquire(ctx context.Context) error + + // Release releases the deployment lock with the given deployment status. + Release(ctx context.Context, status DeploymentStatus) error +} + +// NewDeploymentLock returns a DeploymentLock implementation chosen based on +// the DATABRICKS_BUNDLE_MANAGED_STATE environment variable: when set to a +// truthy value the deployment metadata service backs the lock, otherwise the +// historical workspace-filesystem lock is used. +// +// Note: today the env var alone gates the DMS path. Once the broader managed- +// state feature lands the gate will move behind a richer predicate (e.g. +// statemgmt.IsDmsActive) that also checks server-side opt-in. +func NewDeploymentLock(ctx context.Context, b *bundle.Bundle, goal Goal) (DeploymentLock, error) { + if env.IsManagedState(ctx) { + return newMetadataServiceLock(b, goal) + } + return newWorkspaceFilesystemLock(b, goal), nil +} diff --git a/bundle/deploy/lock/operation_reporter.go b/bundle/deploy/lock/operation_reporter.go new file mode 100644 index 00000000000..d59de5f39ab --- /dev/null +++ b/bundle/deploy/lock/operation_reporter.go @@ -0,0 +1,182 @@ +package lock + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/libs/log" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// asyncReporterBufferSize matches direct.defaultParallelism so that, in the +// worst case, a hard process crash drops at most ~10 unsent operation events. +// The send blocks when the buffer is full, applying backpressure to the apply +// worker pool. +const asyncReporterBufferSize = 10 + +// operationEvent is one resource-apply outcome enqueued for delivery to DMS. +type operationEvent struct { + resourceKey string + resourceID string + action deployplan.ActionType + operationErr error + state json.RawMessage +} + +// asyncReporter ships operationEvents to DMS from a single sender goroutine +// fed by a buffered channel. Apply workers push events and continue; the +// sender drains the channel and logs any DMS-side failures without +// interrupting the deploy. Close blocks until the sender drains. +// +// We send asynchronously because operation reporting is on the hot path of +// every Create/Update/Delete and the DMS call is a network round-trip we +// don't want to serialize the deploy on. The buffer size matches the apply +// parallelism so a single batch of operations can be enqueued without +// blocking; once full, sends apply backpressure to the worker pool. +type asyncReporter struct { + ch chan operationEvent + done chan struct{} + sendFn func(ctx context.Context, ev operationEvent) error + ctx context.Context +} + +// newAsyncReporter starts the sender goroutine. The given ctx is used for all +// DMS API calls; it must outlive individual worker contexts (callers pass the +// deploy-level context for this reason). +func newAsyncReporter(ctx context.Context, sendFn func(context.Context, operationEvent) error) *asyncReporter { + r := &asyncReporter{ + ch: make(chan operationEvent, asyncReporterBufferSize), + done: make(chan struct{}), + sendFn: sendFn, + ctx: ctx, + } + go r.run() + return r +} + +func (r *asyncReporter) run() { + defer close(r.done) + for ev := range r.ch { + if err := r.sendFn(r.ctx, ev); err != nil { + // Reporting failures are intentionally non-fatal: the deploy + // already succeeded (or failed independently), and we don't + // want a DMS hiccup to surface as a deploy error. Matches the + // heartbeat behaviour established in step 3. + log.Warnf(r.ctx, "Failed to report %s operation for %s to DMS: %v", ev.action, ev.resourceKey, err) + } + } +} + +// Reporter returns a direct.OperationReporter that enqueues onto the channel. +// The returned function is safe to call from multiple goroutines and returns +// quickly unless the buffer is full. +func (r *asyncReporter) Reporter() direct.OperationReporter { + return func( + ctx context.Context, + resourceKey, resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, + ) error { + select { + case r.ch <- operationEvent{ + resourceKey: resourceKey, + resourceID: resourceID, + action: action, + operationErr: operationErr, + state: state, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Close signals end-of-input and waits for the sender to drain. +func (r *asyncReporter) Close() { + close(r.ch) + <-r.done +} + +// makeOperationSender returns the synchronous "send one event to DMS" function +// consumed by asyncReporter. Skip actions short-circuit to nil; mapping errors +// and API errors are returned to the caller (asyncReporter logs and continues). +func makeOperationSender(svc sdkbundle.BundleInterface, deploymentID, versionID string) func(context.Context, operationEvent) error { + return func(ctx context.Context, ev operationEvent) error { + // Internal state uses fully-qualified keys like "resources.jobs.foo". + // DMS-side resource keys omit the "resources." prefix. + apiKey := strings.TrimPrefix(ev.resourceKey, "resources.") + + actionType, err := planActionToOperationAction(ev.action) + if err != nil { + return fmt.Errorf("mapping action for resource %s: %w", ev.resourceKey, err) + } + if actionType == "" { + // Skip actions are not reported — there is nothing for DMS to record. + return nil + } + + status := sdkbundle.OperationStatusOperationStatusSucceeded + var errorMessage string + if ev.operationErr != nil { + status = sdkbundle.OperationStatusOperationStatusFailed + errorMessage = ev.operationErr.Error() + } + + op := sdkbundle.Operation{ + ResourceKey: apiKey, + ResourceId: ev.resourceID, + Status: status, + ActionType: actionType, + ErrorMessage: errorMessage, + } + if len(ev.state) > 0 { + s := ev.state + op.State = &s + } + + _, err = svc.CreateOperation(ctx, sdkbundle.CreateOperationRequest{ + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: apiKey, + Operation: op, + }) + if err != nil { + return fmt.Errorf("reporting operation for resource %s: %w", ev.resourceKey, err) + } + return nil + } +} + +// planActionToOperationAction maps a deploy-plan action onto the DMS +// OperationActionType enum. The Skip action is mapped to the empty string so +// the caller can drop it (DMS has no concept of a no-op operation). +// +// Bind / BindAndUpdate / InitialRegister are not currently produced by the +// direct planner, so they are intentionally not mapped here — adding new plan +// actions in the planner without updating this mapping will fail loud at +// runtime (default branch returns an error). +func planActionToOperationAction(action deployplan.ActionType) (sdkbundle.OperationActionType, error) { + switch action { + case deployplan.Skip: + return "", nil + case deployplan.Create: + return sdkbundle.OperationActionTypeOperationActionTypeCreate, nil + case deployplan.Update: + return sdkbundle.OperationActionTypeOperationActionTypeUpdate, nil + case deployplan.UpdateWithID: + return sdkbundle.OperationActionTypeOperationActionTypeUpdateWithId, nil + case deployplan.Delete: + return sdkbundle.OperationActionTypeOperationActionTypeDelete, nil + case deployplan.Recreate: + return sdkbundle.OperationActionTypeOperationActionTypeRecreate, nil + case deployplan.Resize: + return sdkbundle.OperationActionTypeOperationActionTypeResize, nil + default: + return "", fmt.Errorf("unsupported operation action type: %s", action) + } +} diff --git a/bundle/deploy/lock/operation_reporter_test.go b/bundle/deploy/lock/operation_reporter_test.go new file mode 100644 index 00000000000..479b17b0d41 --- /dev/null +++ b/bundle/deploy/lock/operation_reporter_test.go @@ -0,0 +1,78 @@ +package lock + +import ( + "testing" + + "github.com/databricks/cli/bundle/deployplan" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPlanActionToOperationAction(t *testing.T) { + tests := []struct { + name string + action deployplan.ActionType + expected sdkbundle.OperationActionType + wantErr string + }{ + { + name: "skip maps to empty (no DMS operation)", + action: deployplan.Skip, + expected: "", + }, + { + name: "create", + action: deployplan.Create, + expected: sdkbundle.OperationActionTypeOperationActionTypeCreate, + }, + { + name: "update", + action: deployplan.Update, + expected: sdkbundle.OperationActionTypeOperationActionTypeUpdate, + }, + { + name: "update_id", + action: deployplan.UpdateWithID, + expected: sdkbundle.OperationActionTypeOperationActionTypeUpdateWithId, + }, + { + name: "delete", + action: deployplan.Delete, + expected: sdkbundle.OperationActionTypeOperationActionTypeDelete, + }, + { + name: "recreate", + action: deployplan.Recreate, + expected: sdkbundle.OperationActionTypeOperationActionTypeRecreate, + }, + { + name: "resize", + action: deployplan.Resize, + expected: sdkbundle.OperationActionTypeOperationActionTypeResize, + }, + { + name: "undefined returns error", + action: deployplan.Undefined, + wantErr: "unsupported operation action type", + }, + { + name: "unknown returns error", + action: "some_garbage_value", + wantErr: "unsupported operation action type", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := planActionToOperationAction(tt.action) + if tt.wantErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErr) + return + } + require.NoError(t, err) + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go deleted file mode 100644 index 26f95edfc95..00000000000 --- a/bundle/deploy/lock/release.go +++ /dev/null @@ -1,58 +0,0 @@ -package lock - -import ( - "context" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type Goal string - -const ( - GoalBind = Goal("bind") - GoalUnbind = Goal("unbind") - GoalDeploy = Goal("deploy") - GoalDestroy = Goal("destroy") -) - -type release struct { - goal Goal -} - -func Release(goal Goal) bundle.Mutator { - return &release{goal} -} - -func (m *release) Name() string { - return "lock:release" -} - -func (m *release) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - // Return early if the locker is not set. - // It is likely an error occurred prior to initialization of the locker instance. - if b.Locker == nil { - log.Warnf(ctx, "Unable to release lock if locker is not configured") - return nil - } - - log.Infof(ctx, "Releasing deployment lock") - switch m.goal { - case GoalDeploy: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalBind, GoalUnbind: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalDestroy: - return diag.FromErr(b.Locker.Unlock(ctx, locker.AllowLockFileNotExist)) - default: - return diag.Errorf("unknown goal for lock release: %s", m.goal) - } -} diff --git a/bundle/deploy/lock/workspace_filesystem.go b/bundle/deploy/lock/workspace_filesystem.go new file mode 100644 index 00000000000..55da52d6a2e --- /dev/null +++ b/bundle/deploy/lock/workspace_filesystem.go @@ -0,0 +1,84 @@ +package lock + +import ( + "context" + "errors" + "io/fs" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/permissions" + "github.com/databricks/cli/libs/locker" + "github.com/databricks/cli/libs/log" +) + +// workspaceFilesystemLock implements DeploymentLock using a lock file in the +// bundle's workspace state path. This preserves the historical behavior of +// the previous lock.Acquire / lock.Release mutators. +type workspaceFilesystemLock struct { + b *bundle.Bundle + goal Goal +} + +func newWorkspaceFilesystemLock(b *bundle.Bundle, goal Goal) *workspaceFilesystemLock { + return &workspaceFilesystemLock{b: b, goal: goal} +} + +func (l *workspaceFilesystemLock) Acquire(ctx context.Context) error { + b := l.b + + // Return early if locking is disabled. + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + user := b.Config.Workspace.CurrentUser.UserName + dir := b.Config.Workspace.StatePath + lk, err := locker.CreateLocker(user, dir, b.WorkspaceClient(ctx)) + if err != nil { + return err + } + + b.Locker = lk + + force := b.Config.Bundle.Deployment.Lock.Force + log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) + err = lk.Lock(ctx, force) + if err != nil { + log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) + + // If we get a permission or "doesn't exist" error from the API this + // indicates we either don't have permissions or the path is invalid. + if errors.Is(err, fs.ErrPermission) || errors.Is(err, fs.ErrNotExist) { + diags := permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) + return diags.Error() + } + + return err + } + + return nil +} + +func (l *workspaceFilesystemLock) Release(ctx context.Context, _ DeploymentStatus) error { + b := l.b + + // Return early if locking is disabled. + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + // Return early if the locker is not set. + // It is likely an error occurred prior to initialization of the locker instance. + if b.Locker == nil { + log.Warnf(ctx, "Unable to release lock if locker is not configured") + return nil + } + + log.Infof(ctx, "Releasing deployment lock") + if l.goal == GoalDestroy { + return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist) + } + return b.Locker.Unlock(ctx) +} diff --git a/bundle/deployplan/plan.go b/bundle/deployplan/plan.go index b35357c7c28..cf81398f9a7 100644 --- a/bundle/deployplan/plan.go +++ b/bundle/deployplan/plan.go @@ -16,11 +16,15 @@ import ( const currentPlanVersion = 2 type Plan struct { - PlanVersion int `json:"plan_version,omitempty"` - CLIVersion string `json:"cli_version,omitempty"` - Lineage string `json:"lineage,omitempty"` - Serial int `json:"serial,omitempty"` - Plan map[string]*PlanEntry `json:"plan,omitzero"` + PlanVersion int `json:"plan_version,omitempty"` + CLIVersion string `json:"cli_version,omitempty"` + // Lineage and Serial uniquely identify a file-state snapshot the plan + // was computed against. Under the deployment metadata service (DMS) + // these are unused — server-side versioning replaces them with the + // deployment_id + version_id pair carried by the lock. + Lineage string `json:"lineage,omitempty"` + Serial int `json:"serial,omitempty"` + Plan map[string]*PlanEntry `json:"plan,omitzero"` mutex sync.Mutex `json:"-"` lockmap lockmap `json:"-"` diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index 6bad8091469..06bb4719162 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -20,6 +20,14 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa panic("Planning is not done") } + // Migrate mode rewrites local state in-place and does not call resource + // adapters, so there are no operations to report. Reject the combination + // rather than silently dropping migrations from the DMS view. + if migrateMode && b.OperationReporter != nil { + logdiag.LogError(ctx, errors.New("migration is not supported with the deployment metadata service")) + return + } + if len(plan.Plan) == 0 { // Avoid creating state file if nothing to deploy return @@ -84,7 +92,19 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa logdiag.LogError(ctx, fmt.Errorf("%s: Unexpected delete action during migration", errorPrefix)) return false } + + // Capture the resource ID before Destroy clears it from state so + // the DMS report carries a usable resource_id. + var deleteResourceID string + if b.OperationReporter != nil { + deleteResourceID = b.StateDB.GetResourceID(resourceKey) + } + err = d.Destroy(ctx, &b.StateDB) + if reportErr := b.reportOperation(ctx, resourceKey, deleteResourceID, action, err, nil); reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false @@ -124,6 +144,30 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa err = d.Deploy(ctx, &b.StateDB, sv.Value, action, entry) } + // Report the operation outcome to DMS (if enabled). On success + // we report the post-deploy resource ID + the new local state; + // on failure we report the action and error message and omit + // the state payload (matches OPERATION_STATUS_FAILED conventions). + // migrateMode is rejected up-front above when DMS is enabled, so + // it's safe to always go through reportOperation here. + if b.OperationReporter != nil { + var reportState json.RawMessage + var reportID string + if err == nil { + reportID = b.StateDB.GetResourceID(resourceKey) + stateBytes, marshalErr := json.Marshal(sv.Value) + if marshalErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: marshalling state for report: %w", errorPrefix, marshalErr)) + return false + } + reportState = stateBytes + } + if reportErr := b.reportOperation(ctx, resourceKey, reportID, action, err, reportState); reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } + } + if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false @@ -189,6 +233,23 @@ func (b *DeploymentBundle) LookupReferencePostDeploy(ctx context.Context, path * return structaccess.Get(remoteState, fieldPath) } +// reportOperation forwards a single resource operation outcome to the +// configured OperationReporter (if any). Returning nil when the reporter is +// unset lets callers invoke this unconditionally and keeps the DMS-off path +// identical to before. +func (b *DeploymentBundle) reportOperation( + ctx context.Context, + resourceKey, resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, +) error { + if b.OperationReporter == nil { + return nil + } + return b.OperationReporter(ctx, resourceKey, resourceID, action, operationErr, state) +} + func jsonDump(obj any) string { bytes, err := json.MarshalIndent(obj, "", " ") if err != nil { diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index 5c543619b06..0fa8aceda06 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -39,6 +39,10 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error { // ValidatePlanAgainstState validates that a plan's lineage and serial match the given state. // If the plan has no lineage (first deployment), validation is skipped. +// +// Serialized plans are not yet supported with the deployment metadata +// service. When that lands, an equivalent check will be needed against the +// server-side deployment_id + version_id pair. func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error { if plan.Lineage == "" { return nil diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index 48a9c5a2ff7..9b11f0e7476 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -2,6 +2,7 @@ package direct import ( "context" + "encoding/json" "fmt" "reflect" "sync" @@ -37,6 +38,20 @@ type DeploymentUnit struct { DependsOn []deployplan.DependsOnEntry } +// OperationReporter is called after each resource Create/Update/Delete to +// report the outcome to the deployment metadata service (DMS). state holds +// the post-operation local config and is nil for deletes and failed +// operations. Returns an error when the reporter cannot deliver the event +// (e.g. context cancelled); a non-nil error fails the deploy. +type OperationReporter func( + ctx context.Context, + resourceKey string, + resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, +) error + // DeploymentBundle holds everything needed to deploy a bundle type DeploymentBundle struct { StateDB dstate.DeploymentState @@ -44,6 +59,12 @@ type DeploymentBundle struct { Plan *deployplan.Plan RemoteStateCache sync.Map StateCache structvar.Cache + + // OperationReporter, when non-nil, is called inline after each successful + // or failed Create/Update/Delete to report the outcome to DMS. Apply + // leaves this nil when the DMS gate is off, so the file-state path is + // unaffected. + OperationReporter OperationReporter } // SetRemoteState updates the remote state with type validation and marks as fresh. diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 00000000000..004f22c495b --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,30 @@ +package env + +import ( + "context" + + envlib "github.com/databricks/cli/libs/env" +) + +// managedStateVariable names the environment variable that controls whether +// server-managed state (via the deployment metadata service) is used for +// locking and, in a future change, resource state management. +// +// The variable is treated as a boolean and accepts the usual spellings: +// "true"/"false", "1"/"0", "yes"/"no", "on"/"off" (case-insensitive). An +// empty or absent value falls back to the historical filesystem-based +// behavior. +const managedStateVariable = "DATABRICKS_BUNDLE_MANAGED_STATE" + +// ManagedState returns the raw value of DATABRICKS_BUNDLE_MANAGED_STATE if +// set. Callers that only need a bool should use IsManagedState. +func ManagedState(ctx context.Context) (string, bool) { + return get(ctx, []string{managedStateVariable}) +} + +// IsManagedState reports whether the DATABRICKS_BUNDLE_MANAGED_STATE +// environment variable is set to a truthy value. +func IsManagedState(ctx context.Context) bool { + v, ok := envlib.GetBool(ctx, managedStateVariable) + return ok && v +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index 48ba7755714..943d4902cd1 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -23,13 +23,24 @@ import ( func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions, engine engine.EngineType) { log.Info(ctx, "Phase: bind") - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalBind) + if err != nil { + logdiag.LogError(ctx, err) + return + } + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalBind)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { @@ -119,13 +130,24 @@ func jsonDump(ctx context.Context, v any, field string) string { func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, resourceKey string, engine engine.EngineType) { log.Info(ctx, "Phase: unbind") - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalUnbind) + if err != nil { + logdiag.LogError(ctx, err) + return + } + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalUnbind)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 1db4b2e02da..f0cceda3c92 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -125,19 +125,31 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand // Core mutators that CRUD resources and modify deployment state. These // mutators need informed consent if they are potentially destructive. - bundle.ApplySeqContext(ctx, b, - scripts.Execute(config.ScriptPreDeploy), - lock.Acquire(), - ) - + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) if logdiag.HasError(ctx) { // lock is not acquired here return } + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDeploy) + if err != nil { + logdiag.LogError(ctx, err) + return + } + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } + // lock is acquired here defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() uploadLibraries(ctx, b, libs) diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 98e6f7fee2a..27ddc2cb725 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -120,13 +120,24 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl, err := lock.NewDeploymentLock(ctx, b, lock.GoalDestroy) + if err != nil { + logdiag.LogError(ctx, err) + return + } + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if !engine.IsDirect() { diff --git a/bundle/statemgmt/managed_service_json.go b/bundle/statemgmt/managed_service_json.go new file mode 100644 index 00000000000..09d9703d618 --- /dev/null +++ b/bundle/statemgmt/managed_service_json.go @@ -0,0 +1,18 @@ +package statemgmt + +// ManagedServiceFileName is the workspace state-directory file that records the +// deployment metadata service (DMS) deployment_id for the bundle. The file +// pins this bundle to a server-side deployment record across CLI invocations. +// It is created by the lock package after the first CreateDeployment succeeds +// and is read by both the lock package (when re-acquiring the lock) and the +// statemgmt package (when loading resource state from DMS). +// +// resources.json continues to be written by the deploy path so that operators +// who turn DATABRICKS_BUNDLE_MANAGED_STATE off again still have a usable local +// state file. The fallback path is intentional, not accidental. +const ManagedServiceFileName = "managed_service.json" + +// ManagedServiceJSON is the on-disk shape of ManagedServiceFileName. +type ManagedServiceJSON struct { + DeploymentID string `json:"deployment_id"` +} diff --git a/bundle/statemgmt/state_dms.go b/bundle/statemgmt/state_dms.go new file mode 100644 index 00000000000..5804e35bcb1 --- /dev/null +++ b/bundle/statemgmt/state_dms.go @@ -0,0 +1,64 @@ +package statemgmt + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/direct/dstate" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// LoadStateFromDMS populates the in-memory DeploymentState DB from the +// deployment metadata service for the deployment identified by +// b.DeploymentID. The state is opened in read mode via OpenWithData; the +// historical local resources.json file is never touched on the DMS path. +// +// The path passed to OpenWithData is the local resources.json path: it is +// only used for diagnostics and as the eventual write target if the deploy +// path later upgrades the state to write mode (step 5 territory; today no +// callers do that under DMS). +// +// When b.DeploymentID is empty the function is a no-op: this is the +// "DMS enabled but no prior deployment" case, where the state is genuinely +// empty until the lock package creates the deployment. +func LoadStateFromDMS(ctx context.Context, b *bundle.Bundle) error { + if b.DeploymentID == "" { + // Initialize an empty state so subsequent reads (e.g. ExportState) + // don't panic on an unopened DB. + _, localPath := b.StateFilenameDirect(ctx) + b.DeploymentBundle.StateDB.OpenWithData(localPath, dstate.NewDatabase("", 0)) + return nil + } + + w := b.WorkspaceClient(ctx) + resources, err := w.Bundle.ListResourcesAll(ctx, sdkbundle.ListResourcesRequest{ + Parent: "deployments/" + b.DeploymentID, + }) + if err != nil { + return fmt.Errorf("failed to list resources from deployment metadata service: %w", err) + } + + data := dstate.NewDatabase("", 0) + for _, r := range resources { + // DMS reports resource keys without the "resources." prefix (e.g. + // "jobs.foo"); the local state DB uses the fully-qualified form + // ("resources.jobs.foo") as its map key, so prepend it here. + stateKey := "resources." + r.ResourceKey + + var stateBytes json.RawMessage + if r.State != nil { + stateBytes = *r.State + } + + data.State[stateKey] = dstate.ResourceEntry{ + ID: r.ResourceId, + State: stateBytes, + } + } + + _, localPath := b.StateFilenameDirect(ctx) + b.DeploymentBundle.StateDB.OpenWithData(localPath, data) + return nil +} diff --git a/bundle/statemgmt/state_dms_test.go b/bundle/statemgmt/state_dms_test.go new file mode 100644 index 00000000000..e1535bd7edf --- /dev/null +++ b/bundle/statemgmt/state_dms_test.go @@ -0,0 +1,100 @@ +package statemgmt + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// newTestBundle returns a Bundle wired with a mock workspace client and the +// workspace fields that StateFilenameDirect needs (CurrentUser, RootPath). +func newTestBundle(t *testing.T) (*bundle.Bundle, *mocks.MockWorkspaceClient) { + b := &bundle.Bundle{ + BundleRootPath: t.TempDir(), + Config: config.Root{ + Bundle: config.Bundle{Target: "default"}, + Workspace: config.Workspace{ + StatePath: "/Workspace/Users/me@example.com/.bundle/test/default/state", + }, + }, + } + m := mocks.NewMockWorkspaceClient(t) + b.SetWorkpaceClient(m.WorkspaceClient) + return b, m +} + +// TestLoadStateFromDMS_NoDeploymentID covers the "DMS enabled but the +// workspace has no managed_service.json yet" case: the state DB should be +// initialised empty (no panic on later ExportState) and no API call should be +// made. +func TestLoadStateFromDMS_NoDeploymentID(t *testing.T) { + b, _ := newTestBundle(t) + // b.DeploymentID stays empty. + + err := LoadStateFromDMS(t.Context(), b) + require.NoError(t, err) + + // State is initialised but empty — Export should succeed and return no entries. + got := b.DeploymentBundle.ExportState(t.Context()) + assert.Empty(t, got) +} + +// TestLoadStateFromDMS_PopulatesFromList confirms that resources reported by +// ListResources land in the in-memory state DB under the fully-qualified +// "resources." form, and that per-resource State payloads are preserved +// verbatim. +func TestLoadStateFromDMS_PopulatesFromList(t *testing.T) { + b, m := newTestBundle(t) + b.DeploymentID = "dep-123" + + jobState := json.RawMessage(`{"name":"my-job","max_concurrent_runs":1}`) + mockBundle := m.GetMockBundleAPI() + mockBundle.EXPECT(). + ListResourcesAll(mock.Anything, sdkbundle.ListResourcesRequest{ + Parent: "deployments/dep-123", + }). + Return([]sdkbundle.Resource{ + {ResourceKey: "jobs.foo", ResourceId: "1001", State: &jobState}, + // State omitted — exercises the nil-state path. + {ResourceKey: "pipelines.bar", ResourceId: "p-1"}, + }, nil) + + err := LoadStateFromDMS(t.Context(), b) + require.NoError(t, err) + + job, ok := b.DeploymentBundle.StateDB.GetResourceEntry("resources.jobs.foo") + require.True(t, ok) + assert.Equal(t, "1001", job.ID) + assert.JSONEq(t, string(jobState), string(job.State)) + + pipeline, ok := b.DeploymentBundle.StateDB.GetResourceEntry("resources.pipelines.bar") + require.True(t, ok) + assert.Equal(t, "p-1", pipeline.ID) + assert.Empty(t, pipeline.State) +} + +// TestLoadStateFromDMS_ListError checks that an underlying API failure is +// wrapped and surfaced rather than swallowed (otherwise the deploy would +// proceed against an empty in-memory view and treat everything as a create). +func TestLoadStateFromDMS_ListError(t *testing.T) { + b, m := newTestBundle(t) + b.DeploymentID = "dep-456" + + mockBundle := m.GetMockBundleAPI() + mockBundle.EXPECT(). + ListResourcesAll(mock.Anything, mock.Anything). + Return(nil, errors.New("boom")) + + err := LoadStateFromDMS(t.Context(), b) + require.Error(t, err) + assert.ErrorContains(t, err, "boom") + assert.ErrorContains(t, err, "failed to list resources from deployment metadata service") +} diff --git a/bundle/statemgmt/state_pull.go b/bundle/statemgmt/state_pull.go index 7e62bb84967..8fef9ef5957 100644 --- a/bundle/statemgmt/state_pull.go +++ b/bundle/statemgmt/state_pull.go @@ -16,6 +16,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -219,6 +220,23 @@ func readStates(ctx context.Context, b *bundle.Bundle, alwaysPull AlwaysPull) [] directLocalState := localRead(ctx, localPathDirect, engine.EngineDirect) terraformLocalState := localRead(ctx, localPathTerraform, engine.EngineTerraform) + // When the deployment metadata service is enabled, resource state lives on + // the server. Pull only the deployment-id pointer from the workspace; the + // state itself is fetched later via LoadStateFromDMS. Returning nil here + // causes PullResourcesState to fall through to its "no states found" path, + // which yields an empty StateDesc — exactly what we want, since DMS state + // is loaded separately and does not participate in the local/remote + // lineage-and-serial winner-picking. + if env.IsManagedState(ctx) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + logdiag.LogError(ctx, err) + return nil + } + b.DeploymentID = readDeploymentID(ctx, f) + return nil + } + if (directLocalState == nil && terraformLocalState == nil) || alwaysPull { f, err := deploy.StateFiler(ctx, b) if err != nil { @@ -294,6 +312,38 @@ func logStatesWarning(ctx context.Context, msg string, states []*StateDesc) { logStatesDiag(ctx, diag.Warning, msg, states) } +// readDeploymentID returns the DMS deployment_id stored in the workspace +// state directory's managed_service.json. An absent file or a missing/empty +// deployment_id is not an error: it means "no prior deployment exists yet" +// and the caller should proceed with an empty state. Read or parse failures +// other than fs.ErrNotExist are logged at debug level and ignored — they are +// recoverable on the next CLI invocation (the lock package will rewrite the +// file after CreateDeployment). +func readDeploymentID(ctx context.Context, f filer.Filer) string { + reader, err := f.Read(ctx, ManagedServiceFileName) + if errors.Is(err, fs.ErrNotExist) { + return "" + } + if err != nil { + log.Debugf(ctx, "Failed to read %s for deployment ID: %v", ManagedServiceFileName, err) + return "" + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + log.Debugf(ctx, "Failed to read %s content: %v", ManagedServiceFileName, err) + return "" + } + + var sj ManagedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + log.Debugf(ctx, "Failed to parse %s: %v", ManagedServiceFileName, err) + return "" + } + return sj.DeploymentID +} + func logStatesDiag(ctx context.Context, severity diag.Severity, msg string, states []*StateDesc) { var stateStrs []string for _, state := range states { diff --git a/bundle/statemgmt/state_push.go b/bundle/statemgmt/state_push.go index f098e8a07cc..ce225068d99 100644 --- a/bundle/statemgmt/state_push.go +++ b/bundle/statemgmt/state_push.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -16,7 +17,16 @@ import ( ) // PushResourcesState uploads the local state file to the remote location. +// When the deployment metadata service is enabled the call is a no-op: DMS +// owns resource state on the server, so there is no local state file worth +// uploading. (Step 5 will stop the deploy path from writing local state in +// the first place; until then the local file is written and then ignored.) func PushResourcesState(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { + if env.IsManagedState(ctx) { + log.Debugf(ctx, "Skipping state push: DATABRICKS_BUNDLE_MANAGED_STATE is set, DMS owns the state") + return + } + f, err := deploy.StateFiler(ctx, b) if err != nil { logdiag.LogError(ctx, err) diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index 5f43cff6acd..ae9c222eb4f 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" "github.com/databricks/cli/bundle/direct/dstate" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/cmd/root" @@ -188,10 +189,21 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle // Open direct engine state once for all subsequent operations (ExportState, CalculatePlan, Apply, etc.) needDirectState := stateDesc.Engine.IsDirect() && (opts.InitIDs || opts.ErrorOnEmptyState || opts.Deploy || opts.ReadPlanPath != "" || opts.PreDeployChecks || opts.PostStateFunc != nil) if needDirectState { - _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { - logdiag.LogError(ctx, err) - return b, stateDesc, root.ErrAlreadyPrinted + if env.IsManagedState(ctx) { + // Under DMS the StateDB is populated from the server, not from + // the local file. b.DeploymentID may be empty here (no prior + // deployment); LoadStateFromDMS handles that by initialising + // an empty in-memory DB. + if err := statemgmt.LoadStateFromDMS(ctx, b); err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } + } else { + _, localPath := b.StateFilenameDirect(ctx) + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } } } @@ -233,6 +245,16 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle logdiag.LogError(ctx, errors.New("--plan is only supported with direct engine (set bundle.engine to \"direct\" or DATABRICKS_BUNDLE_ENGINE=direct)")) return b, stateDesc, root.ErrAlreadyPrinted } + if env.IsManagedState(ctx) { + // --plan persists a serial/lineage snapshot from the local state + // DB and validates it on apply. Under DMS the server is the + // single source of truth and acquires its own version-based lock + // on CreateVersion, so the local snapshot is meaningless. Reject + // the flag explicitly rather than silently accept it and risk an + // inconsistent deploy. + logdiag.LogError(ctx, errors.New("--plan is not supported with the deployment metadata service")) + return b, stateDesc, root.ErrAlreadyPrinted + } opts.Build = false opts.PreDeployChecks = false diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 00000000000..277dd1e112e --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,405 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + sdktime "github.com/databricks/databricks-sdk-go/common/types/time" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// deploymentMetadata holds in-memory state for the deployment metadata +// service. One instance lives inside each FakeWorkspace so tests can drive +// CRUD against the DMS routes the same way they drive jobs/apps/etc. +type deploymentMetadata struct { + // deployments keyed by deployment_id. + deployments map[string]sdkbundle.Deployment + + // versions keyed by "deploymentId/versionId". + versions map[string]sdkbundle.Version + + // operations keyed by "deploymentId/versionId/resourceKey". + operations map[string]sdkbundle.Operation + + // resources keyed by "deploymentId/resourceKey". + resources map[string]sdkbundle.Resource + + // lockHolder maps deploymentId -> the full version name that holds the + // lock (e.g. "deployments/{id}/versions/{vid}"). Absent when no lock is + // held. + lockHolder map[string]string + + // lockExpiry maps deploymentId -> when the lock expires; checked against + // time.Now() on every lock-acquiring or lock-respecting call. + lockExpiry map[string]time.Time +} + +func newDeploymentMetadata() *deploymentMetadata { + return &deploymentMetadata{ + deployments: map[string]sdkbundle.Deployment{}, + versions: map[string]sdkbundle.Version{}, + operations: map[string]sdkbundle.Operation{}, + resources: map[string]sdkbundle.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +// lockDuration matches the real service's default lease so heartbeat-renewal +// tests have a comfortable margin. +const lockDuration = 2 * time.Minute + +func nowPtr() *sdktime.Time { + return sdktime.New(time.Now().UTC()) +} + +func toSDKTime(t time.Time) *sdktime.Time { + return sdktime.New(t.UTC()) +} + +func badRequest(msg string) Response { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": msg}, + } +} + +func notFound(msg string) Response { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": msg}, + } +} + +func aborted(msg string) Response { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": msg}, + } +} + +// DeploymentMetadataCreateDeployment is mounted at +// POST /api/2.0/bundle/deployments. The SDK sends the inner Deployment as the +// body and passes deployment_id as a query parameter. +func (s *FakeWorkspace) DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + deploymentID := req.URL.Query().Get("deployment_id") + if deploymentID == "" { + return badRequest("deployment_id is required") + } + + var body sdkbundle.Deployment + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ALREADY_EXISTS", + "message": fmt.Sprintf("deployment %s already exists", deploymentID), + }, + } + } + + now := nowPtr() + dep := sdkbundle.Deployment{ + Name: "deployments/" + deploymentID, + DisplayName: deploymentID, + TargetName: body.TargetName, + Status: sdkbundle.DeploymentStatusDeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: now, + UpdateTime: now, + } + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataGetDeployment is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.deploymentMetadata.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + return Response{Body: dep} +} + +// DeploymentMetadataDeleteDeployment is mounted at +// DELETE /api/2.0/bundle/deployments/{deployment_id}. +func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + now := nowPtr() + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusDeleted + dep.DestroyTime = now + dep.DestroyedBy = s.CurrentUser().UserName + dep.UpdateTime = now + state.deployments[deploymentID] = dep + return Response{Body: dep} +} + +// DeploymentMetadataCreateVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions. Body = Version, +// query = version_id. Validates monotonic version IDs and enforces the +// deployment-level lock. +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + dep, ok := state.deployments[deploymentID] + if !ok { + return notFound(fmt.Sprintf("deployment %s not found", deploymentID)) + } + + versionID := req.URL.Query().Get("version_id") + if versionID == "" { + return badRequest("version_id is required") + } + + var body sdkbundle.Version + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + // Enforce monotonic versions: version_id must equal last_version_id + 1. + expected := "1" + if dep.LastVersionId != "" { + n, err := strconv.ParseInt(dep.LastVersionId, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "stored last_version_id is not a valid number: " + dep.LastVersionId, + }, + } + } + expected = strconv.FormatInt(n+1, 10) + } + if versionID != expected { + return aborted(fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", + expected, versionID)) + } + + // Enforce lock: if a lock is held and not expired, reject. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if exp, ok := state.lockExpiry[deploymentID]; ok && exp.After(now) { + return aborted(fmt.Sprintf("deployment is locked by %s until %s", + holder, exp.Format(time.RFC3339))) + } + } + + versionKey := deploymentID + "/" + versionID + createTime := toSDKTime(now) + version := sdkbundle.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionId: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: createTime, + Status: sdkbundle.VersionStatusVersionStatusInProgress, + CliVersion: body.CliVersion, + VersionType: body.VersionType, + TargetName: body.TargetName, + } + state.versions[versionKey] = version + + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = now.Add(lockDuration) + + dep.LastVersionId = versionID + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusInProgress + dep.UpdateTime = createTime + state.deployments[deploymentID] = dep + + return Response{Body: version} +} + +// DeploymentMetadataGetVersion is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}. +func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response { + defer s.LockUnlock()() + + versionKey := deploymentID + "/" + versionID + version, ok := s.deploymentMetadata.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + return Response{Body: version} +} + +// DeploymentMetadataHeartbeat is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat. +// Validates the version is in-progress and holds the lock, then resets the +// lock expiry. +func (s *FakeWorkspace) DeploymentMetadataHeartbeat(_ Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is no longer in progress") + } + + expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + if state.lockHolder[deploymentID] != expectedHolder { + return aborted("lock is not held by this version") + } + + now := time.Now().UTC() + expiry := now.Add(lockDuration) + state.lockExpiry[deploymentID] = expiry + return Response{Body: sdkbundle.HeartbeatResponse{ExpireTime: toSDKTime(expiry)}} +} + +// DeploymentMetadataCompleteVersion is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete. +// Tests can inject a simulated failure by setting the deployment's target_name +// to "fail-complete": the endpoint returns a 500 so the caller exercises its +// "lock release failed" path. +func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + if dep, ok := state.deployments[deploymentID]; ok && dep.TargetName == "fail-complete" { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{ + "error_code": "INTERNAL_ERROR", + "message": "simulated complete version failure", + }, + } + } + + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return notFound(fmt.Sprintf("version %s not found", versionKey)) + } + if version.Status != sdkbundle.VersionStatusVersionStatusInProgress { + return aborted("version is already completed") + } + + var body sdkbundle.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + + now := nowPtr() + version.Status = sdkbundle.VersionStatusVersionStatusCompleted + version.CompleteTime = now + version.CompletionReason = body.CompletionReason + version.CompletedBy = s.CurrentUser().UserName + state.versions[versionKey] = version + + delete(state.lockHolder, deploymentID) + delete(state.lockExpiry, deploymentID) + + if dep, ok := state.deployments[deploymentID]; ok { + switch body.CompletionReason { + case sdkbundle.VersionCompleteVersionCompleteSuccess: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusActive + case sdkbundle.VersionCompleteVersionCompleteFailure, + sdkbundle.VersionCompleteVersionCompleteForceAbort, + sdkbundle.VersionCompleteVersionCompleteLeaseExpired: + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusFailed + } + dep.UpdateTime = now + state.deployments[deploymentID] = dep + } + + return Response{Body: version} +} + +// DeploymentMetadataCreateOperation is mounted at +// POST /api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations. +// Records the operation and upserts the deployment-level Resource so a +// follow-up ListResources sees the merged view. +func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + resourceKey := req.URL.Query().Get("resource_key") + if resourceKey == "" { + return badRequest("resource_key is required") + } + + var body sdkbundle.Operation + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &body); err != nil { + return badRequest(fmt.Sprintf("invalid request: %s", err)) + } + } + + now := nowPtr() + opKey := deploymentID + "/" + versionID + "/" + resourceKey + op := sdkbundle.Operation{ + Name: fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey), + ResourceKey: resourceKey, + CreateTime: now, + ActionType: body.ActionType, + State: body.State, + ResourceId: body.ResourceId, + Status: body.Status, + ErrorMessage: body.ErrorMessage, + } + state.operations[opKey] = op + + resKey := deploymentID + "/" + resourceKey + state.resources[resKey] = sdkbundle.Resource{ + Name: fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey), + ResourceKey: resourceKey, + State: body.State, + ResourceId: body.ResourceId, + LastActionType: body.ActionType, + LastVersionId: versionID, + } + + return Response{Body: op} +} + +// DeploymentMetadataListResources is mounted at +// GET /api/2.0/bundle/deployments/{deployment_id}/resources. +func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + prefix := deploymentID + "/" + var resources []sdkbundle.Resource + for key, r := range state.resources { + if strings.HasPrefix(key, prefix) { + resources = append(resources, r) + } + } + if resources == nil { + resources = []sdkbundle.Resource{} + } + return Response{Body: sdkbundle.ListResourcesResponse{Resources: resources}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index 4870e25e07c..07876994219 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -185,6 +185,10 @@ type FakeWorkspace struct { // clusterVenvs caches Python venvs per existing cluster ID, // matching cloud behavior where libraries are cached on running clusters. clusterVenvs map[string]*clusterEnv + + // deploymentMetadata is the in-memory bundle deployment metadata service + // state. Accessed via the DeploymentMetadata* methods. + deploymentMetadata *deploymentMetadata } func (s *FakeWorkspace) LockUnlock() func() { @@ -314,6 +318,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadata(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index ed405470f20..0225f47311e 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -993,4 +993,33 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Bundle deployment metadata service. + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) }