From 7d2ea68eeb4cd305bb22af3af9c658b19b350839 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Tue, 23 Jun 2026 18:43:36 +0700 Subject: [PATCH 1/3] fix: tracing metrics --- crates/app/src/health/checker.rs | 261 +++++++++ crates/app/src/health/checks.rs | 844 +++++++++++++++++++++++++++ crates/app/src/health/error.rs | 41 ++ crates/app/src/health/gatherer.rs | 311 ++++++++++ crates/app/src/health/metrics.rs | 25 + crates/app/src/health/mod.rs | 70 +++ crates/app/src/health/model.rs | 66 +++ crates/app/src/health/reducers.rs | 49 ++ crates/app/src/health/select.rs | 103 ++++ crates/tracing/src/layers/metrics.rs | 120 +++- crates/tracing/src/metrics.rs | 18 +- 11 files changed, 1885 insertions(+), 23 deletions(-) create mode 100644 crates/app/src/health/checker.rs create mode 100644 crates/app/src/health/checks.rs create mode 100644 crates/app/src/health/error.rs create mode 100644 crates/app/src/health/gatherer.rs create mode 100644 crates/app/src/health/metrics.rs create mode 100644 crates/app/src/health/mod.rs create mode 100644 crates/app/src/health/model.rs create mode 100644 crates/app/src/health/reducers.rs create mode 100644 crates/app/src/health/select.rs diff --git a/crates/app/src/health/checker.rs b/crates/app/src/health/checker.rs new file mode 100644 index 00000000..320d16a2 --- /dev/null +++ b/crates/app/src/health/checker.rs @@ -0,0 +1,261 @@ +//! The health checker background service. + +use std::time::Duration; + +use tokio_util::sync::CancellationToken; +use tracing::warn; + +use super::{ + LABELS_CARDINALITY_THRESHOLD, MAX_SCRAPES, Metadata, SCRAPE_PERIOD, + checks::{Check, all_checks, new_query_func}, + error::{Error, Result}, + gatherer::Gatherer, + metrics::HEALTH_METRICS, + model::MetricFamily, +}; + +/// Name of the high-cardinality metric, skipped during the cardinality scan. +const HIGH_CARDINALITY_METRIC: &str = "app_health_metrics_high_cardinality"; + +/// Health checker: periodically scrapes metrics and publishes check results. 
+pub struct Checker { + metadata: Metadata, + checks: Vec, + metrics: Vec>, + gatherer: Box, + scrape_period: Duration, + max_scrapes: usize, + num_validators: i64, +} + +impl Checker { + /// Returns a new health checker. + /// + /// `num_validators` is used for the high-cardinality threshold and is + /// distinct from [`Metadata::num_validators`] (which the checks use), + /// mirroring Charon's `NewChecker`. + pub fn new(metadata: Metadata, gatherer: Box, num_validators: i64) -> Self { + Self { + metadata, + checks: all_checks(), + metrics: Vec::new(), + gatherer, + scrape_period: SCRAPE_PERIOD, + max_scrapes: MAX_SCRAPES, + num_validators, + } + } + + /// Runs the checker until `quit` is cancelled. All logs emitted while + /// running carry the `health` topic, mirroring Charon's + /// `log.WithTopic(ctx, "health")`. + pub async fn run(mut self, quit: CancellationToken) { + let span = tracing::debug_span!("health", topic = "health"); + // Full-path trait call so it doesn't shadow the inherent `instrument`. + tracing::Instrument::instrument(self.run_loop(quit), span).await; + } + + /// The scrape/instrument loop, ticking every [`Checker::scrape_period`]. + async fn run_loop(&mut self, quit: CancellationToken) { + let mut interval = tokio::time::interval(self.scrape_period); + // Skip the immediate first tick so the first scrape happens after one + // full period, matching Charon's `time.NewTicker` semantics. + interval.tick().await; + + loop { + tokio::select! { + () = quit.cancelled() => return, + _ = interval.tick() => { + if let Err(error) = self.scrape() { + warn!(?error, "Failed to scrape metrics"); + continue; + } + self.instrument(); + } + } + } + } + + /// Scrapes metrics into the rolling window, detecting high-cardinality + /// families and re-gathering once if any are found. 
+ fn scrape(&mut self) -> Result<()> { + let mut scrape = self.gatherer.gather().map_err(Error::GatherMetrics)?; + + let threshold = LABELS_CARDINALITY_THRESHOLD + .saturating_mul(usize::try_from(self.num_validators).unwrap_or(0)); + let mut gather_again = false; + + for family in &scrape { + if family.name == HIGH_CARDINALITY_METRIC { + continue; + } + + let max_labels = family + .metrics + .iter() + .map(|metric| metric.labels.len()) + .max() + .unwrap_or(0); + + if max_labels > threshold { + HEALTH_METRICS.metrics_high_cardinality[&family.name] + .set(i64::try_from(max_labels).unwrap_or(i64::MAX)); + gather_again = true; + } + } + + if gather_again { + scrape = self.gatherer.gather().map_err(Error::GatherMetrics)?; + } + + self.metrics.push(scrape); + if self.metrics.len() > self.max_scrapes { + self.metrics.remove(0); + } + + Ok(()) + } + + /// Runs all checks against the rolling window and updates the gauge. + fn instrument(&self) { + let query = new_query_func(&self.metrics); + for check in &self.checks { + let failing = match (check.func)(&query, &self.metadata) { + Ok(failing) => failing, + Err(error) => { + // Charon rate-limits this warning via log.Filter(); Pluto + // has no equivalent, so it is logged each tick. The gauge is + // still cleared (set to 0) on error, matching Charon. 
+ warn!(check = check.name, ?error, "Health check failed"); + false + } + }; + + let value: i64 = i64::from(failing); + HEALTH_METRICS.checks[&(check.severity.as_str().to_owned(), check.name.to_owned())] + .set(value); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + + use super::*; + use crate::health::{ + gatherer::GatherError, + model::{LabelPair, Metric, MetricType}, + }; + + type Responder = + Box std::result::Result, GatherError> + Send + Sync>; + + struct MockGatherer { + calls: Arc, + responder: Responder, + } + + impl MockGatherer { + fn new(responder: Responder) -> (Self, Arc) { + let calls = Arc::new(AtomicUsize::new(0)); + ( + Self { + calls: Arc::clone(&calls), + responder, + }, + calls, + ) + } + } + + impl Gatherer for MockGatherer { + fn gather(&self) -> std::result::Result, GatherError> { + let n = self.calls.fetch_add(1, Ordering::SeqCst); + (self.responder)(n) + } + } + + fn empty_checker(responder: Responder, num_validators: i64) -> (Checker, Arc) { + let (mock, calls) = MockGatherer::new(responder); + let checker = Checker::new(Metadata::default(), Box::new(mock), num_validators); + (checker, calls) + } + + fn labeled_family(name: &str) -> MetricFamily { + MetricFamily { + name: name.to_owned(), + metric_type: MetricType::Gauge, + metrics: vec![Metric { + labels: vec![LabelPair { + name: "peer".to_owned(), + value: "1".to_owned(), + }], + counter: None, + gauge: Some(1.0), + }], + } + } + + #[test] + fn scrape_trims_rolling_window() { + let (mut checker, calls) = empty_checker(Box::new(|_| Ok(Vec::new())), 1); + + for _ in 0..(MAX_SCRAPES + 2) { + checker.scrape().expect("scrape"); + } + + assert_eq!(checker.metrics.len(), MAX_SCRAPES); + assert_eq!(calls.load(Ordering::SeqCst), MAX_SCRAPES + 2); + } + + #[test] + fn scrape_high_cardinality_regathers() { + // num_validators = 0 → threshold = 0, so any labelled series trips it. 
+ let (mut checker, calls) = empty_checker( + Box::new(|_| Ok(vec![labeled_family("p2p_ping_success")])), + 0, + ); + + checker.scrape().expect("scrape"); + + // Gathered once for the scan, once more after detecting high cardinality. + assert_eq!(calls.load(Ordering::SeqCst), 2); + assert_eq!(checker.metrics.len(), 1); + } + + #[test] + fn scrape_propagates_gather_error() { + let (mut checker, _calls) = empty_checker(Box::new(|_| Err("boom".into())), 1); + + let err = checker.scrape().expect_err("should error"); + assert_eq!(err.to_string(), "gather metrics"); + assert!(checker.metrics.is_empty()); + } + + #[tokio::test] + async fn run_scrapes_until_cancelled() { + let (mut checker, calls) = empty_checker(Box::new(|_| Ok(Vec::new())), 1); + checker.scrape_period = Duration::from_millis(5); + + let quit = CancellationToken::new(); + let handle = tokio::spawn(checker.run(quit.clone())); + + let deadline = tokio::time::Instant::now() + .checked_add(Duration::from_secs(2)) + .expect("deadline overflow"); + while calls.load(Ordering::SeqCst) < 2 { + assert!( + tokio::time::Instant::now() < deadline, + "checker did not scrape" + ); + tokio::time::sleep(Duration::from_millis(1)).await; + } + + quit.cancel(); + handle.await.expect("join run task"); + } +} diff --git a/crates/app/src/health/checks.rs b/crates/app/src/health/checks.rs new file mode 100644 index 00000000..75e2c7fe --- /dev/null +++ b/crates/app/src/health/checks.rs @@ -0,0 +1,844 @@ +//! The fixed set of health checks and the query function over scraped metrics. + +use super::{ + Metadata, Severity, + error::{Error, Result}, + model::{LabelPair, MetricFamily}, + reducers::{Reducer, gauge_max, increase}, + select::{Selector, count_labels, count_non_zero_labels, no_labels, sum_labels}, +}; + +/// A health check. +pub(crate) struct Check { + /// Name of the check (also the `name` label value). + pub(crate) name: &'static str, + /// Human-readable description. 
+ /// + /// Retained for parity with Charon's `check.Description`; not yet surfaced + /// (Charon does not consume it within the package either). + #[allow(dead_code, reason = "ported for parity; surfaced by future tooling")] + pub(crate) description: &'static str, + /// Severity. + pub(crate) severity: Severity, + /// Returns true if the check is failing. + pub(crate) func: fn(&QueryFunc<'_>, &Metadata) -> Result, +} + +/// Query function bound to a rolling window of scrapes. +pub(crate) struct QueryFunc<'a> { + metrics: &'a [Vec], +} + +/// Returns a query function over `metrics` (a rolling window of scrapes). +pub(crate) fn new_query_func(metrics: &[Vec]) -> QueryFunc<'_> { + QueryFunc { metrics } +} + +impl QueryFunc<'_> { + /// For each scrape, finds the first family matching `name` with at least + /// one series, applies `selector` to it, and collects the resulting + /// samples; then reduces them with `reducer`. + pub(crate) fn query(&self, name: &str, selector: Selector, reducer: Reducer) -> Result { + let mut selected = Vec::new(); + + for scrape in self.metrics { + for family in scrape { + if family.name != name || family.metrics.is_empty() { + continue; + } + match selector(family).map_err(|e| Error::LabelSelector(Box::new(e)))? { + None => continue, + Some(metric) => { + selected.push(metric); + break; + } + } + } + } + + reducer(&selected).map_err(|e| Error::SeriesReducer(Box::new(e))) + } +} + +/// Convenience constructor for a label pair. +fn label(name: &str, value: &str) -> LabelPair { + LabelPair { + name: name.to_owned(), + value: value.to_owned(), + } +} + +/// Lossy `i64` → `f64` conversion used only for threshold comparisons. +#[allow( + clippy::cast_precision_loss, + reason = "validator/peer counts are small; threshold comparison does not require exactness" +)] +fn to_f64(n: i64) -> f64 { + n as f64 +} + +fn high_error_log_rate(q: &QueryFunc<'_>, m: &Metadata) -> Result { + // Allow 2 errors per validator. 
+ let value = q.query("app_log_error_total", sum_labels(Vec::new()), increase)?; + Ok(value > 2.0 * to_f64(m.num_validators)) +} + +fn high_warning_log_rate(q: &QueryFunc<'_>, m: &Metadata) -> Result { + // Allow 2 warnings per validator. + let value = q.query("app_log_warning_total", sum_labels(Vec::new()), increase)?; + Ok(value > 2.0 * to_f64(m.num_validators)) +} + +fn beacon_node_syncing(q: &QueryFunc<'_>, _m: &Metadata) -> Result { + let max_val = q.query("app_monitoring_beacon_node_syncing", no_labels(), gauge_max)?; + Ok(max_val == 1.0) +} + +fn insufficient_connected_peers(q: &QueryFunc<'_>, m: &Metadata) -> Result { + let max_val = q.query("p2p_ping_success", count_non_zero_labels(), gauge_max)?; + let required = to_f64(m.quorum_peers) - 1.0; // Exclude self. + Ok(max_val < required) +} + +fn pending_validators(q: &QueryFunc<'_>, _m: &Metadata) -> Result { + let max_val = q.query( + "core_scheduler_validator_status", + count_labels(vec![label("status", "pending")]), + gauge_max, + )?; + Ok(max_val > 0.0) +} + +fn proposal_failures(q: &QueryFunc<'_>, _m: &Metadata) -> Result { + let value = q.query( + "core_tracker_failed_duties_total", + sum_labels(vec![label("duty", ".*proposal")]), + increase, + )?; + Ok(value > 0.0) +} + +fn high_registration_failures_rate(q: &QueryFunc<'_>, _m: &Metadata) -> Result { + let value = q.query( + "core_bcast_recast_errors_total", + sum_labels(Vec::new()), + increase, + )?; + Ok(value > 0.0) +} + +fn metrics_high_cardinality(q: &QueryFunc<'_>, _m: &Metadata) -> Result { + let max_val = q.query( + "app_health_metrics_high_cardinality", + sum_labels(Vec::new()), + gauge_max, + )?; + Ok(max_val > 0.0) +} + +fn using_fallback_beacon_nodes(q: &QueryFunc<'_>, _m: &Metadata) -> Result { + let max_val = q.query("app_eth2_using_fallback", sum_labels(Vec::new()), gauge_max)?; + Ok(max_val > 0.0) +} + +/// Returns the full set of health checks, in Charon order. 
+pub(crate) fn all_checks() -> Vec { + vec![ + Check { + name: "high_error_log_rate", + description: "High rate of error logs. Please check the logs for more details.", + severity: Severity::Warning, + func: high_error_log_rate, + }, + Check { + name: "high_warning_log_rate", + description: "High rate of warning logs. Please check the logs for more details.", + severity: Severity::Warning, + func: high_warning_log_rate, + }, + Check { + name: "beacon_node_syncing", + description: "Beacon Node in syncing state.", + severity: Severity::Critical, + func: beacon_node_syncing, + }, + Check { + name: "insufficient_connected_peers", + description: "Not connected to at least quorum peers. Check logs for networking issue or coordinate with peers.", + severity: Severity::Critical, + func: insufficient_connected_peers, + }, + Check { + name: "pending_validators", + description: "Pending validators detected. Activate them to start validating.", + severity: Severity::Info, + func: pending_validators, + }, + Check { + name: "proposal_failures", + description: "Proposal failures detected. See .", + severity: Severity::Warning, + func: proposal_failures, + }, + Check { + name: "high_registration_failures_rate", + description: "High rate of failed validator registrations. Please check the logs for more details.", + severity: Severity::Warning, + func: high_registration_failures_rate, + }, + Check { + name: "metrics_high_cardinality", + description: "Metrics reached high cardinality threshold. Please check metrics reported by app_health_metrics_high_cardinality.", + severity: Severity::Warning, + func: metrics_high_cardinality, + }, + Check { + name: "using_fallback_beacon_nodes", + description: "Using fallback beacon nodes. 
Please check primary beacon nodes health.", + severity: Severity::Warning, + func: using_fallback_beacon_nodes, + }, + ] +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::health::{ + Metadata, + model::{LabelPair, Metric, MetricFamily, MetricType}, + }; + + fn gen_labels(name_vals: &[&str]) -> Vec { + assert!( + name_vals.len().is_multiple_of(2), + "must have even number of name/value pairs" + ); + name_vals + .chunks(2) + .map(|c| LabelPair { + name: c[0].to_owned(), + value: c[1].to_owned(), + }) + .collect() + } + + fn gen_counter(labels: &[LabelPair], values: &[i32]) -> Vec { + values + .iter() + .map(|&v| Metric { + labels: labels.to_vec(), + counter: Some(f64::from(v)), + gauge: None, + }) + .collect() + } + + fn gen_gauge(labels: &[LabelPair], values: &[i32]) -> Vec { + values + .iter() + .map(|&v| Metric { + labels: labels.to_vec(), + counter: None, + gauge: Some(f64::from(v)), + }) + .collect() + } + + /// Transposes a set of series (series × time) into per-scrape families + /// (time × family), mirroring Charon's `genFam`. + fn gen_fam(name: &str, series: &[Vec]) -> Vec { + let metric_type = if series + .first() + .and_then(|s| s.first()) + .map(|m| m.gauge.is_some()) + .unwrap_or(false) + { + MetricType::Gauge + } else { + MetricType::Counter + }; + + let max_len = series.iter().map(Vec::len).max().unwrap_or(0); + let mut resp: Vec = (0..max_len) + .map(|_| MetricFamily { + name: name.to_owned(), + metric_type, + metrics: Vec::new(), + }) + .collect(); + + for s in series { + for (i, metric) in s.iter().enumerate() { + resp[i].metrics.push(metric.clone()); + } + } + + resp + } + + /// Mirrors Charon's `testCheck`: interleaves the check's per-scrape + /// families with two noise families, runs the named check, and asserts + /// the outcome. 
+ fn test_check(m: &Metadata, check_name: &str, expect: bool, metrics: Vec) { + let random_foo = gen_fam( + "foo", + &[ + gen_counter(&gen_labels(&["foo", "foo1"]), &[1, 2, 3]), + gen_counter(&gen_labels(&["foo", "foo2"]), &[1, 4, 8]), + ], + ); + let random_bar = gen_fam( + "bar", + &[ + gen_gauge(&gen_labels(&["bar", "bar1"]), &[1, 1, 4]), + gen_gauge(&gen_labels(&["bar", "bar2"]), &[1, 1, 1]), + ], + ); + + let max_len = metrics.len().max(random_foo.len()).max(random_bar.len()); + let mut multi: Vec> = Vec::with_capacity(max_len); + for i in 0..max_len { + let mut fam = Vec::new(); + if i < metrics.len() { + fam.push(metrics[i].clone()); + } + if i < random_foo.len() { + fam.push(random_foo[i].clone()); + } + if i < random_bar.len() { + fam.push(random_bar[i].clone()); + } + multi.push(fam); + } + + let query = new_query_func(&multi); + let check = all_checks() + .into_iter() + .find(|c| c.name == check_name) + .expect("check not found"); + let failed = (check.func)(&query, m).expect("check should not error"); + assert_eq!(failed, expect); + } + + #[test] + fn proposal_failures_check() { + let m = Metadata { + quorum_peers: 2, + ..Metadata::default() + }; + let name = "proposal_failures"; + let metric = "core_tracker_failed_duties_total"; + + let proposal_full = gen_labels(&["duty", "proposal"]); + let proposal_blind = gen_labels(&["duty", "builder_proposal"]); + let attestation = gen_labels(&["duty", "attester"]); + + // no data + test_check(&m, name, false, Vec::new()); + + // no failures + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_counter(&proposal_full, &[1, 1, 1, 1]), + gen_counter(&proposal_blind, &[0, 0, 0, 0]), + gen_counter(&attestation, &[2, 2, 2, 2]), + ], + ), + ); + + // full proposal failures + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_counter(&proposal_full, &[0, 0, 1, 1]), + gen_counter(&proposal_blind, &[0, 0, 0, 0]), + gen_counter(&attestation, &[0, 0, 0, 0]), + ], + ), + ); + + // blind proposal 
failures + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_counter(&proposal_full, &[0, 0, 0, 0]), + gen_counter(&proposal_blind, &[0, 0, 1, 1]), + gen_counter(&attestation, &[0, 0, 0, 0]), + ], + ), + ); + + // attestation failures + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_counter(&proposal_full, &[0, 0, 0, 0]), + gen_counter(&proposal_blind, &[0, 0, 0, 0]), + gen_counter(&attestation, &[0, 0, 1, 1]), + ], + ), + ); + + // multiple failures + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_counter(&proposal_full, &[0, 0, 1, 1]), + gen_counter(&proposal_blind, &[0, 0, 1, 1]), + gen_counter(&attestation, &[0, 0, 1, 1]), + ], + ), + ); + } + + #[test] + fn pending_validators_check() { + let m = Metadata { + quorum_peers: 2, + ..Metadata::default() + }; + let name = "pending_validators"; + let metric = "core_scheduler_validator_status"; + + let val1_pending = gen_labels(&["pubkey", "1", "status", "pending"]); + let val1_active = gen_labels(&["pubkey", "1", "status", "active"]); + let val2_active = gen_labels(&["pubkey", "2", "status", "active"]); + let val3_pending = gen_labels(&["pubkey", "3", "status", "pending"]); + + // no data + test_check(&m, name, false, Vec::new()); + + // single active + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_gauge(&val1_active, &[1, 1, 1, 1])]), + ); + + // single pending + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_gauge(&val1_pending, &[1, 1, 1, 1])]), + ); + + // single activated + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_gauge(&val1_pending, &[1, 1, 0, 0]), + gen_gauge(&val1_active, &[0, 0, 1, 1]), + ], + ), + ); + + // 1o3 pending + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_gauge(&val1_pending, &[0, 0, 0, 0]), + gen_gauge(&val1_active, &[1, 1, 1, 1]), + gen_gauge(&val2_active, &[1, 1, 1, 1]), + gen_gauge(&val3_pending, &[1, 1, 1, 1]), + ], + ), + ); + } + + #[test] + fn 
insufficient_peer_check() { + let m = Metadata { + quorum_peers: 2, + ..Metadata::default() + }; + let name = "insufficient_connected_peers"; + let metric = "p2p_ping_success"; + + let peer1 = gen_labels(&["peer", "1"]); + let peer2 = gen_labels(&["peer", "2"]); + let peer3 = gen_labels(&["peer", "3"]); + + // no data + test_check(&m, name, true, Vec::new()); + + // no peers + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_gauge(&peer1, &[0, 0, 0, 0]), + gen_gauge(&peer2, &[0, 0, 0, 0]), + gen_gauge(&peer3, &[0, 0, 0, 0]), + ], + ), + ); + + // all peers + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_gauge(&peer1, &[1, 1, 1]), + gen_gauge(&peer2, &[1, 1, 1]), + gen_gauge(&peer3, &[1, 1, 1]), + ], + ), + ); + + // quorum peers + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_gauge(&peer1, &[0, 0, 0]), + gen_gauge(&peer2, &[1, 1, 1]), + gen_gauge(&peer3, &[1, 1, 1]), + ], + ), + ); + + // blip + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_gauge(&peer1, &[1, 0, 1]), + gen_gauge(&peer2, &[1, 0, 1]), + gen_gauge(&peer3, &[1, 0, 1]), + ], + ), + ); + } + + #[test] + fn bn_syncing_check() { + let m = Metadata::default(); + let name = "beacon_node_syncing"; + let metric = "app_monitoring_beacon_node_syncing"; + + // no data + test_check(&m, name, false, Vec::new()); + + // single zero + test_check(&m, name, false, gen_fam(metric, &[gen_gauge(&[], &[0])])); + + // multiple constants + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_gauge(&[], &[1, 1, 1])]), + ); + + // blip + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_gauge(&[], &[0, 1, 0])]), + ); + } + + #[test] + fn error_logs_check() { + let m = Metadata { + num_validators: 10, + ..Metadata::default() + }; + let name = "high_error_log_rate"; + let metric = "app_log_error_total"; + + let topic_a = gen_labels(&["topic", "a"]); + let topic_b = gen_labels(&["topic", "b"]); + + // no data + test_check(&m, name, false, 
Vec::new()); + + // single zero + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_counter(&topic_a, &[0])]), + ); + + // multiple zeros + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_counter(&topic_a, &[0, 0, 0]), + gen_counter(&topic_b, &[0, 0, 0]), + ], + ), + ); + + // multiple constants + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_counter(&topic_a, &[1, 1, 1])]), + ); + + // too few + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_counter(&topic_a, &[0, 0, 10])]), + ); + + // too few multi + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_counter(&topic_a, &[0, 0, 5]), + gen_counter(&topic_b, &[0, 0, 5]), + ], + ), + ); + + // sufficient + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_counter(&topic_a, &[10, 20, 30, 40, 500])]), + ); + } + + #[test] + fn warn_logs_check() { + let m = Metadata { + num_validators: 10, + ..Metadata::default() + }; + let name = "high_warning_log_rate"; + let metric = "app_log_warning_total"; + + let topic_a = gen_labels(&["topic", "a"]); + let topic_b = gen_labels(&["topic", "b"]); + + // no data + test_check(&m, name, false, Vec::new()); + + // single zero + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_counter(&topic_a, &[0])]), + ); + + // multiple zeros + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_counter(&topic_a, &[0, 0, 0]), + gen_counter(&topic_b, &[0, 0, 0]), + ], + ), + ); + + // multiple constants + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_counter(&topic_a, &[1, 1, 1])]), + ); + + // too few + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_counter(&topic_a, &[0, 0, 10])]), + ); + + // too few multi + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_counter(&topic_a, &[0, 0, 5]), + gen_counter(&topic_b, &[0, 0, 5]), + ], + ), + ); + + // sufficient + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_counter(&topic_a, &[10, 20, 30, 40, 
500])]), + ); + } + + #[test] + fn high_registration_failures_rate_check() { + let m = Metadata::default(); + let name = "high_registration_failures_rate"; + let metric = "core_bcast_recast_errors_total"; + + let pregen = gen_labels(&["source", "pregen"]); + let downstream = gen_labels(&["source", "downstream"]); + + // no data + test_check(&m, name, false, Vec::new()); + + // same errors count + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_gauge(&pregen, &[1, 1, 1])]), + ); + + // incrementing errors count + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_gauge(&downstream, &[0, 1, 2, 10])]), + ); + + // both labels have stable errors count + test_check( + &m, + name, + false, + gen_fam( + metric, + &[ + gen_gauge(&pregen, &[1, 1, 1]), + gen_gauge(&downstream, &[1, 1, 1]), + ], + ), + ); + + // both labels have increasing errors count + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_gauge(&pregen, &[10, 15, 18]), + gen_gauge(&downstream, &[1, 2, 3]), + ], + ), + ); + } + + #[test] + fn metrics_high_cardinality_check() { + let m = Metadata::default(); + let name = "metrics_high_cardinality"; + let metric = "app_health_metrics_high_cardinality"; + + // no data + test_check(&m, name, false, Vec::new()); + + // high cardinality + test_check( + &m, + name, + true, + gen_fam( + metric, + &[ + gen_gauge(&gen_labels(&["name", "metric1"]), &[1, 1, 1]), + gen_gauge(&gen_labels(&["name", "metric2"]), &[3, 5, 0]), + ], + ), + ); + } + + #[test] + fn using_fallback_beacon_nodes_check() { + let m = Metadata::default(); + let name = "using_fallback_beacon_nodes"; + let metric = "app_eth2_using_fallback"; + + // no data + test_check(&m, name, false, Vec::new()); + + // no fallback + test_check( + &m, + name, + false, + gen_fam(metric, &[gen_gauge(&[], &[0, 0, 0])]), + ); + + // single fallback + test_check( + &m, + name, + true, + gen_fam(metric, &[gen_gauge(&[], &[0, 1, 0])]), + ); + } +} diff --git a/crates/app/src/health/error.rs 
b/crates/app/src/health/error.rs new file mode 100644 index 00000000..896c1022 --- /dev/null +++ b/crates/app/src/health/error.rs @@ -0,0 +1,41 @@ +//! Health check errors. + +use super::gatherer::GatherError; + +/// Errors produced while evaluating health checks or gathering metrics. +/// +/// The leaf message strings match Charon's `app/health` error strings exactly. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// A metric family expected to contain exactly one series did not. + #[error("expected exactly one metric")] + ExpectedExactlyOneMetric, + + /// A selector received a family that is neither a gauge nor a counter. + #[error("bug: unsupported metric type")] + UnsupportedMetricType, + + /// The increase reducer received a sample that is neither a counter nor a + /// gauge. + #[error("bug: unsupported metric passed")] + UnsupportedMetricPassed, + + /// The gauge-max reducer received a non-gauge sample. + #[error("bug: non-gauge metric passed")] + NonGaugeMetricPassed, + + /// A label selector failed. + #[error("label selector")] + LabelSelector(#[source] Box), + + /// A series reducer failed. + #[error("series reducer")] + SeriesReducer(#[source] Box), + + /// Gathering metrics from the registry failed. + #[error("gather metrics")] + GatherMetrics(#[source] GatherError), +} + +/// Result type for health operations. +pub type Result = std::result::Result; diff --git a/crates/app/src/health/gatherer.rs b/crates/app/src/health/gatherer.rs new file mode 100644 index 00000000..880b73ec --- /dev/null +++ b/crates/app/src/health/gatherer.rs @@ -0,0 +1,311 @@ +//! Bridge from vise's text exposition to the in-memory metric model. +//! +//! vise (via `prometheus-client`) exposes no structured/typed collection API; +//! metrics are only reachable by encoding to text. We encode the registry in +//! the exact format the `vise-exporter` serves to Prometheus +//! (`Format::OpenMetricsForPrometheus`) and parse it back, so the checker reads +//! 
the same metric names Prometheus/Grafana scrape. In that format vise strips +//! the OpenMetrics `_total` suffix from counter samples, so counter names match +//! the registered base name (and Charon). + +use vise::{Format, MetricsCollection, Registry}; + +use super::model::{LabelPair, Metric, MetricFamily, MetricType}; + +/// Error returned by a [`Gatherer`]. +pub type GatherError = Box; + +/// Source of metric families for the health checker. +pub trait Gatherer: Send + Sync { + /// Gathers the current metric families. + fn gather(&self) -> std::result::Result, GatherError>; +} + +/// Gathers metrics from vise's global registry. +#[derive(Debug, Default)] +pub struct ViseGatherer; + +impl Gatherer for ViseGatherer { + fn gather(&self) -> std::result::Result, GatherError> { + let registry = MetricsCollection::default().collect(); + gather_registry(®istry) + } +} + +/// Encodes `registry` to text and parses it into the metric model. +fn gather_registry(registry: &Registry) -> std::result::Result, GatherError> { + let mut buffer = String::new(); + registry + .encode(&mut buffer, Format::OpenMetricsForPrometheus) + .map_err(|e| Box::new(e) as GatherError)?; + Ok(parse_exposition(&buffer)) +} + +/// Parses a Prometheus/OpenMetrics text exposition into metric families. +/// +/// Best-effort: comment lines other than `# TYPE` are skipped, and sample lines +/// that fail to parse are dropped. Each `# TYPE` line starts a new family; +/// subsequent sample lines are grouped under it. 
+fn parse_exposition(text: &str) -> Vec { + let mut families: Vec = Vec::new(); + let mut current: Option = None; + + for line in text.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + + if let Some(rest) = line.strip_prefix("# TYPE ") { + if let Some(family) = current.take() { + families.push(family); + } + let (name, type_str) = split_once_ws(rest); + current = Some(MetricFamily { + name: name.to_owned(), + metric_type: parse_type(type_str), + metrics: Vec::new(), + }); + } else if line.starts_with('#') { + // # HELP / # UNIT / # EOF / other comment. + } else if let Some(family) = current.as_mut() { + match parse_sample(line, family.metric_type) { + Some(metric) => family.metrics.push(metric), + // Input is trusted (we encode it ourselves), so a failure here + // means a parser gap. Surface it at debug rather than dropping + // silently (which could mask a queried metric). Not warn/error: + // those would feed back into `app_log_*_total` and the checker. + None => { + tracing::debug!(sample = line, "dropping unparseable metric sample"); + } + } + } + } + + if let Some(family) = current.take() { + families.push(family); + } + + families +} + +/// Parses a `# TYPE` type token into a [`MetricType`]. +fn parse_type(type_str: &str) -> MetricType { + match type_str.trim() { + "counter" => MetricType::Counter, + "gauge" => MetricType::Gauge, + "histogram" => MetricType::Histogram, + "info" => MetricType::Info, + _ => MetricType::Unknown, + } +} + +/// Splits `s` at the first ASCII whitespace, trimming the remainder's leading +/// space. +fn split_once_ws(s: &str) -> (&str, &str) { + match s.split_once(|c: char| c.is_ascii_whitespace()) { + Some((head, tail)) => (head, tail.trim_start()), + None => (s, ""), + } +} + +/// Parses one sample line (`name`, `name value`, or `name{labels} value`). 
+fn parse_sample(line: &str, metric_type: MetricType) -> Option { + let (labels, value_str) = match line.split_once('{') { + Some((_name, rest)) => { + let (labels, after) = scan_labels(rest)?; + let value = after.split_ascii_whitespace().next()?; + (labels, value) + } + None => { + let mut parts = line.split_ascii_whitespace(); + let _name = parts.next()?; + let value = parts.next()?; + (Vec::new(), value) + } + }; + + let value: f64 = value_str.parse().ok()?; + Some(make_metric(labels, value, metric_type)) +} + +/// Scans label pairs starting just after the opening `{`, returning the pairs +/// and the remainder of the line after the closing `}`. Handles `\"`, `\\` and +/// `\n` escapes inside quoted values. +fn scan_labels(s: &str) -> Option<(Vec, &str)> { + let mut pairs = Vec::new(); + let mut chars = s.chars(); + + loop { + // Skip separators; detect the end of the label set. + loop { + let mut probe = chars.clone(); + match probe.next() { + Some(c) if c == ',' || c.is_ascii_whitespace() => chars = probe, + Some('}') => return Some((pairs, probe.as_str())), + Some(_) => break, + None => return None, + } + } + + // Read the label name up to '='. + let mut name = String::new(); + loop { + match chars.next() { + Some('=') => break, + Some(c) => name.push(c), + None => return None, + } + } + + // Expect the opening quote. + if chars.next() != Some('"') { + return None; + } + + // Read the value up to the closing quote, applying escapes. + let mut value = String::new(); + loop { + match chars.next() { + Some('\\') => match chars.next() { + Some('n') => value.push('\n'), + Some('"') => value.push('"'), + Some('\\') => value.push('\\'), + Some(other) => value.push(other), + None => return None, + }, + Some('"') => break, + Some(c) => value.push(c), + None => return None, + } + } + + pairs.push(LabelPair { name, value }); + } +} + +/// Builds a [`Metric`] storing `value` in the slot for `metric_type`. 
+fn make_metric(labels: Vec, value: f64, metric_type: MetricType) -> Metric { + match metric_type { + MetricType::Counter => Metric { + labels, + counter: Some(value), + gauge: None, + }, + MetricType::Gauge => Metric { + labels, + counter: None, + gauge: Some(value), + }, + MetricType::Histogram | MetricType::Info | MetricType::Unknown => Metric { + labels, + counter: None, + gauge: None, + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use vise::{Counter, Gauge, LabeledFamily, Metrics, Registry}; + + #[derive(Debug, Metrics)] + #[metrics(prefix = "app_log")] + struct LogLike { + error_total: Counter, + } + + #[derive(Debug, Metrics)] + #[metrics(prefix = "core_tracker")] + struct TrackerLike { + #[metrics(labels = ["duty"])] + failed_duties_total: LabeledFamily, + } + + #[derive(Debug, Metrics)] + #[metrics(prefix = "p2p")] + struct P2pLike { + #[metrics(labels = ["peer"])] + ping_success: LabeledFamily, + } + + fn family<'a>(families: &'a [MetricFamily], name: &str) -> &'a MetricFamily { + families + .iter() + .find(|f| f.name == name) + .expect("family not found") + } + + #[test] + fn gather_emits_charon_names_and_strips_total() { + let log = LogLike::default(); + log.error_total.inc(); + let tracker = TrackerLike::default(); + tracker.failed_duties_total[&"proposal".to_owned()].inc(); + let p2p = P2pLike::default(); + p2p.ping_success[&"peerA".to_owned()].set(1); + + let mut registry = Registry::empty(); + registry.register_metrics(&log); + registry.register_metrics(&tracker); + registry.register_metrics(&p2p); + + let families = gather_registry(®istry).expect("gather"); + + // Counter `_total` is preserved as the registered base name (no doubling). 
+ let log_fam = family(&families, "app_log_error_total"); + assert_eq!(log_fam.metric_type, MetricType::Counter); + assert_eq!(log_fam.metrics.len(), 1); + assert_eq!(log_fam.metrics[0].counter_value(), 1.0); + + let tracker_fam = family(&families, "core_tracker_failed_duties_total"); + assert_eq!(tracker_fam.metric_type, MetricType::Counter); + assert_eq!(tracker_fam.metrics[0].counter_value(), 1.0); + assert_eq!(tracker_fam.metrics[0].labels[0].name, "duty"); + assert_eq!(tracker_fam.metrics[0].labels[0].value, "proposal"); + + let p2p_fam = family(&families, "p2p_ping_success"); + assert_eq!(p2p_fam.metric_type, MetricType::Gauge); + assert_eq!(p2p_fam.metrics[0].gauge_value(), 1.0); + assert_eq!(p2p_fam.metrics[0].labels[0].value, "peerA"); + } + + #[test] + fn parses_labels_values_and_types() { + let text = "\ +# TYPE core_scheduler_validator_status gauge +core_scheduler_validator_status{pubkey=\"1\",status=\"pending\"} 1 +core_scheduler_validator_status{pubkey=\"2\",status=\"active\"} 0 +# HELP app_log_error_total Error count. 
+# TYPE app_log_error_total counter +app_log_error_total 7 +# EOF"; + let families = parse_exposition(text); + + let status = family(&families, "core_scheduler_validator_status"); + assert_eq!(status.metric_type, MetricType::Gauge); + assert_eq!(status.metrics.len(), 2); + assert_eq!(status.metrics[0].labels.len(), 2); + assert_eq!(status.metrics[0].labels[1].name, "status"); + assert_eq!(status.metrics[0].labels[1].value, "pending"); + assert_eq!(status.metrics[0].gauge_value(), 1.0); + + let errors = family(&families, "app_log_error_total"); + assert_eq!(errors.metric_type, MetricType::Counter); + assert_eq!(errors.metrics.len(), 1); + assert!(errors.metrics[0].labels.is_empty()); + assert_eq!(errors.metrics[0].counter_value(), 7.0); + } + + #[test] + fn unescapes_quoted_label_values() { + let text = "\ +# TYPE demo gauge +demo{path=\"a\\\"b\",note=\"x\\\\y\"} 3"; + let families = parse_exposition(text); + let demo = family(&families, "demo"); + assert_eq!(demo.metrics[0].labels[0].value, "a\"b"); + assert_eq!(demo.metrics[0].labels[1].value, "x\\y"); + } +} diff --git a/crates/app/src/health/metrics.rs b/crates/app/src/health/metrics.rs new file mode 100644 index 00000000..6b38aa42 --- /dev/null +++ b/crates/app/src/health/metrics.rs @@ -0,0 +1,25 @@ +//! Prometheus metrics published by the health checker. + +use vise::{Gauge, Global, LabeledFamily, Metrics}; + +/// Health metrics published by the checker. +/// +/// Emitted (after vise's `_total`-strip / Prometheus naming) as +/// `app_health_checks{severity,name}` and +/// `app_health_metrics_high_cardinality{name}`, matching Charon. +#[derive(Debug, Metrics)] +#[metrics(prefix = "app_health")] +pub struct HealthMetrics { + /// Application health checks by name and severity. 
Set to 1 for failing, 0 + /// for ok + #[metrics(labels = ["severity", "name"])] + pub checks: LabeledFamily<(String, String), Gauge, 2>, + + /// Metrics with high cardinality by name + #[metrics(labels = ["name"])] + pub metrics_high_cardinality: LabeledFamily, +} + +/// Global health metrics. +#[vise::register] +pub static HEALTH_METRICS: Global = Global::new(); diff --git a/crates/app/src/health/mod.rs b/crates/app/src/health/mod.rs new file mode 100644 index 00000000..d95a82b7 --- /dev/null +++ b/crates/app/src/health/mod.rs @@ -0,0 +1,70 @@ +//! Application health checks. +//! +//! A background service that, every [`SCRAPE_PERIOD`], scrapes all process +//! metrics, keeps a rolling window of the last [`MAX_SCRAPES`] scrapes, runs a +//! fixed set of health checks over that window (query a metric by name → select +//! series by label → reduce the time series to one number → compare to a +//! threshold), and publishes the per-check pass/fail state as the +//! `app_health_checks{severity,name}` gauge (1 = failing, 0 = ok). It also +//! detects high-cardinality metrics and publishes +//! `app_health_metrics_high_cardinality{name}`. + +mod checker; +mod checks; +mod error; +mod gatherer; +mod metrics; +mod model; +mod reducers; +mod select; + +pub use checker::Checker; +pub use error::{Error, Result}; +pub use gatherer::{GatherError, Gatherer, ViseGatherer}; +pub use metrics::{HEALTH_METRICS, HealthMetrics}; +pub use model::{LabelPair, Metric, MetricFamily, MetricType}; + +use std::time::Duration; + +/// Period between metric scrapes. +const SCRAPE_PERIOD: Duration = Duration::from_secs(30); + +/// Maximum number of scrapes retained in the rolling window. +const MAX_SCRAPES: usize = 10; + +/// High-cardinality threshold for a single validator; for `n` validators the +/// effective threshold is `LABELS_CARDINALITY_THRESHOLD * n`. +const LABELS_CARDINALITY_THRESHOLD: usize = 100; + +/// Severity of a health check. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Severity { + /// Critical: the node is likely not performing its duties. + Critical, + /// Warning: something needs attention. + Warning, + /// Info: informational only. + Info, +} + +impl Severity { + /// Returns the lowercase string used as the `severity` label value. + pub(crate) fn as_str(self) -> &'static str { + match self { + Self::Critical => "critical", + Self::Warning => "warning", + Self::Info => "info", + } + } +} + +/// Metadata about the cluster, used by the health checks. +#[derive(Debug, Clone, Copy, Default)] +pub struct Metadata { + /// Number of validators in the cluster. + pub num_validators: i64, + /// Number of peers in the cluster. + pub num_peers: i64, + /// Number of peers required for quorum. + pub quorum_peers: i64, +} diff --git a/crates/app/src/health/model.rs b/crates/app/src/health/model.rs new file mode 100644 index 00000000..fe7c1d19 --- /dev/null +++ b/crates/app/src/health/model.rs @@ -0,0 +1,66 @@ +//! In-memory Prometheus metric model used by the health checks. +//! +//! This mirrors the subset of the Prometheus protobuf model that Charon's +//! health checks rely on. Per-sample timestamps are intentionally omitted: the +//! reducers never read them — the time dimension comes from the checker storing +//! successive scrapes. + +/// Type of a metric family. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MetricType { + /// A monotonically increasing counter. + Counter, + /// A gauge that can go up or down. + Gauge, + /// A histogram (not queried by any check; retained for the cardinality + /// scan). + Histogram, + /// An info metric. + Info, + /// An unrecognised type. + Unknown, +} + +/// A name/value label pair. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LabelPair { + /// Label name. + pub name: String, + /// Label value. + pub value: String, +} + +/// A single metric sample (one time series at one scrape). 
+#[derive(Debug, Clone)] +pub struct Metric { + /// Labels on this series. + pub labels: Vec, + /// Counter value, if this is a counter sample. + pub counter: Option, + /// Gauge value, if this is a gauge sample. + pub gauge: Option, +} + +impl Metric { + /// Counter value, defaulting to `0.0` when absent (mirrors protobuf nil → + /// 0). + pub fn counter_value(&self) -> f64 { + self.counter.unwrap_or(0.0) + } + + /// Gauge value, defaulting to `0.0` when absent (mirrors protobuf nil → 0). + pub fn gauge_value(&self) -> f64 { + self.gauge.unwrap_or(0.0) + } +} + +/// A metric family: a named, typed group of series. +#[derive(Debug, Clone)] +pub struct MetricFamily { + /// Family name. + pub name: String, + /// Family type. + pub metric_type: MetricType, + /// Series in this family. + pub metrics: Vec, +} diff --git a/crates/app/src/health/reducers.rs b/crates/app/src/health/reducers.rs new file mode 100644 index 00000000..a124729c --- /dev/null +++ b/crates/app/src/health/reducers.rs @@ -0,0 +1,49 @@ +//! Series reducers: reduce a time series of samples to a single value. + +use super::{ + error::{Error, Result}, + model::Metric, +}; + +/// Reduces a time series of samples to a single value. +pub(crate) type Reducer = fn(&[Metric]) -> Result; + +/// Returns the increase across a counter (or gauge) time series: the last +/// sample's value minus the first. Fewer than two samples yields `0.0`. +pub(crate) fn increase(samples: &[Metric]) -> Result { + if samples.len() < 2 { + return Ok(0.0); + } + + let (Some(first), Some(last)) = (samples.first(), samples.last()) else { + return Ok(0.0); + }; + + if first.counter.is_none() && first.gauge.is_none() { + return Err(Error::UnsupportedMetricPassed); + } + + let first_val = first.counter_value() + first.gauge_value(); + let last_val = last.counter_value() + last.gauge_value(); + + Ok(last_val - first_val) +} + +/// Returns the maximum value across a gauge time series. Errors if any sample +/// is not a gauge. 
+pub(crate) fn gauge_max(samples: &[Metric]) -> Result { + let mut max_val = 0.0_f64; + + for sample in samples { + if sample.gauge.is_none() { + return Err(Error::NonGaugeMetricPassed); + } + + let value = sample.gauge_value(); + if value > max_val { + max_val = value; + } + } + + Ok(max_val) +} diff --git a/crates/app/src/health/select.rs b/crates/app/src/health/select.rs new file mode 100644 index 00000000..08a2c8d6 --- /dev/null +++ b/crates/app/src/health/select.rs @@ -0,0 +1,103 @@ +//! Label selectors: reduce a metric family to at most one synthetic gauge +//! sample. + +use regex::Regex; + +use super::{ + error::{Error, Result}, + model::{LabelPair, Metric, MetricFamily, MetricType}, +}; + +/// Maps a metric family to at most one synthetic sample. +pub(crate) type Selector = Box Result>>; + +/// Builds a synthetic gauge sample holding `value`. +fn gauge_metric(value: f64) -> Metric { + Metric { + labels: Vec::new(), + counter: None, + gauge: Some(value), + } +} + +/// Counts the series in the family with a non-zero gauge or counter value. +pub(crate) fn count_non_zero_labels() -> Selector { + Box::new(|fam: &MetricFamily| { + let mut count = 0.0_f64; + for metric in &fam.metrics { + if metric.gauge_value() != 0.0 || metric.counter_value() != 0.0 { + count += 1.0; + } + } + Ok(Some(gauge_metric(count))) + }) +} + +/// Returns the family's only series, erroring unless there is exactly one. +pub(crate) fn no_labels() -> Selector { + Box::new(|fam: &MetricFamily| { + let Some(metric) = fam.metrics.first() else { + return Err(Error::ExpectedExactlyOneMetric); + }; + if fam.metrics.len() != 1 { + return Err(Error::ExpectedExactlyOneMetric); + } + Ok(Some(metric.clone())) + }) +} + +/// Sums the values of series matching all of `labels`. 
+pub(crate) fn count_labels(labels: Vec) -> Selector { + Box::new(move |fam: &MetricFamily| { + let mut sum = 0.0_f64; + for metric in &fam.metrics { + if labels_contain(&metric.labels, &labels) { + sum += metric.gauge_value() + metric.counter_value(); + } + } + Ok(Some(gauge_metric(sum))) + }) +} + +/// Sums the values of series matching all of `labels`; errors on non +/// gauge/counter families. +pub(crate) fn sum_labels(labels: Vec) -> Selector { + Box::new(move |fam: &MetricFamily| { + if fam.metric_type != MetricType::Gauge && fam.metric_type != MetricType::Counter { + return Err(Error::UnsupportedMetricType); + } + let mut sum = 0.0_f64; + for metric in &fam.metrics { + if labels_contain(&metric.labels, &labels) { + sum += metric.gauge_value() + metric.counter_value(); + } + } + Ok(Some(gauge_metric(sum))) + }) +} + +/// Returns true if every pair in `contain` matches some label in `labels`: +/// names must be equal and the `contain` value is matched as a regex against +/// the label value. A regex that fails to compile is treated as no match +/// (matching Charon, which swallows the compile error). +pub(crate) fn labels_contain(labels: &[LabelPair], contain: &[LabelPair]) -> bool { + for c in contain { + let mut found = false; + for l in labels { + if l.name != c.name { + continue; + } + if Regex::new(&c.value) + .map(|re| re.is_match(&l.value)) + .unwrap_or(false) + { + found = true; + break; + } + } + if !found { + return false; + } + } + true +} diff --git a/crates/tracing/src/layers/metrics.rs b/crates/tracing/src/layers/metrics.rs index d7c0cb04..e484e5de 100644 --- a/crates/tracing/src/layers/metrics.rs +++ b/crates/tracing/src/layers/metrics.rs @@ -1,33 +1,117 @@ +use std::fmt; + +use tracing::{ + field::{Field, Visit}, + span::{Attributes, Id}, +}; +use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan}; + use crate::metrics::TRACING_METRICS; -/// Metrics layer. 
+/// Metrics layer: counts error/warn log events, labelled by `topic` +/// (mirroring Charon's `app_log_{error,warn}_total{topic}`). pub struct MetricsLayer; -fn inc_error_count() { - TRACING_METRICS.error_count.inc(); +/// The `topic` field value recorded for a span. +struct SpanTopic(String); + +/// Records the `topic` span field, if present. +struct TopicVisitor(Option); + +impl Visit for TopicVisitor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "topic" { + self.0 = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + if field.name() == "topic" && self.0.is_none() { + self.0 = Some(format!("{value:?}")); + } + } } -fn inc_warn_count() { - TRACING_METRICS.warn_count.inc(); +/// Returns the `topic` of the nearest enclosing span, or empty when none is set +/// (mirroring Charon's `metricsTopicFromCtx`, which defaults to empty). +fn event_topic(ctx: &Context<'_, S>, event: &tracing::Event<'_>) -> String +where + S: tracing::Subscriber + for<'a> LookupSpan<'a>, +{ + for span in ctx.event_scope(event).into_iter().flatten() { + if let Some(topic) = span.extensions().get::() { + return topic.0.clone(); + } + } + String::new() } -impl tracing_subscriber::Layer for MetricsLayer { - fn on_event( - &self, - event: &tracing::Event<'_>, - _ctx: tracing_subscriber::layer::Context<'_, S>, - ) { - // check level - match *event.metadata().level() { +impl Layer for MetricsLayer +where + S: tracing::Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let mut visitor = TopicVisitor(None); + attrs.record(&mut visitor); + if let Some(topic) = visitor.0 + && let Some(span) = ctx.span(id) + { + span.extensions_mut().insert(SpanTopic(topic)); + } + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + let level = *event.metadata().level(); + if level != tracing::Level::ERROR && level != tracing::Level::WARN { 
+ return; + } + + let topic = event_topic(&ctx, event); + match level { tracing::Level::ERROR => { - inc_error_count(); + TRACING_METRICS.error_total[&topic].inc(); } tracing::Level::WARN => { - inc_warn_count(); - } - _ => { - // do nothing + TRACING_METRICS.warn_total[&topic].inc(); } + _ => {} } } } + +#[cfg(test)] +mod tests { + use super::*; + use tracing_subscriber::layer::SubscriberExt as _; + + #[test] + fn counts_errors_and_warns_by_topic() { + // Unique topic so the global counter is touched only by this test. + let topic = "metrics_layer_test_topic"; + let subscriber = tracing_subscriber::registry().with(MetricsLayer); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!("test", topic); + let _guard = span.enter(); + tracing::error!("an error"); + tracing::warn!("a warning"); + tracing::info!("ignored"); + }); + + assert_eq!(TRACING_METRICS.error_total[&topic.to_owned()].get(), 1); + assert_eq!(TRACING_METRICS.warn_total[&topic.to_owned()].get(), 1); + } + + #[test] + fn events_without_topic_use_empty_label() { + let subscriber = tracing_subscriber::registry().with(MetricsLayer); + + let before = TRACING_METRICS.error_total[&String::new()].get(); + tracing::subscriber::with_default(subscriber, || { + tracing::error!("no topic"); + }); + let after = TRACING_METRICS.error_total[&String::new()].get(); + + assert!(after > before); + } +} diff --git a/crates/tracing/src/metrics.rs b/crates/tracing/src/metrics.rs index 2374fd7b..c7b09b93 100644 --- a/crates/tracing/src/metrics.rs +++ b/crates/tracing/src/metrics.rs @@ -1,13 +1,21 @@ -use vise::{Counter, Metrics}; +use vise::{Counter, LabeledFamily, Metrics}; /// Metrics for the tracing. +/// +/// Emitted as `app_log_error_total{topic}` / `app_log_warn_total{topic}`, +/// matching Charon's `app/log` metrics (name, `_total` suffix, and `topic` +/// label), so Charon's Grafana dashboards and the health checker pick them up +/// by their Charon names. 
#[derive(Debug, Metrics)] +#[metrics(prefix = "app_log")] pub struct TracingMetrics { - /// Error count. - pub error_count: Counter, + /// Total count of logged errors by topic + #[metrics(labels = ["topic"])] + pub error_total: LabeledFamily, - /// Warn count. - pub warn_count: Counter, + /// Total count of logged warnings by topic + #[metrics(labels = ["topic"])] + pub warn_total: LabeledFamily, } /// Global metrics for the tracing. From 5a80a7f354f69e7974877e1c9fccbf054b190c2e Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 24 Jun 2026 16:58:42 +0700 Subject: [PATCH 2/3] feat(app): implement health --- Cargo.lock | 1 + crates/app/Cargo.toml | 1 + crates/app/src/health/checker.rs | 158 ++++++++++++++---- crates/app/src/health/checks.rs | 233 ++++++++++++++------------- crates/app/src/health/error.rs | 6 +- crates/app/src/health/gatherer.rs | 167 ++++++++++++++++--- crates/app/src/health/metrics.rs | 2 +- crates/app/src/health/mod.rs | 66 ++------ crates/app/src/health/model.rs | 12 +- crates/app/src/health/select.rs | 3 +- crates/app/src/lib.rs | 5 + crates/tracing/src/layers/metrics.rs | 8 +- crates/tracing/src/metrics.rs | 8 +- 13 files changed, 424 insertions(+), 246 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b65c3ffb..bca6cfdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5496,6 +5496,7 @@ dependencies = [ "tokio-util", "tracing", "url", + "vise", "wiremock", ] diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 77f14edf..872e5e9c 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -32,6 +32,7 @@ pluto-cluster.workspace = true pluto-k1util.workspace = true pluto-crypto.workspace = true pluto-ssz.workspace = true +vise.workspace = true [build-dependencies] pluto-build-proto.workspace = true diff --git a/crates/app/src/health/checker.rs b/crates/app/src/health/checker.rs index 320d16a2..c6139ad7 100644 --- a/crates/app/src/health/checker.rs +++ b/crates/app/src/health/checker.rs @@ -6,38 +6,46 @@ use 
tokio_util::sync::CancellationToken; use tracing::warn; use super::{ - LABELS_CARDINALITY_THRESHOLD, MAX_SCRAPES, Metadata, SCRAPE_PERIOD, - checks::{Check, all_checks, new_query_func}, + checks::{CHECKS, Metadata}, error::{Error, Result}, gatherer::Gatherer, metrics::HEALTH_METRICS, - model::MetricFamily, + model::{MetricFamily, MetricType}, + reducers::Reducer, + select::Selector, }; +/// Period between metric scrapes. +const SCRAPE_PERIOD: Duration = Duration::from_secs(30); + +/// Maximum number of scrapes retained in the rolling window. +const MAX_SCRAPES: usize = 10; + +/// High-cardinality threshold for a single validator; for `n` validators the +/// effective threshold is `LABELS_CARDINALITY_THRESHOLD * n`. +const LABELS_CARDINALITY_THRESHOLD: usize = 100; + /// Name of the high-cardinality metric, skipped during the cardinality scan. const HIGH_CARDINALITY_METRIC: &str = "app_health_metrics_high_cardinality"; /// Health checker: periodically scrapes metrics and publishes check results. pub struct Checker { metadata: Metadata, - checks: Vec, metrics: Vec>, gatherer: Box, scrape_period: Duration, max_scrapes: usize, - num_validators: i64, + num_validators: usize, } impl Checker { /// Returns a new health checker. /// /// `num_validators` is used for the high-cardinality threshold and is - /// distinct from [`Metadata::num_validators`] (which the checks use), - /// mirroring Charon's `NewChecker`. - pub fn new(metadata: Metadata, gatherer: Box, num_validators: i64) -> Self { + /// distinct from [`Metadata::num_validators`] (which the checks use). + pub fn new(metadata: Metadata, gatherer: Box, num_validators: usize) -> Self { Self { metadata, - checks: all_checks(), metrics: Vec::new(), gatherer, scrape_period: SCRAPE_PERIOD, @@ -46,25 +54,24 @@ impl Checker { } } - /// Runs the checker until `quit` is cancelled. All logs emitted while - /// running carry the `health` topic, mirroring Charon's - /// `log.WithTopic(ctx, "health")`. 
- pub async fn run(mut self, quit: CancellationToken) { + /// Runs the checker until `ct` is cancelled. All logs emitted while + /// running carry the `health` topic. + pub async fn run(mut self, ct: CancellationToken) { let span = tracing::debug_span!("health", topic = "health"); // Full-path trait call so it doesn't shadow the inherent `instrument`. - tracing::Instrument::instrument(self.run_loop(quit), span).await; + tracing::Instrument::instrument(self.run_loop(ct), span).await; } /// The scrape/instrument loop, ticking every [`Checker::scrape_period`]. - async fn run_loop(&mut self, quit: CancellationToken) { + async fn run_loop(&mut self, ct: CancellationToken) { let mut interval = tokio::time::interval(self.scrape_period); // Skip the immediate first tick so the first scrape happens after one - // full period, matching Charon's `time.NewTicker` semantics. + // full period rather than immediately. interval.tick().await; loop { tokio::select! { - () = quit.cancelled() => return, + () = ct.cancelled() => return, _ = interval.tick() => { if let Err(error) = self.scrape() { warn!(?error, "Failed to scrape metrics"); @@ -81,8 +88,7 @@ impl Checker { fn scrape(&mut self) -> Result<()> { let mut scrape = self.gatherer.gather().map_err(Error::GatherMetrics)?; - let threshold = LABELS_CARDINALITY_THRESHOLD - .saturating_mul(usize::try_from(self.num_validators).unwrap_or(0)); + let threshold = LABELS_CARDINALITY_THRESHOLD.saturating_mul(self.num_validators); let mut gather_again = false; for family in &scrape { @@ -90,16 +96,11 @@ impl Checker { continue; } - let max_labels = family - .metrics - .iter() - .map(|metric| metric.labels.len()) - .max() - .unwrap_or(0); + let max_labels = max_label_count(family); if max_labels > threshold { HEALTH_METRICS.metrics_high_cardinality[&family.name] - .set(i64::try_from(max_labels).unwrap_or(i64::MAX)); + .set(i64::try_from(max_labels)?); gather_again = true; } } @@ -119,13 +120,12 @@ impl Checker { /// Runs all checks against 
the rolling window and updates the gauge. fn instrument(&self) { let query = new_query_func(&self.metrics); - for check in &self.checks { + for check in &CHECKS { let failing = match (check.func)(&query, &self.metadata) { Ok(failing) => failing, Err(error) => { - // Charon rate-limits this warning via log.Filter(); Pluto - // has no equivalent, so it is logged each tick. The gauge is - // still cleared (set to 0) on error, matching Charon. + // Logged every tick (no rate-limiting). The gauge is still + // cleared (set to 0) when the check errors. warn!(check = check.name, ?error, "Health check failed"); false } @@ -138,6 +138,66 @@ impl Checker { } } +/// Maximum label count across a family's series, used for high-cardinality +/// detection. For histograms the synthetic `le` bucket label is excluded so the +/// count matches the protobuf model, where `le` is bucket structure rather than +/// a label. +fn max_label_count(family: &MetricFamily) -> usize { + let exclude_le = family.metric_type == MetricType::Histogram; + family + .metrics + .iter() + .map(|metric| { + if exclude_le { + metric + .labels + .iter() + .filter(|label| label.name != "le") + .count() + } else { + metric.labels.len() + } + }) + .max() + .unwrap_or(0) +} + +/// Query function bound to a rolling window of scrapes. +pub(crate) struct QueryFunc<'a> { + metrics: &'a [Vec], +} + +/// Returns a query function over `metrics` (a rolling window of scrapes). +pub(crate) fn new_query_func(metrics: &[Vec]) -> QueryFunc<'_> { + QueryFunc { metrics } +} + +impl QueryFunc<'_> { + /// For each scrape, finds the first family matching `name` with at least + /// one series, applies `selector` to it, and collects the resulting + /// samples; then reduces them with `reducer`. 
+ pub(crate) fn query(&self, name: &str, selector: Selector, reducer: Reducer) -> Result { + let mut selected = Vec::new(); + + for scrape in self.metrics { + for family in scrape { + if family.name != name || family.metrics.is_empty() { + continue; + } + match selector(family).map_err(|e| Error::LabelSelector(Box::new(e)))? { + None => continue, + Some(metric) => { + selected.push(metric); + break; + } + } + } + } + + reducer(&selected).map_err(|e| Error::SeriesReducer(Box::new(e))) + } +} + #[cfg(test)] mod tests { use std::sync::{ @@ -179,7 +239,7 @@ mod tests { } } - fn empty_checker(responder: Responder, num_validators: i64) -> (Checker, Arc) { + fn empty_checker(responder: Responder, num_validators: usize) -> (Checker, Arc) { let (mock, calls) = MockGatherer::new(responder); let checker = Checker::new(Metadata::default(), Box::new(mock), num_validators); (checker, calls) @@ -241,8 +301,8 @@ mod tests { let (mut checker, calls) = empty_checker(Box::new(|_| Ok(Vec::new())), 1); checker.scrape_period = Duration::from_millis(5); - let quit = CancellationToken::new(); - let handle = tokio::spawn(checker.run(quit.clone())); + let ct = CancellationToken::new(); + let handle = tokio::spawn(checker.run(ct.clone())); let deadline = tokio::time::Instant::now() .checked_add(Duration::from_secs(2)) @@ -255,7 +315,37 @@ mod tests { tokio::time::sleep(Duration::from_millis(1)).await; } - quit.cancel(); + ct.cancel(); handle.await.expect("join run task"); } + + #[test] + fn max_label_count_excludes_le_only_for_histograms() { + let series = |metric_type, labels: &[&str]| MetricFamily { + name: "x".to_owned(), + metric_type, + metrics: vec![Metric { + labels: labels + .iter() + .map(|&name| LabelPair { + name: name.to_owned(), + value: "v".to_owned(), + }) + .collect(), + counter: None, + gauge: Some(1.0), + }], + }; + + // Histogram bucket series: `le` is bucket structure, not counted. 
+ assert_eq!( + max_label_count(&series(MetricType::Histogram, &["peer", "le"])), + 1 + ); + // Non-histogram: a literal `le` label is counted (matches the protobuf model). + assert_eq!( + max_label_count(&series(MetricType::Gauge, &["peer", "le"])), + 2 + ); + } } diff --git a/crates/app/src/health/checks.rs b/crates/app/src/health/checks.rs index 75e2c7fe..230b0673 100644 --- a/crates/app/src/health/checks.rs +++ b/crates/app/src/health/checks.rs @@ -1,22 +1,57 @@ -//! The fixed set of health checks and the query function over scraped metrics. +//! Health checks: severity, cluster metadata, the check type, the fixed list of +//! 9 checks, and the label-pair helper. use super::{ - Metadata, Severity, - error::{Error, Result}, - model::{LabelPair, MetricFamily}, - reducers::{Reducer, gauge_max, increase}, - select::{Selector, count_labels, count_non_zero_labels, no_labels, sum_labels}, + checker::QueryFunc, + error::Result, + model::LabelPair, + reducers::{gauge_max, increase}, + select::{count_labels, count_non_zero_labels, no_labels, sum_labels}, }; +/// Severity of a health check. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Severity { + /// Critical: the node is likely not performing its duties. + Critical, + /// Warning: something needs attention. + Warning, + /// Info: informational only. + Info, +} + +impl Severity { + /// Returns the lowercase string used as the `severity` label value. + pub(crate) fn as_str(self) -> &'static str { + match self { + Self::Critical => "critical", + Self::Warning => "warning", + Self::Info => "info", + } + } +} + +/// Metadata about the cluster, used by the health checks. +#[derive(Debug, Clone, Copy, Default)] +pub struct Metadata { + /// Number of validators in the cluster. + pub num_validators: i64, + /// Number of peers in the cluster. + pub num_peers: i64, + /// Number of peers required for quorum. + pub quorum_peers: i64, +} + /// A health check. 
pub(crate) struct Check { /// Name of the check (also the `name` label value). pub(crate) name: &'static str, - /// Human-readable description. - /// - /// Retained for parity with Charon's `check.Description`; not yet surfaced - /// (Charon does not consume it within the package either). - #[allow(dead_code, reason = "ported for parity; surfaced by future tooling")] + /// Human-readable description. Not yet surfaced anywhere; retained for + /// completeness. + #[allow( + dead_code, + reason = "retained for completeness; surfaced by future tooling" + )] pub(crate) description: &'static str, /// Severity. pub(crate) severity: Severity, @@ -24,42 +59,6 @@ pub(crate) struct Check { pub(crate) func: fn(&QueryFunc<'_>, &Metadata) -> Result, } -/// Query function bound to a rolling window of scrapes. -pub(crate) struct QueryFunc<'a> { - metrics: &'a [Vec], -} - -/// Returns a query function over `metrics` (a rolling window of scrapes). -pub(crate) fn new_query_func(metrics: &[Vec]) -> QueryFunc<'_> { - QueryFunc { metrics } -} - -impl QueryFunc<'_> { - /// For each scrape, finds the first family matching `name` with at least - /// one series, applies `selector` to it, and collects the resulting - /// samples; then reduces them with `reducer`. - pub(crate) fn query(&self, name: &str, selector: Selector, reducer: Reducer) -> Result { - let mut selected = Vec::new(); - - for scrape in self.metrics { - for family in scrape { - if family.name != name || family.metrics.is_empty() { - continue; - } - match selector(family).map_err(|e| Error::LabelSelector(Box::new(e)))? { - None => continue, - Some(metric) => { - selected.push(metric); - break; - } - } - } - } - - reducer(&selected).map_err(|e| Error::SeriesReducer(Box::new(e))) - } -} - /// Convenience constructor for a label pair. 
fn label(name: &str, value: &str) -> LabelPair { LabelPair { @@ -84,8 +83,11 @@ fn high_error_log_rate(q: &QueryFunc<'_>, m: &Metadata) -> Result<bool> { } fn high_warning_log_rate(q: &QueryFunc<'_>, m: &Metadata) -> Result<bool> { + // Deviation from Charon: Charon's check queries `app_log_warning_total`, + // but the warn counter is emitted as `app_log_warn_total`, so Charon's own + // check never matches it. We query the emitted name so this check fires. // Allow 2 warnings per validator. - let value = q.query("app_log_warning_total", sum_labels(Vec::new()), increase)?; + let value = q.query("app_log_warn_total", sum_labels(Vec::new()), increase)?; Ok(value > 2.0 * to_f64(m.num_validators)) } @@ -141,71 +143,71 @@ fn using_fallback_beacon_nodes(q: &QueryFunc<'_>, _m: &Metadata) -> Result<bool> Ok(max_val > 0.0) } -/// Returns the full set of health checks, in Charon order. -pub(crate) fn all_checks() -> Vec<Check> { - vec![ - Check { - name: "high_error_log_rate", - description: "High rate of error logs. Please check the logs for more details.", - severity: Severity::Warning, - func: high_error_log_rate, - }, - Check { - name: "high_warning_log_rate", - description: "High rate of warning logs. Please check the logs for more details.", - severity: Severity::Warning, - func: high_warning_log_rate, - }, - Check { - name: "beacon_node_syncing", - description: "Beacon Node in syncing state.", - severity: Severity::Critical, - func: beacon_node_syncing, - }, - Check { - name: "insufficient_connected_peers", - description: "Not connected to at least quorum peers. Check logs for networking issue or coordinate with peers.", - severity: Severity::Critical, - func: insufficient_connected_peers, - }, - Check { - name: "pending_validators", - description: "Pending validators detected. Activate them to start validating.", - severity: Severity::Info, - func: pending_validators, - }, - Check { - name: "proposal_failures", - description: "Proposal failures detected.
See .", - severity: Severity::Warning, - func: proposal_failures, - }, - Check { - name: "high_registration_failures_rate", - description: "High rate of failed validator registrations. Please check the logs for more details.", - severity: Severity::Warning, - func: high_registration_failures_rate, - }, - Check { - name: "metrics_high_cardinality", - description: "Metrics reached high cardinality threshold. Please check metrics reported by app_health_metrics_high_cardinality.", - severity: Severity::Warning, - func: metrics_high_cardinality, - }, - Check { - name: "using_fallback_beacon_nodes", - description: "Using fallback beacon nodes. Please check primary beacon nodes health.", - severity: Severity::Warning, - func: using_fallback_beacon_nodes, - }, - ] -} +/// The full set of health checks. +pub(crate) const CHECKS: [Check; 9] = [ + Check { + name: "high_error_log_rate", + description: "High rate of error logs. Please check the logs for more details.", + severity: Severity::Warning, + func: high_error_log_rate, + }, + Check { + name: "high_warning_log_rate", + description: "High rate of warning logs. Please check the logs for more details.", + severity: Severity::Warning, + func: high_warning_log_rate, + }, + Check { + name: "beacon_node_syncing", + description: "Beacon Node in syncing state.", + severity: Severity::Critical, + func: beacon_node_syncing, + }, + Check { + name: "insufficient_connected_peers", + description: "Not connected to at least quorum peers. Check logs for networking issue or coordinate with peers.", + severity: Severity::Critical, + func: insufficient_connected_peers, + }, + Check { + name: "pending_validators", + description: "Pending validators detected. Activate them to start validating.", + severity: Severity::Info, + func: pending_validators, + }, + Check { + name: "proposal_failures", + description: "Proposal failures detected. 
See .", + severity: Severity::Warning, + func: proposal_failures, + }, + Check { + name: "high_registration_failures_rate", + description: "High rate of failed validator registrations. Please check the logs for more details.", + severity: Severity::Warning, + func: high_registration_failures_rate, + }, + Check { + name: "metrics_high_cardinality", + description: "Metrics reached high cardinality threshold. Please check metrics reported by app_health_metrics_high_cardinality.", + severity: Severity::Warning, + func: metrics_high_cardinality, + }, + Check { + name: "using_fallback_beacon_nodes", + description: "Using fallback beacon nodes. Please check primary beacon nodes health.", + severity: Severity::Warning, + func: using_fallback_beacon_nodes, + }, +]; #[cfg(test)] mod tests { - use super::*; + //! Tests for the checks and the query function. + + use super::{CHECKS, Metadata}; use crate::health::{ - Metadata, + checker::new_query_func, model::{LabelPair, Metric, MetricFamily, MetricType}, }; @@ -246,7 +248,7 @@ mod tests { } /// Transposes a set of series (series × time) into per-scrape families - /// (time × family), mirroring Charon's `genFam`. + /// (time × family). fn gen_fam(name: &str, series: &[Vec]) -> Vec { let metric_type = if series .first() @@ -277,9 +279,8 @@ mod tests { resp } - /// Mirrors Charon's `testCheck`: interleaves the check's per-scrape - /// families with two noise families, runs the named check, and asserts - /// the outcome. + /// Interleaves the check's per-scrape families with two noise families, + /// runs the named check, and asserts the outcome. 
fn test_check(m: &Metadata, check_name: &str, expect: bool, metrics: Vec) { let random_foo = gen_fam( "foo", @@ -313,8 +314,8 @@ mod tests { } let query = new_query_func(&multi); - let check = all_checks() - .into_iter() + let check = CHECKS + .iter() .find(|c| c.name == check_name) .expect("check not found"); let failed = (check.func)(&query, m).expect("check should not error"); @@ -666,7 +667,7 @@ mod tests { ..Metadata::default() }; let name = "high_warning_log_rate"; - let metric = "app_log_warning_total"; + let metric = "app_log_warn_total"; let topic_a = gen_labels(&["topic", "a"]); let topic_b = gen_labels(&["topic", "b"]); diff --git a/crates/app/src/health/error.rs b/crates/app/src/health/error.rs index 896c1022..6e231a09 100644 --- a/crates/app/src/health/error.rs +++ b/crates/app/src/health/error.rs @@ -3,8 +3,6 @@ use super::gatherer::GatherError; /// Errors produced while evaluating health checks or gathering metrics. -/// -/// The leaf message strings match Charon's `app/health` error strings exactly. #[derive(Debug, thiserror::Error)] pub enum Error { /// A metric family expected to contain exactly one series did not. @@ -35,6 +33,10 @@ pub enum Error { /// Gathering metrics from the registry failed. #[error("gather metrics")] GatherMetrics(#[source] GatherError), + + /// An integer conversion overflowed. + #[error("conversion error")] + ConversionError(#[from] std::num::TryFromIntError), } /// Result type for health operations. diff --git a/crates/app/src/health/gatherer.rs b/crates/app/src/health/gatherer.rs index 880b73ec..7d1f55af 100644 --- a/crates/app/src/health/gatherer.rs +++ b/crates/app/src/health/gatherer.rs @@ -6,7 +6,7 @@ //! (`Format::OpenMetricsForPrometheus`) and parse it back, so the checker reads //! the same metric names Prometheus/Grafana scrape. In that format vise strips //! the OpenMetrics `_total` suffix from counter samples, so counter names match -//! the registered base name (and Charon). +//! the registered base name. 
use vise::{Format, MetricsCollection, Registry}; @@ -43,9 +43,12 @@ fn gather_registry(registry: &Registry) -> std::result::Result /// Parses a Prometheus/OpenMetrics text exposition into metric families. /// -/// Best-effort: comment lines other than `# TYPE` are skipped, and sample lines -/// that fail to parse are dropped. Each `# TYPE` line starts a new family; -/// subsequent sample lines are grouped under it. +/// Each `# TYPE` line starts a new family. A sample line is added to the +/// current family — except that a queried (counter/gauge) family rejects a +/// sample whose name doesn't match the `# TYPE` name, so a stray line can't be +/// folded into a queried metric; histogram/info families keep all their lines +/// for the cardinality scan. Comment lines and unparseable samples are skipped +/// (debug). fn parse_exposition(text: &str) -> Vec { let mut families: Vec = Vec::new(); let mut current: Option = None; @@ -69,15 +72,25 @@ fn parse_exposition(text: &str) -> Vec { } else if line.starts_with('#') { // # HELP / # UNIT / # EOF / other comment. } else if let Some(family) = current.as_mut() { - match parse_sample(line, family.metric_type) { - Some(metric) => family.metrics.push(metric), - // Input is trusted (we encode it ourselves), so a failure here - // means a parser gap. Surface it at debug rather than dropping - // silently (which could mask a queried metric). Not warn/error: - // those would feed back into `app_log_*_total` and the checker. - None => { - tracing::debug!(sample = line, "dropping unparseable metric sample"); + if let Some((name, metric)) = parse_sample(line, family.metric_type) { + // Counter/gauge families are queried by the checks, so they must + // contain only their own series: reject a sample whose name does + // not match the `# TYPE` name, which would otherwise be folded + // into (e.g. summed onto) a queried metric. 
Histogram/info + // families are never queried; keep all their lines so the + // cardinality scan still sees them. + let queried = matches!(family.metric_type, MetricType::Counter | MetricType::Gauge); + if !queried || name == family.name { + family.metrics.push(metric); + } else { + tracing::debug!(sample = line, "dropping sample: name mismatches its family"); } + } else { + // We encode this text ourselves, so an unparseable sample means a + // parser gap. Surface it at debug rather than dropping silently + // (which could mask a queried metric). Not warn/error: those feed + // back into `app_log_*_total` and the checker. + tracing::debug!(sample = line, "dropping unparseable metric sample"); } } } @@ -109,24 +122,25 @@ fn split_once_ws(s: &str) -> (&str, &str) { } } -/// Parses one sample line (`name`, `name value`, or `name{labels} value`). -fn parse_sample(line: &str, metric_type: MetricType) -> Option { - let (labels, value_str) = match line.split_once('{') { - Some((_name, rest)) => { +/// Parses one sample line (`name`, `name value`, or `name{labels} value`), +/// returning the sample's metric name and the parsed metric. 
+fn parse_sample(line: &str, metric_type: MetricType) -> Option<(&str, Metric)> { + let (name, labels, value_str) = match line.split_once('{') { + Some((name, rest)) => { let (labels, after) = scan_labels(rest)?; let value = after.split_ascii_whitespace().next()?; - (labels, value) + (name, labels, value) } None => { let mut parts = line.split_ascii_whitespace(); - let _name = parts.next()?; + let name = parts.next()?; let value = parts.next()?; - (Vec::new(), value) + (name, Vec::new(), value) } }; let value: f64 = value_str.parse().ok()?; - Some(make_metric(labels, value, metric_type)) + Some((name, make_metric(labels, value, metric_type))) } /// Scans label pairs starting just after the opening `{`, returning the pairs @@ -208,7 +222,7 @@ fn make_metric(labels: Vec, value: f64, metric_type: MetricType) -> M #[cfg(test)] mod tests { use super::*; - use vise::{Counter, Gauge, LabeledFamily, Metrics, Registry}; + use vise::{Counter, Gauge, Histogram, LabeledFamily, Metrics, Registry}; #[derive(Debug, Metrics)] #[metrics(prefix = "app_log")] @@ -238,7 +252,7 @@ mod tests { } #[test] - fn gather_emits_charon_names_and_strips_total() { + fn gather_emits_expected_names_and_strips_total() { let log = LogLike::default(); log.error_total.inc(); let tracker = TrackerLike::default(); @@ -308,4 +322,113 @@ demo{path=\"a\\\"b\",note=\"x\\\\y\"} 3"; assert_eq!(demo.metrics[0].labels[0].value, "a\"b"); assert_eq!(demo.metrics[0].labels[1].value, "x\\y"); } + + #[derive(Debug, Metrics)] + #[metrics(prefix = "demo")] + struct Mixed { + requests_total: Counter, + #[metrics(labels = ["peer"])] + connected: LabeledFamily, + #[metrics(buckets = &[0.1, 1.0])] + latency_seconds: Histogram, + } + + #[test] + fn histogram_does_not_corrupt_queried_families() { + let m = Mixed::default(); + m.requests_total.inc_by(2); + m.connected[&"peerA".to_owned()].set(1); + m.latency_seconds.observe(0.5); + m.latency_seconds.observe(2.0); + + let mut registry = Registry::empty(); + 
registry.register_metrics(&m); + let families = gather_registry(&registry).expect("gather"); + + // Queried counter family: exactly its own series and value — not + // polluted by the histogram's `_bucket`/`_sum`/`_count` lines. + let counter = family(&families, "demo_requests_total"); + assert_eq!(counter.metric_type, MetricType::Counter); + assert_eq!(counter.metrics.len(), 1); + assert_eq!(counter.metrics[0].counter_value(), 2.0); + + // Queried gauge family: exactly its own series. + let gauge = family(&families, "demo_connected"); + assert_eq!(gauge.metric_type, MetricType::Gauge); + assert_eq!(gauge.metrics.len(), 1); + assert_eq!(gauge.metrics[0].gauge_value(), 1.0); + + // Histogram family is kept (for the cardinality scan); its sub-lines did + // not leak into the families above and carry no counter/gauge value. + let hist = family(&families, "demo_latency_seconds"); + assert_eq!(hist.metric_type, MetricType::Histogram); + assert!(!hist.metrics.is_empty()); + assert!( + hist.metrics + .iter() + .all(|metric| metric.counter.is_none() && metric.gauge.is_none()) + ); + } + + #[test] + fn rejects_sample_name_mismatch_in_queried_family() { + // A stray line whose name differs from the counter's `# TYPE` must not + // be folded into the queried counter family. + let text = "\ +# TYPE app_log_error_total counter +app_log_error_total 5 +app_log_error_total_bucket{le=\"1\"} 99"; + let families = parse_exposition(text); + let errors = family(&families, "app_log_error_total"); + assert_eq!(errors.metrics.len(), 1); + assert_eq!(errors.metrics[0].counter_value(), 5.0); + } + + #[test] + fn format_pin_counter_encoding() { + // Pins vise's OpenMetricsForPrometheus output: a future vise change + // (e.g. `_total` doubling, a `_created` line, or different spacing) must + // fail here rather than silently break the gatherer.
+ let log = LogLike::default(); + log.error_total.inc_by(3); + let mut registry = Registry::empty(); + registry.register_metrics(&log); + + let mut text = String::new(); + registry + .encode(&mut text, vise::Format::OpenMetricsForPrometheus) + .expect("encode"); + + assert!( + text.contains("# TYPE app_log_error_total counter"), + "type line missing: {text}" + ); + assert!( + text.contains("app_log_error_total 3"), + "sample line missing/changed: {text}" + ); + assert!( + !text.contains("app_log_error_total_total"), + "_total must not be doubled: {text}" + ); + assert!( + !text.contains("_created"), + "_created must not be emitted: {text}" + ); + } + + #[test] + fn special_float_values_parse() { + // `+Inf` / `-Inf` / `NaN` are valid f64 strings, so such sample values + // parse rather than being silently dropped. + let text = "\ +# TYPE g gauge +g{k=\"a\"} +Inf +g{k=\"b\"} -Inf +g{k=\"c\"} NaN +g{k=\"d\"} 1.5"; + let families = parse_exposition(text); + let g = family(&families, "g"); + assert_eq!(g.metrics.len(), 4); + } } diff --git a/crates/app/src/health/metrics.rs b/crates/app/src/health/metrics.rs index 6b38aa42..cf2d418c 100644 --- a/crates/app/src/health/metrics.rs +++ b/crates/app/src/health/metrics.rs @@ -6,7 +6,7 @@ use vise::{Gauge, Global, LabeledFamily, Metrics}; /// /// Emitted (after vise's `_total`-strip / Prometheus naming) as /// `app_health_checks{severity,name}` and -/// `app_health_metrics_high_cardinality{name}`, matching Charon. +/// `app_health_metrics_high_cardinality{name}`. #[derive(Debug, Metrics)] #[metrics(prefix = "app_health")] pub struct HealthMetrics { diff --git a/crates/app/src/health/mod.rs b/crates/app/src/health/mod.rs index d95a82b7..d1496870 100644 --- a/crates/app/src/health/mod.rs +++ b/crates/app/src/health/mod.rs @@ -1,13 +1,17 @@ //! Application health checks. //! -//! A background service that, every [`SCRAPE_PERIOD`], scrapes all process -//! 
metrics, keeps a rolling window of the last [`MAX_SCRAPES`] scrapes, runs a -//! fixed set of health checks over that window (query a metric by name → select -//! series by label → reduce the time series to one number → compare to a -//! threshold), and publishes the per-check pass/fail state as the -//! `app_health_checks{severity,name}` gauge (1 = failing, 0 = ok). It also -//! detects high-cardinality metrics and publishes -//! `app_health_metrics_high_cardinality{name}`. +//! A background service that, every 30s, scrapes all process metrics, keeps a +//! rolling window of the last 10 scrapes, runs a fixed set of health checks +//! over that window (query a metric by name → select series by label → reduce +//! the time series to one number → compare to a threshold), and publishes the +//! per-check pass/fail state as the `app_health_checks{severity,name}` gauge +//! (1 = failing, 0 = ok). It also detects high-cardinality metrics and +//! publishes `app_health_metrics_high_cardinality{name}`. +//! +//! The module is split into `checker.rs`, `checks.rs` (with the check tests +//! inline), `select.rs`, `reducers.rs`, and `metrics.rs`. `model.rs`, +//! `error.rs`, and `gatherer.rs` provide the metric model, the error type, and +//! the registry-to-model bridge. mod checker; mod checks; @@ -19,52 +23,8 @@ mod reducers; mod select; pub use checker::Checker; +pub use checks::Metadata; pub use error::{Error, Result}; pub use gatherer::{GatherError, Gatherer, ViseGatherer}; pub use metrics::{HEALTH_METRICS, HealthMetrics}; pub use model::{LabelPair, Metric, MetricFamily, MetricType}; - -use std::time::Duration; - -/// Period between metric scrapes. -const SCRAPE_PERIOD: Duration = Duration::from_secs(30); - -/// Maximum number of scrapes retained in the rolling window. -const MAX_SCRAPES: usize = 10; - -/// High-cardinality threshold for a single validator; for `n` validators the -/// effective threshold is `LABELS_CARDINALITY_THRESHOLD * n`. 
-const LABELS_CARDINALITY_THRESHOLD: usize = 100; - -/// Severity of a health check. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum Severity { - /// Critical: the node is likely not performing its duties. - Critical, - /// Warning: something needs attention. - Warning, - /// Info: informational only. - Info, -} - -impl Severity { - /// Returns the lowercase string used as the `severity` label value. - pub(crate) fn as_str(self) -> &'static str { - match self { - Self::Critical => "critical", - Self::Warning => "warning", - Self::Info => "info", - } - } -} - -/// Metadata about the cluster, used by the health checks. -#[derive(Debug, Clone, Copy, Default)] -pub struct Metadata { - /// Number of validators in the cluster. - pub num_validators: i64, - /// Number of peers in the cluster. - pub num_peers: i64, - /// Number of peers required for quorum. - pub quorum_peers: i64, -} diff --git a/crates/app/src/health/model.rs b/crates/app/src/health/model.rs index fe7c1d19..c8bde2c3 100644 --- a/crates/app/src/health/model.rs +++ b/crates/app/src/health/model.rs @@ -1,9 +1,8 @@ //! In-memory Prometheus metric model used by the health checks. //! -//! This mirrors the subset of the Prometheus protobuf model that Charon's -//! health checks rely on. Per-sample timestamps are intentionally omitted: the -//! reducers never read them — the time dimension comes from the checker storing -//! successive scrapes. +//! A minimal subset of the Prometheus metric model the checks rely on. +//! Per-sample timestamps are intentionally omitted: the reducers never read +//! them — the time dimension comes from the checker storing successive scrapes. /// Type of a metric family. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -42,13 +41,12 @@ pub struct Metric { } impl Metric { - /// Counter value, defaulting to `0.0` when absent (mirrors protobuf nil → - /// 0). + /// Counter value, defaulting to `0.0` when absent. 
pub fn counter_value(&self) -> f64 { self.counter.unwrap_or(0.0) } - /// Gauge value, defaulting to `0.0` when absent (mirrors protobuf nil → 0). + /// Gauge value, defaulting to `0.0` when absent. pub fn gauge_value(&self) -> f64 { self.gauge.unwrap_or(0.0) } diff --git a/crates/app/src/health/select.rs b/crates/app/src/health/select.rs index 08a2c8d6..6869242c 100644 --- a/crates/app/src/health/select.rs +++ b/crates/app/src/health/select.rs @@ -78,8 +78,7 @@ pub(crate) fn sum_labels(labels: Vec) -> Selector { /// Returns true if every pair in `contain` matches some label in `labels`: /// names must be equal and the `contain` value is matched as a regex against -/// the label value. A regex that fails to compile is treated as no match -/// (matching Charon, which swallows the compile error). +/// the label value. A regex that fails to compile is treated as no match. pub(crate) fn labels_contain(labels: &[LabelPair], contain: &[LabelPair]) -> bool { for c in contain { let mut found = false; diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 87d7061e..85d72563 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -24,3 +24,8 @@ pub mod privkeylock; /// Utility helpers for archiving, extracting, and comparing files/directories. pub mod utils; + +/// Application health checks: periodically scrapes process metrics, evaluates a +/// fixed set of checks over a rolling window, and publishes per-check pass/fail +/// state as the `app_health_checks` gauge. +pub mod health; diff --git a/crates/tracing/src/layers/metrics.rs b/crates/tracing/src/layers/metrics.rs index e484e5de..2137ca4e 100644 --- a/crates/tracing/src/layers/metrics.rs +++ b/crates/tracing/src/layers/metrics.rs @@ -8,8 +8,8 @@ use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan}; use crate::metrics::TRACING_METRICS; -/// Metrics layer: counts error/warn log events, labelled by `topic` -/// (mirroring Charon's `app_log_{error,warn}_total{topic}`). 
+/// Metrics layer: counts error/warn log events, labelled by `topic`, into +/// `app_log_{error,warn}_total{topic}`. pub struct MetricsLayer; /// The `topic` field value recorded for a span. @@ -32,8 +32,8 @@ impl Visit for TopicVisitor { } } -/// Returns the `topic` of the nearest enclosing span, or empty when none is set -/// (mirroring Charon's `metricsTopicFromCtx`, which defaults to empty). +/// Returns the `topic` of the nearest enclosing span, or empty when none is +/// set. fn event_topic(ctx: &Context<'_, S>, event: &tracing::Event<'_>) -> String where S: tracing::Subscriber + for<'a> LookupSpan<'a>, diff --git a/crates/tracing/src/metrics.rs b/crates/tracing/src/metrics.rs index c7b09b93..382902b8 100644 --- a/crates/tracing/src/metrics.rs +++ b/crates/tracing/src/metrics.rs @@ -1,11 +1,9 @@ use vise::{Counter, LabeledFamily, Metrics}; -/// Metrics for the tracing. +/// Metrics for the tracing layer. /// -/// Emitted as `app_log_error_total{topic}` / `app_log_warn_total{topic}`, -/// matching Charon's `app/log` metrics (name, `_total` suffix, and `topic` -/// label), so Charon's Grafana dashboards and the health checker pick them up -/// by their Charon names. +/// Emitted as `app_log_error_total{topic}` / `app_log_warn_total{topic}` so the +/// monitoring dashboards and the health checker pick them up by these names. 
#[derive(Debug, Metrics)] #[metrics(prefix = "app_log")] pub struct TracingMetrics { From dfa61d6cbf382131fe2c9e644dea6ad4200f8dea Mon Sep 17 00:00:00 2001 From: Quang Le Date: Wed, 24 Jun 2026 17:12:05 +0700 Subject: [PATCH 3/3] fix: conflict vise --- crates/app/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 4253bf1d..2977731a 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -35,7 +35,6 @@ pluto-cluster.workspace = true pluto-k1util.workspace = true pluto-crypto.workspace = true pluto-ssz.workspace = true -vise.workspace = true [build-dependencies] pluto-build-proto.workspace = true