<fix>[ceph]: cap & parallelize host conn check#4145
Conversation
Per-host ceph connectivity check shared one serial chain per primary storage. A stuck in-flight check (riding the default 1800s http timeout) blocked every other host's check on the same PS for ~30min. - raise check chain syncLevel 1 -> 10 so hosts run concurrently - give the check call a 5min timeout instead of default 1800s, honored via a per-message timeout on KVMHostAsyncHttpCallMsg Resolves: ZSTAC-85421 Change-Id: I717a746f676c6f776168646f6f776c64636e6870
概述此 PR 为 Ceph 主存储的主机存储连接检查添加了可配置的按消息粒度超时支持。在 KVM 主机的 HTTP 调用框架中实现了 timeout 字段和传播机制,Ceph 存储层定义了 5 分钟的检查超时常量并应用到异步调用,最后通过两个集成测试用例验证了并发检查的非串行化行为和超时强制执行。 变更主存储连接检查超时支持
序列图sequenceDiagram
participant Client
participant CephStorage
participant KVMHost
participant Http
participant REST
Client->>CephStorage: CheckHostStorageConnectionMsg
CephStorage->>CephStorage: ChainTask with syncLevel=10
CephStorage->>KVMHost: executeAsyncHttpCall(timeout=5min)
KVMHost->>Http: new Http().timeout(5min)
Http-->>KVMHost: configured
KVMHost->>REST: asyncJsonPost with timeout
REST-->>KVMHost: response or timeout
KVMHost-->>Client: MessageReply
代码审查工作量评估🎯 2 (简单) | ⏱️ ~12 分钟 诗
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.42.3)plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.javaplugin/kvm/src/main/java/org/zstack/kvm/KVMHost.javaComment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java (1)
4904-4907: ⚡ Quick win把并发上限提取成具名常量。
这里的
10是本次修复的关键调参值,直接写裸值会让后续调优和排障时不够直观。建议像超时一样提成常量,例如CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL。As per coding guidelines “避免使用魔法值(Magic Value):直接使用未经定义的数值或字符串应替换为枚举或常量。”可参考的修改
+ private static final int CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL = 10; + `@Override` protected int getSyncLevel() { - return 10; + return CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java` around lines 4904 - 4907, Replace the magic number 10 in CephPrimaryStorageBase.getSyncLevel() with a named constant (e.g., CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL) to make the concurrency limit explicit and configurable; add a static final int CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL = 10 (near other class constants or top of the class) and return that constant from getSyncLevel() instead of the literal 10.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy`:
- Around line 159-163: 当前断言只验证“失败 + 耗时区间”,需要进一步收紧为超时错误:在
CephHostStorageCheckCase 的断言块(包含 reply、done、start 变量)中,除了 assert
!reply.get().isSuccess() 外,再断言返回的错误类型是 timeout(例如检查 reply.get().getError()
表示为超时,或 error.code/递归方法如 isTimeout() 为 true),以确保 setTimeout(3s) 被真正触发并返回超时错误。
- Around line 111-125: The test currently only checks that host1 was blocked
(host1Entered) and that host2 succeeded, but never asserts host1's final reply;
modify the host1 sendCheckMsg call to capture its MessageReply into an
AtomicReference (e.g., reply1), use reply1Done as the latch callback to set the
reference and count down, then after release.countDown() await reply1Done and
assert reply1.get().isSuccess() (mirroring the reply2 pattern for sendCheckMsg,
reply2, reply2Done).
---
Nitpick comments:
In
`@plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java`:
- Around line 4904-4907: Replace the magic number 10 in
CephPrimaryStorageBase.getSyncLevel() with a named constant (e.g.,
CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL) to make the concurrency limit explicit
and configurable; add a static final int
CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL = 10 (near other class constants or top
of the class) and return that constant from getSyncLevel() instead of the
literal 10.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)
Review profile: CHILL
Plan: Pro
Run ID: bd3d4d00-3965-4bc8-937c-6df87b34e69a
📒 Files selected for processing (3)
plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.javaplugin/kvm/src/main/java/org/zstack/kvm/KVMHost.javatest/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
| CountDownLatch reply1Done = new CountDownLatch(1) | ||
| sendCheckMsg(ps.uuid, host1.uuid, { MessageReply r -> reply1Done.countDown() }) | ||
| assert host1Entered.await(10, TimeUnit.SECONDS) | ||
|
|
||
| AtomicReference<MessageReply> reply2 = new AtomicReference<>() | ||
| CountDownLatch reply2Done = new CountDownLatch(1) | ||
| sendCheckMsg(ps.uuid, host2.uuid, { MessageReply r -> reply2.set(r); reply2Done.countDown() }) | ||
|
|
||
| assert reply2Done.await(15, TimeUnit.SECONDS) | ||
| assert reply2.get().isSuccess() | ||
| assert reply1Done.getCount() == 1 | ||
|
|
||
| release.countDown() | ||
| assert reply1Done.await(15, TimeUnit.SECONDS) | ||
| } |
There was a problem hiding this comment.
补上 host1 最终结果断言。
这里现在只证明了 host2 没被串行化,但没有验证 host1 在放行后是否成功完成;如果实现变成“并发了但最终失败”,这个用例仍然会通过。建议像 reply2 一样保存 reply1 并断言成功。
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy`
around lines 111 - 125, The test currently only checks that host1 was blocked
(host1Entered) and that host2 succeeded, but never asserts host1's final reply;
modify the host1 sendCheckMsg call to capture its MessageReply into an
AtomicReference (e.g., reply1), use reply1Done as the latch callback to set the
reference and count down, then after release.countDown() await reply1Done and
assert reply1.get().isSuccess() (mirroring the reply2 pattern for sendCheckMsg,
reply2, reply2Done).
| assert done.await(20, TimeUnit.SECONDS) | ||
| long elapsed = System.currentTimeMillis() - start | ||
| assert !reply.get().isSuccess() | ||
| assert elapsed >= 2000 | ||
| assert elapsed < 20000 |
There was a problem hiding this comment.
把失败类型收紧到 timeout。
当前只断言“失败 + 耗时区间”,这会把路由错误、模拟器异常等非超时失败也算成通过,不能直接证明 setTimeout(3s) 被真正尊重。建议再断言返回的是 timeout 类错误。
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy`
around lines 159 - 163, 当前断言只验证“失败 + 耗时区间”,需要进一步收紧为超时错误:在
CephHostStorageCheckCase 的断言块(包含 reply、done、start 变量)中,除了 assert
!reply.get().isSuccess() 外,再断言返回的错误类型是 timeout(例如检查 reply.get().getError()
表示为超时,或 error.code/递归方法如 isTimeout() 为 true),以确保 setTimeout(3s) 被真正触发并返回超时错误。
问题
同一 ceph primary storage 下,每台 host 的存储连通性检查 (
CheckHostStorageConnectionMsg) 共用一条按 PS 串行的 chain(sync 签名只含 psUuid,不含 host)。当某台 host 的 in-flight check 卡住(吃默认 1800s http 超时),同 PS 其它所有 host 的 check 在队头被堵满 ~30min。现网表现:MN 重启后并发 reconnect,一台 host 的 ceph check 被孤立(kvmagent 重启 + drain 60s cushion 漏过),空等 1800s,拖垮同 PS 其它 host 的恢复。
修复(两条,正交)
getSyncLevel()由默认 1 提到 10,同 PS 不同 host 的 check 可并发,单台卡死只占 1/10 槽位,不再独占队头。KVMHostAsyncHttpCallMsg单独设 5min 超时(CHECK_HOST_STORAGE_CONNECTION_TIMEOUT),替代默认 1800s。通过新增的 per-message timeout 透传实现:KVMHost.executeAsyncHttpCall读msg.getTimeout()(复用NeedReplyMessage.timeout,-1=默认)传给内部Http,>0时覆盖timeoutManager.getTimeout()。合起来:最坏情况从「N 台 × 30min」压到「单台 5min、互不牵连」。
测试
新增 IT
CephHostStorageCheckCase(ceph PS + 2 host):testCheckNotSerializedAcrossHosts— 卡住 host1 的 check,断言 host2 的 check 仍并发返回、host1 仍 pendingtestPerMessageTimeoutHonored— 发带setTimeout(3s)的 KVMHostAsyncHttpCallMsg 到 stuck simulator,断言 ~3s 超时失败而非默认 1800s本地隔离 .m2 串行实跑:
Tests run: 1, Failures: 0, Errors: 0 — BUILD SUCCESSAsync Http Timeout ... timeout after 3000 MILLISECONDSreply2Done.await(15s)失败(host2 被串行堵死)→ BUILD FAILURE,证明测试守得住修复影响
NeedReplyMessage.timeout字段)Resolves: ZSTAC-85421
sync from gitlab !10049