From 57e9468a8bdb7896631ad6578ce14a8546617588 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Sat, 6 Jun 2026 05:47:38 +0100 Subject: [PATCH 1/5] fix: align sync status heuristic with leanSpec --- crates/blockchain/src/lib.rs | 131 ++++++++++++++++++++++++++++--- crates/blockchain/src/metrics.rs | 1 + 2 files changed, 123 insertions(+), 9 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 1a6fb766..449a3885 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -56,6 +56,48 @@ pub const MAX_ATTESTATIONS_DATA: usize = 16; /// /// See: leanSpec PR #682. pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1; +/// Local head lag beyond which the node is considered to be syncing. +/// +/// See: leanSpec PR #708. +const SYNC_LAG_THRESHOLD: u64 = 4; +/// Freshest-known block lag beyond which the network is considered stalled. +/// +/// During a network-wide stall the node remains synced so validators can help +/// the chain recover. +const NETWORK_STALL_THRESHOLD: u64 = 8; +/// Recovery band that prevents the sync status from flapping near the threshold. +const SYNC_HYSTERESIS_BAND: u64 = 2; + +#[derive(Default)] +struct SyncStatusTracker { + syncing: bool, +} + +impl SyncStatusTracker { + fn update( + &mut self, + current_slot: u64, + head_slot: u64, + max_seen_slot: u64, + ) -> metrics::SyncStatus { + let head_lag = current_slot.saturating_sub(head_slot); + let network_lag = current_slot.saturating_sub(max_seen_slot); + + if network_lag > NETWORK_STALL_THRESHOLD { + self.syncing = false; + } else if self.syncing { + self.syncing = head_lag > SYNC_LAG_THRESHOLD - SYNC_HYSTERESIS_BAND; + } else { + self.syncing = head_lag > SYNC_LAG_THRESHOLD; + } + + if self.syncing { + metrics::SyncStatus::Syncing + } else { + metrics::SyncStatus::Synced + } + } +} /// Milliseconds until the next interval boundary, measured relative to genesis. fn ms_until_next_interval(now_ms: u64, genesis_time_ms: u64) -> u64 { @@ -100,6 +142,7 @@ impl BlockChain { last_tick_instant: None, attestation_committee_count, pre_merge_coverage: None, + sync_status: SyncStatusTracker::default(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -164,6 +207,9 @@ pub struct BlockChainServer { /// single-threaded message loop, so no synchronization is needed. /// Observability-only. pre_merge_coverage: Option, + + /// Stateful sync heuristic used by `lean_node_sync_status`. + sync_status: SyncStatusTracker, } impl BlockChainServer { @@ -190,6 +236,7 @@ impl BlockChainServer { // Update current slot metric metrics::update_current_slot(slot); + self.update_sync_status(slot); // Snapshot the aggregator flag once per tick so all read sites within // the tick see a consistent value even if the admin API toggles it @@ -451,15 +498,6 @@ impl BlockChainServer { metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); - // Update sync status based on head slot vs wall clock slot - let current_slot = self.store.time() / INTERVALS_PER_SLOT; - let status = if head_slot >= current_slot { - metrics::SyncStatus::Synced - } else { - metrics::SyncStatus::Syncing - }; - metrics::set_node_sync_status(status); - for table in ALL_TABLES { metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table)); } @@ -677,6 +715,21 @@ impl BlockChainServer { let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); } + + fn update_sync_status(&mut self, current_slot: u64) { + let head_slot = self.store.head_slot(); + let max_seen_slot = self + .store + .get_live_chain() + .values() + .map(|(slot, _)| *slot) + .max() + .unwrap_or(head_slot); + let status = self + .sync_status + .update(current_slot, head_slot, max_seen_slot); + metrics::set_node_sync_status(status); + } } // Protocol trait for internal messages only (tick scheduling). @@ -826,3 +879,63 @@ impl Handler for BlockChainServer { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sync_status_allows_lag_through_threshold() { + let mut tracker = SyncStatusTracker::default(); + + for lag in 0..=SYNC_LAG_THRESHOLD { + assert_eq!( + tracker.update(10 + lag, 10, 10 + lag), + metrics::SyncStatus::Synced + ); + } + } + + #[test] + fn sync_status_detects_local_lag_when_fresh_blocks_are_known() { + let mut tracker = SyncStatusTracker::default(); + let current_slot = 10 + SYNC_LAG_THRESHOLD + 1; + + assert_eq!( + tracker.update(current_slot, 10, current_slot), + metrics::SyncStatus::Syncing + ); + } + + #[test] + fn sync_status_treats_stale_known_blocks_as_network_stall() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(100, 0, 0), metrics::SyncStatus::Synced); + } + + #[test] + fn sync_status_hysteresis_prevents_flapping() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 11, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 13, 15), metrics::SyncStatus::Synced); + } + + #[test] + fn network_stall_reopens_sync_status() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(30, 0, 20), metrics::SyncStatus::Synced); + } + + #[test] + fn future_head_saturates_lag_at_zero() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced); + } +} diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 8a8cbcf3..0bdd5535 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -491,6 +491,7 @@ static LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED: std::sync::LazyLock = // --- Sync Status --- /// Node synchronization status. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SyncStatus { Idle, Syncing, From fea607394800639c12d506b1cb3c9443455f75ce Mon Sep 17 00:00:00 2001 From: dicethedev Date: Sat, 6 Jun 2026 06:04:30 +0100 Subject: [PATCH 2/5] fix: skip validator duties while syncing --- crates/blockchain/src/lib.rs | 51 ++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 449a3885..d1950251 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -97,6 +97,14 @@ impl SyncStatusTracker { metrics::SyncStatus::Synced } } + + fn duties_allowed(&self) -> bool { + !self.syncing + } + + fn gate_proposer(&self, proposer: Option) -> Option { + proposer.filter(|_| self.duties_allowed()) + } } /// Milliseconds until the next interval boundary, measured relative to genesis. @@ -248,9 +256,16 @@ impl BlockChainServer { // At interval 0, check if we will propose (but don't build the block yet). // Tick forkchoice first to accept attestations, then build the block // using the freshly-accepted attestations. - let proposer_validator_id = (interval == 0 && slot > 0) + let scheduled_proposer = (interval == 0 && slot > 0) .then(|| self.get_our_proposer(slot)) .flatten(); + let proposer_validator_id = self.sync_status.gate_proposer(scheduled_proposer); + + if let Some(validator_id) = scheduled_proposer + && proposer_validator_id.is_none() + { + info!(%slot, %validator_id, "Skipping block proposal while syncing"); + } // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote // (interval 4), so the post-block report for this round sees its @@ -302,7 +317,11 @@ impl BlockChainServer { slot - 1, ); } - self.produce_attestations(slot, is_aggregator); + if self.sync_status.duties_allowed() { + self.produce_attestations(slot, is_aggregator); + } else if !self.key_manager.validator_ids().is_empty() { + info!(%slot, "Skipping attestations while syncing"); + } } // Update safe target slot metric (updated by store.on_tick at interval 3) @@ -938,4 +957,32 @@ mod tests { assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced); } + + #[test] + fn syncing_gates_proposals_and_attestations() { + let mut tracker = SyncStatusTracker::default(); + tracker.update(20, 0, 20); + + assert!(!tracker.duties_allowed()); + assert_eq!(tracker.gate_proposer(Some(3)), None); + } + + #[test] + fn caught_up_node_allows_proposals_and_attestations() { + let mut tracker = SyncStatusTracker::default(); + tracker.update(20, 0, 20); + tracker.update(20, 18, 20); + + assert!(tracker.duties_allowed()); + assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); + } + + #[test] + fn network_stall_keeps_proposals_and_attestations_enabled() { + let mut tracker = SyncStatusTracker::default(); + tracker.update(100, 0, 0); + + assert!(tracker.duties_allowed()); + assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); + } } From 57ce4fd9f303eb98505ca17c38a3af46e9a0df40 Mon Sep 17 00:00:00 2001 From: Blessing Samuel Date: Sat, 6 Jun 2026 06:08:44 +0100 Subject: [PATCH 3/5] Update crates/blockchain/src/lib.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- crates/blockchain/src/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 449a3885..3928a7c6 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -720,10 +720,7 @@ impl BlockChainServer { let head_slot = self.store.head_slot(); let max_seen_slot = self .store - .get_live_chain() - .values() - .map(|(slot, _)| *slot) - .max() + .max_live_chain_slot() .unwrap_or(head_slot); let status = self .sync_status From d352e8e83b9959dab587c71597a0821fb7e8e2f0 Mon Sep 17 00:00:00 2001 From: Blessing Samuel Date: Sat, 6 Jun 2026 06:08:54 +0100 Subject: [PATCH 4/5] Update crates/blockchain/src/lib.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- crates/blockchain/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 3928a7c6..1285ded8 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -86,7 +86,7 @@ impl SyncStatusTracker { if network_lag > NETWORK_STALL_THRESHOLD { self.syncing = false; } else if self.syncing { - self.syncing = head_lag > SYNC_LAG_THRESHOLD - SYNC_HYSTERESIS_BAND; + self.syncing = head_lag > SYNC_LAG_THRESHOLD.saturating_sub(SYNC_HYSTERESIS_BAND); } else { self.syncing = head_lag > SYNC_LAG_THRESHOLD; } From 76942f2a85cf906c398ed540cb5fef7e97568485 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Sat, 6 Jun 2026 06:16:43 +0100 Subject: [PATCH 5/5] fix: address sync status review feedback --- crates/blockchain/src/lib.rs | 11 +++++++---- crates/storage/src/store.rs | 10 ++++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index d09f3c6d..dad5e6cd 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -737,10 +737,7 @@ impl BlockChainServer { fn update_sync_status(&mut self, current_slot: u64) { let head_slot = self.store.head_slot(); - let max_seen_slot = self - .store - .max_live_chain_slot() - .unwrap_or(head_slot); + let max_seen_slot = self.store.max_live_chain_slot().unwrap_or(head_slot); let status = self .sync_status .update(current_slot, head_slot, max_seen_slot); @@ -910,6 +907,12 @@ mod tests { metrics::SyncStatus::Synced ); } + + let first_syncing_slot = 10 + SYNC_LAG_THRESHOLD + 1; + assert_eq!( + tracker.update(first_syncing_slot, 10, first_syncing_slot), + metrics::SyncStatus::Syncing + ); } #[test] diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index d765d8f6..ead7c2a7 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -843,6 +843,16 @@ impl Store { .collect() } + /// Return the highest slot in the live chain. + pub fn max_live_chain_slot(&self) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.prefix_iterator(Table::LiveChain, &[]) + .expect("iterator") + .filter_map(Result::ok) + .map(|(key, _)| decode_live_chain_key(&key).0) + .max() + } + /// Get all known block roots as HashSet. /// /// Useful for checking block existence without deserializing.