Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 119 additions & 9 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,48 @@ pub const MAX_ATTESTATIONS_DATA: usize = 16;
///
/// See: leanSpec PR #682.
pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1;
/// Local head lag beyond which the node is considered to be syncing.
///
/// See: leanSpec PR #708.
const SYNC_LAG_THRESHOLD: u64 = 4;
/// Freshest-known block lag beyond which the network is considered stalled.
///
/// During a network-wide stall the node remains synced so validators can help
/// the chain recover.
const NETWORK_STALL_THRESHOLD: u64 = 8;
/// Recovery band that prevents the sync status from flapping near the threshold.
const SYNC_HYSTERESIS_BAND: u64 = 2;

#[derive(Default)]
struct SyncStatusTracker {
syncing: bool,
}

impl SyncStatusTracker {
fn update(
&mut self,
current_slot: u64,
head_slot: u64,
max_seen_slot: u64,
) -> metrics::SyncStatus {
let head_lag = current_slot.saturating_sub(head_slot);
let network_lag = current_slot.saturating_sub(max_seen_slot);

if network_lag > NETWORK_STALL_THRESHOLD {
self.syncing = false;
} else if self.syncing {
self.syncing = head_lag > SYNC_LAG_THRESHOLD.saturating_sub(SYNC_HYSTERESIS_BAND);
} else {
self.syncing = head_lag > SYNC_LAG_THRESHOLD;
}

if self.syncing {
metrics::SyncStatus::Syncing
} else {
metrics::SyncStatus::Synced
}
}
}

/// Milliseconds until the next interval boundary, measured relative to genesis.
fn ms_until_next_interval(now_ms: u64, genesis_time_ms: u64) -> u64 {
Expand Down Expand Up @@ -100,6 +142,7 @@ impl BlockChain {
last_tick_instant: None,
attestation_committee_count,
pre_merge_coverage: None,
sync_status: SyncStatusTracker::default(),
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -164,6 +207,9 @@ pub struct BlockChainServer {
/// single-threaded message loop, so no synchronization is needed.
/// Observability-only.
pre_merge_coverage: Option<coverage::CoverageSnapshot>,

/// Stateful sync heuristic used by `lean_node_sync_status`.
sync_status: SyncStatusTracker,
}

impl BlockChainServer {
Expand All @@ -190,6 +236,7 @@ impl BlockChainServer {

// Update current slot metric
metrics::update_current_slot(slot);
self.update_sync_status(slot);

// Snapshot the aggregator flag once per tick so all read sites within
// the tick see a consistent value even if the admin API toggles it
Expand Down Expand Up @@ -451,15 +498,6 @@ impl BlockChainServer {
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
metrics::update_validators_count(self.key_manager.validator_ids().len() as u64);

// Update sync status based on head slot vs wall clock slot
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
let status = if head_slot >= current_slot {
metrics::SyncStatus::Synced
} else {
metrics::SyncStatus::Syncing
};
metrics::set_node_sync_status(status);

for table in ALL_TABLES {
metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table));
}
Expand Down Expand Up @@ -677,6 +715,18 @@ impl BlockChainServer {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}

fn update_sync_status(&mut self, current_slot: u64) {
let head_slot = self.store.head_slot();
let max_seen_slot = self
.store
.max_live_chain_slot()
.unwrap_or(head_slot);
Comment thread
dicethedev marked this conversation as resolved.
let status = self
.sync_status
.update(current_slot, head_slot, max_seen_slot);
metrics::set_node_sync_status(status);
}
}

// Protocol trait for internal messages only (tick scheduling).
Expand Down Expand Up @@ -826,3 +876,63 @@ impl Handler<AggregationDeadline> for BlockChainServer {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn sync_status_allows_lag_through_threshold() {
let mut tracker = SyncStatusTracker::default();

for lag in 0..=SYNC_LAG_THRESHOLD {
assert_eq!(
tracker.update(10 + lag, 10, 10 + lag),
metrics::SyncStatus::Synced
);
}
}

#[test]
fn sync_status_detects_local_lag_when_fresh_blocks_are_known() {
let mut tracker = SyncStatusTracker::default();
let current_slot = 10 + SYNC_LAG_THRESHOLD + 1;

assert_eq!(
tracker.update(current_slot, 10, current_slot),
metrics::SyncStatus::Syncing
);
}

#[test]
fn sync_status_treats_stale_known_blocks_as_network_stall() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(100, 0, 0), metrics::SyncStatus::Synced);
}

#[test]
fn sync_status_hysteresis_prevents_flapping() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(15, 11, 15), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(15, 13, 15), metrics::SyncStatus::Synced);
}

#[test]
fn network_stall_reopens_sync_status() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(30, 0, 20), metrics::SyncStatus::Synced);
}

#[test]
fn future_head_saturates_lag_at_zero() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced);
}
}
1 change: 1 addition & 0 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ static LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED: std::sync::LazyLock<Histogram> =
// --- Sync Status ---

/// Node synchronization status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
Idle,
Syncing,
Expand Down