[flink-action][server][client] add orphan files cleanup action for remote storage#3404
platinumhamburg wants to merge 4 commits into
9b95b89 to 9bd732b
9bd732b to 901dc41
Pull request overview
This PR introduces a Flink-based orphan_files_clean action to identify and delete orphaned remote-storage artifacts (log segments/manifests and KV snapshot files) by adding new coordinator read-only RPCs to enumerate the active reference set and wiring client/server support for those RPCs.
Changes:
- Add new RPCs LIST_REMOTE_LOG_MANIFESTS and LIST_KV_SNAPSHOTS (proto + api keys), implement them in CoordinatorService, and expose them via the client Admin.
- Add new Flink action module + SPI loader/entrypoint and implement the orphan cleanup DAG (scope enumeration → scan/clean → stats aggregation + empty-dir sweep) with rule-based file classification and audit logging.
- Extend filesystem metadata support (modification time) to enable safe age-based deletion, and add unit/integration tests for the new behavior.
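
The rule-based classification and age-based deletion described above can be pictured roughly as follows. This is a minimal sketch, not the PR's code: the `Decision` values come from the file summary, but `SegmentRuleSketch`, its `decide` signature, the `.log` suffix check, and the meaning given to `DEFER` are assumptions made for illustration. It also assumes that an unknown modification time is reported as `Long.MAX_VALUE`, so that such a file never looks older than the cutoff.

```java
import java.util.Set;

/** Decision vocabulary as listed in the review summary. */
enum Decision { DELETE, KEEP, DEFER, SKIP_UNKNOWN }

/** Hypothetical rule: the class name, signature, and logic are illustrative assumptions. */
final class SegmentRuleSketch {

    /** Assumed sentinel for "modification time unknown" (fail-safe default). */
    static final long UNKNOWN_MTIME = Long.MAX_VALUE;

    static Decision decide(
            String fileName, long modificationTimeMs, Set<String> activeRefs, long cutoffMs) {
        // Files that do not match the expected pattern are never touched.
        if (!fileName.endsWith(".log")) {
            return Decision.SKIP_UNKNOWN;
        }
        // Anything still referenced by the active set must be kept.
        if (activeRefs.contains(fileName)) {
            return Decision.KEEP;
        }
        // Unreferenced but not older than the cutoff: assumed here to mean
        // "leave it for a later run". UNKNOWN_MTIME always lands in this branch.
        if (modificationTimeMs >= cutoffMs) {
            return Decision.DEFER;
        }
        // Unreferenced and older than the cutoff.
        return Decision.DELETE;
    }
}
```

With this shape, a filesystem that cannot report modification times degrades to "never delete" rather than "delete everything unreferenced".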
Reviewed changes
Copilot reviewed 82 out of 82 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| fluss-test-coverage/pom.xml | Excludes newly introduced Flink action entry/SPI classes from coverage instrumentation. |
| fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java | Adds stub methods for the new list RPCs in the tablet gateway test implementation. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java | Adds stub methods for the new list RPCs in the coordinator gateway test implementation. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServiceOrphanRpcsITCase.java | Adds IT coverage for coordinator orphan-cleanup RPCs (manifests + snapshot listing). |
| fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java | Adds ZK helper methods to list remote log manifest handles and bucket snapshot IDs. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java | Rejects orphan-cleanup RPCs on tablet servers (coordinator-only RPCs). |
| fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java | Adds API to expose active snapshot IDs (retained ∪ still-in-use) for listing RPCs. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java | Exposes lease-pinned snapshot IDs to support “still-in-use” snapshot reporting. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Implements listRemoteLogManifests and listKvSnapshots coordinator RPC handlers. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java | Adjusts visibility/testing annotation around snapshot store manager accessor. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java | Adds per-bucket active snapshot ID computation with ZK fallback when store isn’t in-memory. |
| fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java | Adds stub methods for the new list RPCs in RPC test scaffolding. |
| fluss-rpc/src/main/proto/FlussApi.proto | Defines new request/response messages for manifest/snapshot listing. |
| fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java | Registers new API keys for the orphan-cleanup list RPCs. |
| fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java | Adds gateway methods for the two new read-only list RPCs. |
| fluss-flink/pom.xml | Adds the new fluss-flink-action module. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/adapter/FlinkMultipleParameterToolTest.java | Extends adapter tests for new convenience accessors. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/RuleDispatcherTest.java | Adds rule dispatch coverage for orphan-cleanup file classification. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/OrphanDirDetectorTest.java | Adds tests for orphan table/partition directory detection by ID guards. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/LogSegmentRuleTest.java | Adds log segment rule tests (active-set + cutoff semantics). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/LogManifestRuleTest.java | Adds manifest rule tests (default conservative + opt-in deletion). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/KvSnapshotFileRuleTest.java | Adds KV snapshot file rule tests (active snap dirs + cutoff semantics). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/rule/KvSharedSstRuleTest.java | Adds tests ensuring shared SSTs are never deleted. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/RpcErrorClassifierTest.java | Adds tests for stable RPC error categorization used in audit logs. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/job/EmptyDirSweeperTest.java | Adds tests for post-clean empty directory sweeping (dry-run + bottom-up). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/fs/SafeDeleterTest.java | Adds tests for safe deletion behavior (dry-run, non-empty dir no-op, etc.). |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/config/OrphanCleanConfigTest.java | Adds CLI config parsing/validation tests for orphan cleanup action. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/build/MaxKnownIdsTrackerTest.java | Adds tests for max-known ID tracking used for orphan directory guard logic. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/action/orphan/build/ActiveRefsFetcherTest.java | Adds tests for manifest/snapshot active-set fetching (retries + per-bucket failures). |
| fluss-flink/fluss-flink-common/src/main/resources/META-INF/services/org.apache.fluss.flink.action.ActionFactory | Registers OrphanFilesCleanActionFactory via ServiceLoader SPI. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java | Adds convenience accessors (has/get/getMultiParameter) for CLI parsing. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/RuleId.java | Introduces stable rule identifiers for audit tagging. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/RuleDispatcher.java | Implements rule dispatch based on path patterns (log/kv/manifest/shared/unknown). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/OrphanDirDetector.java | Implements orphan table/partition dir detection via parsed ID + max-known guards. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/LogSegmentRule.java | Implements log segment deletion decisions using active refs + cutoff + orphan-dir mode. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/LogManifestRule.java | Implements conservative manifest handling with opt-in deletion behavior. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/KvSnapshotFileRule.java | Implements KV snapshot file classification using active snapshot dir names. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/KvSharedSstRule.java | Implements “never delete” policy for shared SST files. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/FileRule.java | Defines the rule interface for single-file decisions. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/FileMeta.java | Adds immutable file metadata container for rule evaluation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/Decision.java | Adds decision vocabulary for cleanup (DELETE/KEEP/DEFER/SKIP_UNKNOWN). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/rule/BucketActiveRefs.java | Adds immutable bucket-scoped active reference sets for log+kv+manifest paths. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/RpcErrorClassifier.java | Adds stable classification of RPC failures for audit/reporting logic. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/OrphanFilesCleanActionFactory.java | Adds factory for the orphan_files_clean action and CLI help text. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/OrphanFilesCleanAction.java | Adds the action runner that executes the Flink job and logs final stats. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/OrphanCleanUtils.java | Adds shared utilities (paths, remote dir resolution, safe listing helpers). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/StatsAggregateOperator.java | Adds custom bounded operator to aggregate stats and run the empty-dir sweep. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/ScanAndCleanFunction.java | Implements stage-2 FS scan & cleanup with per-subtask rate limiting. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/OrphanFilesCleanJob.java | Builds and executes the 3-stage Flink batch DAG and returns final stats. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/OrphanDirCleanTask.java | Adds task type for cleaning an orphan table/partition directory. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/EmptyDirSweeper.java | Adds end-of-run empty directory reclamation logic (dry-run aware). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/CleanTask.java | Adds marker interface for stage-1 emitted work items. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/CleanStats.java | Adds aggregatable stats object including “touched dirs” for sweeping. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/BucketCleanTask.java | Adds task type carrying bucket dirs + active refs for file-level cleanup. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/job/BucketCleaner.java | Adds per-bucket directory walker applying rules and safe deletion. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/fs/SafeDeleter.java | Centralizes deletion operations with dry-run + rate limiting + audit logging. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/config/OrphanCleanConfig.java | Adds CLI configuration parsing and validation for the orphan cleanup action. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/RpcListStatus.java | Adds shared per-target RPC list status representation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/MaxKnownIdsTracker.java | Adds per-run max-known ID tracking used for orphan dir guards. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/LogActiveRefsFetchResult.java | Adds detailed per-target/per-bucket log manifest read results. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/KvActiveRefsFetchResult.java | Adds per-target KV active snapshot dir fetch result representation. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/build/ActiveRefsFetcher.java | Implements coordinator-RPC-driven active ref fetching with retries and parsing. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/orphan/audit/AuditLogger.java | Adds structured audit logger for the cleanup action. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/ActionLoader.java | Adds ServiceLoader-based action discovery and CLI dispatch. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/ActionFactory.java | Adds SPI interface for action factories. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/action/Action.java | Adds base action interface (build + run). |
| fluss-flink/fluss-flink-action/src/main/java/org/apache/fluss/flink/action/FlussFlinkActionEntrypoint.java | Adds the main entrypoint for the Flink action shaded jar. |
| fluss-flink/fluss-flink-action/pom.xml | Introduces new shaded Flink action jar module. |
| fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/action/orphan/Flink22OrphanFilesCleanITCase.java | Adds Flink 2.2 orphan cleanup IT case class. |
| fluss-flink/fluss-flink-2.2/src/main/java/org/apache/fluss/flink/adapter/MultipleParameterToolAdapter.java | Mirrors CLI adapter enhancements for Flink 2.2 variant. |
| fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/action/orphan/Flink20OrphanFilesCleanITCase.java | Adds Flink 1.20 orphan cleanup IT case class. |
| fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/action/orphan/Flink19OrphanFilesCleanITCase.java | Adds Flink 1.19 orphan cleanup IT case class. |
| fluss-filesystems/fluss-fs-hadoop/src/main/java/org/apache/fluss/fs/hdfs/HadoopFileStatus.java | Exposes HDFS modification time via FileStatus. |
| fluss-common/src/test/java/org/apache/fluss/fs/FileStatusTest.java | Adds test locking down fail-safe default for modification time. |
| fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java | Makes remote log metadata dir name constant public for reuse. |
| fluss-common/src/main/java/org/apache/fluss/fs/local/LocalFileStatus.java | Exposes local FS modification time via FileStatus. |
| fluss-common/src/main/java/org/apache/fluss/fs/FileStatus.java | Adds default getModificationTime() (fail-safe MAX_VALUE) to FileStatus interface. |
| fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java | Adds helper to convert TableBucket to PbTableBucket. |
| fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java | Implements client-side admin methods for the new list RPCs. |
| fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java | Extends Admin API with internal default methods for the new list RPCs. |
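
The "dry-run aware, bottom-up" empty-directory sweep mentioned for EmptyDirSweeper could look roughly like the sketch below. It is an illustration only: it uses `java.nio.file` on a local directory instead of the Fluss filesystem abstraction, and `EmptyDirSweepSketch` and `sweep` are hypothetical names. Directories are visited deepest first, and a directory counts as empty when every child has already been reclaimed, which lets a dry run report the same set that a real run would delete.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of a bottom-up empty-directory sweep; not the PR's implementation. */
final class EmptyDirSweepSketch {

    /** Returns the directories reclaimed (or, in dry-run mode, the ones that would be). */
    static List<Path> sweep(Path root, boolean dryRun) throws IOException {
        List<Path> dirs;
        try (Stream<Path> walk = Files.walk(root)) {
            dirs = walk.filter(p -> Files.isDirectory(p))
                    .filter(p -> !p.equals(root)) // never remove the root itself
                    // Deepest paths first, so children are handled before parents.
                    .sorted(Comparator.<Path>comparingInt(Path::getNameCount).reversed())
                    .collect(Collectors.toList());
        }
        Set<Path> reclaimed = new LinkedHashSet<>();
        for (Path dir : dirs) {
            if (isEffectivelyEmpty(dir, reclaimed)) {
                if (!dryRun) {
                    Files.delete(dir);
                }
                reclaimed.add(dir);
            }
        }
        return new ArrayList<>(reclaimed);
    }

    /** A directory is empty if all remaining children were already reclaimed in this run. */
    private static boolean isEffectivelyEmpty(Path dir, Set<Path> reclaimed) throws IOException {
        try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
            for (Path child : children) {
                if (!reclaimed.contains(child)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

Tracking reclaimed children explicitly is the design choice that keeps dry-run output faithful: without it, a parent of an empty directory would look non-empty in a dry run but be removed in a real one.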
901dc41 to 0a34c82
luoyuxia left a comment
@platinumhamburg Thanks for the PR. Left a minor comment. PTAL
swuferhong left a comment
Hi @platinumhamburg, thanks for your great work. I left some comments.
- Move orphan RPCs from AdminReadOnlyGateway to AdminGateway
- Fix CoordinatorContext thread safety via AccessContextEvent
- Add partition ownership validation in orphan RPCs
- Decouple Admin API from PB types with domain models
- Catch IOException in SafeDeleter to prevent batch job failure
- Skip shared SST directory listing in BucketCleaner
- Pass extraConfigs to StatsAggregateOperator for FS init
- Set maxParallelism(1) on single-parallelism operators
- Rename FlussFlinkActionEntrypoint to FlussActionEntrypoint
- Rename Flink19/Flink20 ITCase to Flink119/Flink120
- Remove unused methods and fix thread leak in test
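
The "catch IOException in SafeDeleter" item can be illustrated with a small sketch. Everything here is assumed for illustration: `SafeDeleteSketch`, its `Outcome` enum, and the use of `java.nio.file.Files.delete` stand in for whatever the PR does against the Fluss filesystem abstraction. The point is only that a failed deletion is recorded and returned as a result instead of propagating and failing the whole batch job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of a deleter that never lets an IOException escape. */
final class SafeDeleteSketch {

    enum Outcome { DELETED, DRY_RUN, FAILED }

    static Outcome delete(Path path, boolean dryRun) {
        if (dryRun) {
            // Dry run: report what would happen, touch nothing.
            return Outcome.DRY_RUN;
        }
        try {
            // Files.delete refuses to remove a non-empty directory and throws
            // for a missing path; both surface as IOException subclasses.
            Files.delete(path);
            return Outcome.DELETED;
        } catch (IOException e) {
            // Log and continue so one bad path does not abort the run.
            System.err.println("delete failed for " + path + ": " + e);
            return Outcome.FAILED;
        }
    }
}
```

A caller would typically count `FAILED` outcomes in its run statistics and leave the file for a later run.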
1cb1a7e to 663fc5f
…rigger
- ScanAndCleanFunction: change open(Configuration) to open(OpenContext) for Flink 2.x compatibility (Flink 2.x removed the Configuration overload)
- FlussClusterExtension: triggerSnapshot() returns null on no-op instead of failing hard when snapshot ID does not advance (initSnapshot skips when logOffset <= lastSnapshotOffset)
- triggerAndWaitSnapshots() silently skips null buckets (original behavior)
71892a7 to 1c61b9c
@swuferhong @luoyuxia Thanks for the review. All issues are resolved. Please take another look when you have time.
Purpose
Linked issue: close #3403
Brief change log
Tests
API and Format
Documentation