diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 1a6fb766..dad5e6cd 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -56,6 +56,56 @@ pub const MAX_ATTESTATIONS_DATA: usize = 16; /// /// See: leanSpec PR #682. pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1; +/// Local head lag beyond which the node is considered to be syncing. +/// +/// See: leanSpec PR #708. +const SYNC_LAG_THRESHOLD: u64 = 4; +/// Freshest-known block lag beyond which the network is considered stalled. +/// +/// During a network-wide stall the node remains synced so validators can help +/// the chain recover. +const NETWORK_STALL_THRESHOLD: u64 = 8; +/// Recovery band that prevents the sync status from flapping near the threshold. +const SYNC_HYSTERESIS_BAND: u64 = 2; + +#[derive(Default)] +struct SyncStatusTracker { + syncing: bool, +} + +impl SyncStatusTracker { + fn update( + &mut self, + current_slot: u64, + head_slot: u64, + max_seen_slot: u64, + ) -> metrics::SyncStatus { + let head_lag = current_slot.saturating_sub(head_slot); + let network_lag = current_slot.saturating_sub(max_seen_slot); + + if network_lag > NETWORK_STALL_THRESHOLD { + self.syncing = false; + } else if self.syncing { + self.syncing = head_lag > SYNC_LAG_THRESHOLD.saturating_sub(SYNC_HYSTERESIS_BAND); + } else { + self.syncing = head_lag > SYNC_LAG_THRESHOLD; + } + + if self.syncing { + metrics::SyncStatus::Syncing + } else { + metrics::SyncStatus::Synced + } + } + + fn duties_allowed(&self) -> bool { + !self.syncing + } + + fn gate_proposer(&self, proposer: Option) -> Option { + proposer.filter(|_| self.duties_allowed()) + } +} /// Milliseconds until the next interval boundary, measured relative to genesis. fn ms_until_next_interval(now_ms: u64, genesis_time_ms: u64) -> u64 { @@ -100,6 +150,7 @@ impl BlockChain { last_tick_instant: None, attestation_committee_count, pre_merge_coverage: None, + sync_status: SyncStatusTracker::default(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -164,6 +215,9 @@ pub struct BlockChainServer { /// single-threaded message loop, so no synchronization is needed. /// Observability-only. pre_merge_coverage: Option, + + /// Stateful sync heuristic used by `lean_node_sync_status`. + sync_status: SyncStatusTracker, } impl BlockChainServer { @@ -190,6 +244,7 @@ impl BlockChainServer { // Update current slot metric metrics::update_current_slot(slot); + self.update_sync_status(slot); // Snapshot the aggregator flag once per tick so all read sites within // the tick see a consistent value even if the admin API toggles it @@ -201,9 +256,16 @@ impl BlockChainServer { // At interval 0, check if we will propose (but don't build the block yet). // Tick forkchoice first to accept attestations, then build the block // using the freshly-accepted attestations. - let proposer_validator_id = (interval == 0 && slot > 0) + let scheduled_proposer = (interval == 0 && slot > 0) .then(|| self.get_our_proposer(slot)) .flatten(); + let proposer_validator_id = self.sync_status.gate_proposer(scheduled_proposer); + + if let Some(validator_id) = scheduled_proposer + && proposer_validator_id.is_none() + { + info!(%slot, %validator_id, "Skipping block proposal while syncing"); + } // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote // (interval 4), so the post-block report for this round sees its @@ -255,7 +317,11 @@ impl BlockChainServer { slot - 1, ); } - self.produce_attestations(slot, is_aggregator); + if self.sync_status.duties_allowed() { + self.produce_attestations(slot, is_aggregator); + } else if !self.key_manager.validator_ids().is_empty() { + info!(%slot, "Skipping attestations while syncing"); + } } // Update safe target slot metric (updated by store.on_tick at interval 3) @@ -451,15 +517,6 @@ impl BlockChainServer { metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); - // Update sync status based on head slot vs wall clock slot - let current_slot = self.store.time() / INTERVALS_PER_SLOT; - let status = if head_slot >= current_slot { - metrics::SyncStatus::Synced - } else { - metrics::SyncStatus::Syncing - }; - metrics::set_node_sync_status(status); - for table in ALL_TABLES { metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table)); } @@ -677,6 +734,15 @@ impl BlockChainServer { let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); } + + fn update_sync_status(&mut self, current_slot: u64) { + let head_slot = self.store.head_slot(); + let max_seen_slot = self.store.max_live_chain_slot().unwrap_or(head_slot); + let status = self + .sync_status + .update(current_slot, head_slot, max_seen_slot); + metrics::set_node_sync_status(status); + } } // Protocol trait for internal messages only (tick scheduling). @@ -826,3 +892,97 @@ impl Handler for BlockChainServer { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sync_status_allows_lag_through_threshold() { + let mut tracker = SyncStatusTracker::default(); + + for lag in 0..=SYNC_LAG_THRESHOLD { + assert_eq!( + tracker.update(10 + lag, 10, 10 + lag), + metrics::SyncStatus::Synced + ); + } + + let first_syncing_slot = 10 + SYNC_LAG_THRESHOLD + 1; + assert_eq!( + tracker.update(first_syncing_slot, 10, first_syncing_slot), + metrics::SyncStatus::Syncing + ); + } + + #[test] + fn sync_status_detects_local_lag_when_fresh_blocks_are_known() { + let mut tracker = SyncStatusTracker::default(); + let current_slot = 10 + SYNC_LAG_THRESHOLD + 1; + + assert_eq!( + tracker.update(current_slot, 10, current_slot), + metrics::SyncStatus::Syncing + ); + } + + #[test] + fn sync_status_treats_stale_known_blocks_as_network_stall() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(100, 0, 0), metrics::SyncStatus::Synced); + } + + #[test] + fn sync_status_hysteresis_prevents_flapping() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 11, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 13, 15), metrics::SyncStatus::Synced); + } + + #[test] + fn network_stall_reopens_sync_status() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(30, 0, 20), metrics::SyncStatus::Synced); + } + + #[test] + fn future_head_saturates_lag_at_zero() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced); + } + + #[test] + fn syncing_gates_proposals_and_attestations() { + let mut tracker = SyncStatusTracker::default(); + tracker.update(20, 0, 20); + + assert!(!tracker.duties_allowed()); + assert_eq!(tracker.gate_proposer(Some(3)), None); + } + + #[test] + fn caught_up_node_allows_proposals_and_attestations() { + let mut tracker = SyncStatusTracker::default(); + tracker.update(20, 0, 20); + tracker.update(20, 18, 20); + + assert!(tracker.duties_allowed()); + assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); + } + + #[test] + fn network_stall_keeps_proposals_and_attestations_enabled() { + let mut tracker = SyncStatusTracker::default(); + tracker.update(100, 0, 0); + + assert!(tracker.duties_allowed()); + assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); + } +} diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 8a8cbcf3..0bdd5535 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -491,6 +491,7 @@ static LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED: std::sync::LazyLock = // --- Sync Status --- /// Node synchronization status. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SyncStatus { Idle, Syncing, diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index d765d8f6..ead7c2a7 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -843,6 +843,16 @@ impl Store { .collect() } + /// Return the highest slot in the live chain. + pub fn max_live_chain_slot(&self) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.prefix_iterator(Table::LiveChain, &[]) + .expect("iterator") + .filter_map(Result::ok) + .map(|(key, _)| decode_live_chain_key(&key).0) + .max() + } + /// Get all known block roots as HashSet. /// /// Useful for checking block existence without deserializing.