From 8ca0112e447e44f5a328a5d790c95f6c46b10714 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Tue, 9 Jun 2026 19:13:46 +0300 Subject: [PATCH 1/2] Mempool: shrink hash tables' capacity from time to time --- mempool/src/pool/tx_pool/mod.rs | 9 ++- mempool/src/pool/tx_pool/store/mod.rs | 79 +++++++++++++++++++++++++ mempool/src/pool/tx_pool/tests/basic.rs | 76 +++++++++++++++++++++++- 3 files changed, 161 insertions(+), 3 deletions(-) diff --git a/mempool/src/pool/tx_pool/mod.rs b/mempool/src/pool/tx_pool/mod.rs index 2497ad088..45f1184a9 100644 --- a/mempool/src/pool/tx_pool/mod.rs +++ b/mempool/src/pool/tx_pool/mod.rs @@ -653,7 +653,13 @@ impl TxPool { fn trim(&mut self) -> Result, MempoolPolicyError> { let mut removed_fees = Vec::new(); - while !self.store.is_empty() && self.memory_usage() > self.max_size.as_bytes() { + loop { + self.store.shrink_capacity_if_needed(); + + if self.store.is_empty() || self.memory_usage() <= self.max_size.as_bytes() { + break; + } + // TODO sort by descendant score, not by fee let removed_id = self .store @@ -673,6 +679,7 @@ impl TxPool { removed_fees.push(FeeRate::from_total_tx_fee(removed.fee(), removed.size())?); self.remove_tx_and_descendants(&removed_id, MempoolRemovalReason::SizeLimit); } + Ok(removed_fees) } diff --git a/mempool/src/pool/tx_pool/store/mod.rs b/mempool/src/pool/tx_pool/store/mod.rs index 862e33393..6922a4d81 100644 --- a/mempool/src/pool/tx_pool/store/mod.rs +++ b/mempool/src/pool/tx_pool/store/mod.rs @@ -663,8 +663,87 @@ impl MempoolStore { let entry = self.get_existing_entry(tx_id)?; entry.collect_cluster(self) } + + /// For internal containers that have capacity, check if the capacity is excessive; shrink + /// the container if it is. + pub fn shrink_capacity_if_needed(&mut self) { + // Note: + // * Hashbrown tables never shrink their capacity automatically. + // * According to the pseudo-test `estimate_max_tx_count_in_store`, the store with the default + // size of 300Mb can fit over 230'000 txs of the smallest possible size. Due to how hashbrown + // tables work (1/8 of all buckets should always be empty, and reallocation doubles the number + // of buckets), `txs_by_id` and `seq_nos_by_tx` may end up with more than 500'000 buckets each. + // Given that the bucket size in each table is 40 bytes (in non-test builds), this results in + // roughly 20Mb of allocated memory per table, which will not go down even if the tables' + // element counts become zero. Since table's entire allocation_size counts towards the + // mempool size, this will effectively reduce the max mempool size by 40Mb. + // * On the other hand, the mempool re-creates its store completely every time a new block + // arrives, so the situation described above can only exist for a few minutes. Still, + // it's better for the store not to depend on such a behavior of its owner code and + // manage the capacities explicitly. + + // Implementation notes: + // * table's `capacity` doesn't count the tombstones, so in a degenerate case like the one + // described above it's possible to have a table with a huge allocation size and small + // capacity. So below we don't use capacity when deciding whether to shrink, and estimate + // (roughly) the number of buckets instead. + // * even though `shrink_to` accepts capacity, it'll compare the estimated number of buckets + // (from the passed capacity) with the current one and reallocate/rehash the table if the + // latter is bigger. + + fn maybe_shrink( + table: &mut TrackedHashMap, + mem_tracker: &mut MemUsageTracker, + table_name: &str, + ) where + K: Eq + std::hash::Hash, + { + let bucket_size = hash_map_bucket_size(table); + let bucket_count = hash_map_bucket_count_upper_bound(table); + + let max_bucket_count = table.len() * HASH_TABLE_MAX_BUCKET_COUNT_FACTOR; + let adjusted_capacity = table.len() * HASH_TABLE_ADJUSTED_CAPACITY_FACTOR; + + if bucket_count > max_bucket_count { + let potentially_reclaimable_mem_size = + (bucket_count - adjusted_capacity) * bucket_size; + + // Only bother shrinking if the win is noticeable. + if potentially_reclaimable_mem_size >= HASH_TABLE_MIN_RECLAIMABLE_MEM_SIZE { + log::debug!("Shrinking {table_name} to {adjusted_capacity}"); + mem_tracker.modify(table, |table, _| table.shrink_to(adjusted_capacity)); + } + } + } + + maybe_shrink(&mut self.txs_by_id, &mut self.mem_tracker, "txs_by_id"); + maybe_shrink( + &mut self.seq_nos_by_tx, + &mut self.mem_tracker, + "seq_nos_by_tx", + ); + } } +pub fn hash_map_bucket_size(_: &StoreHashMap) -> usize { + std::mem::size_of::<(K, V)>() +} + +// Return the upper bound for the number of buckets in the map. +pub fn hash_map_bucket_count_upper_bound(map: &StoreHashMap) -> usize +where + K: Eq + std::hash::Hash, +{ + // Note: the actual number of buckets will be smaller than this, because `allocation_size` also + // includes control bytes and padding. + map.allocation_size() / hash_map_bucket_size(map) +} + +// Constants that determine whether store's hash tables should be shrunk and, if yes, to what capacity. +pub const HASH_TABLE_MAX_BUCKET_COUNT_FACTOR: usize = 5; +pub const HASH_TABLE_ADJUSTED_CAPACITY_FACTOR: usize = 2; +pub const HASH_TABLE_MIN_RECLAIMABLE_MEM_SIZE: usize = 10_000; + #[cfg(test)] impl Drop for MempoolStore { fn drop(&mut self) { diff --git a/mempool/src/pool/tx_pool/tests/basic.rs b/mempool/src/pool/tx_pool/tests/basic.rs index 22c5a7d32..a96e0e7bd 100644 --- a/mempool/src/pool/tx_pool/tests/basic.rs +++ b/mempool/src/pool/tx_pool/tests/basic.rs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use mempool_types::tx_origin::LocalTxOrigin; + use crate::pool::tx_pool::store::{StoreHashSet, TxMempoolEntryWithAncestors}; use super::*; @@ -1314,7 +1316,6 @@ async fn mempool_full_mock(#[case] seed: Seed) -> anyhow::Result<()> { #[rstest] #[case(Seed::from_entropy())] -#[case::fail(Seed(1))] #[trace] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mempool_full_real(#[case] seed: Seed) { @@ -1370,7 +1371,14 @@ async fn mempool_full_real(#[case] seed: Seed) { // Bump the memory limit again, and re-insert the evicted transaction(s). Also reset the // rolling fee since recently evicted transactions bump it up. - mempool.max_size = MempoolMaxSize::from_bytes(memory_size); + // Note: since the switch to using hashbrown tables in the mempool store, simply resetting + // `mempool.max_size` back to `memory_size` may not be enough for the previously evicted + // txs to fit back in. This is because hashbrown tables are probing tables that leave tombstones + // during element removal and tombstones may not always be reused during insertion. In such a + // case insertion will eat up one additional capacity element, and if the capacity is already at + // the maximum, reallocation will occur, doubling the number of buckets, which will contribute + // to the current mempool size and will cause some of the previously fitting txs to be evicted. + mempool.max_size = MempoolMaxSize::from_bytes(memory_size * 2); mempool.drop_rolling_fee(); for tx in txs { @@ -1720,3 +1728,67 @@ fn stack_overflow_on_transaction_addition( .unwrap(); }); } + +// Note: this is not a real test; it's just a piece of code to estimate the maximum number +// of transactions that can fit into a store of the default size. I.e. it fills the store +// with txs of minimally possible size, stops when it's full and prints the count. +// In the end, it removes all txs from the store and prints the resulting memory usage, which +// will be the capacity overhead introduced by the 2 hashbrown tables. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore] +async fn estimate_max_tx_count_in_store() { + let make_minimal_tx = |input| { + let tx = Transaction::new( + 0, + vec![input], + vec![TxOutput::Transfer(OutputValue::Coin(Amount::ZERO), Destination::AnyoneCanSpend)], + ) + .unwrap(); + SignedTransaction::new(tx, vec![InputWitness::NoSignature(None)]).unwrap() + }; + + let fake_source = OutPointSourceId::Transaction(Id::new(common::primitives::H256::zero())); + let minimal_tx = make_minimal_tx(TxInput::from_utxo(fake_source.clone(), 0)); + log::info!("minimal tx size = {}", minimal_tx.encoded_size()); + + let time = TimeGetter::default().get_time(); + let origin = TxOrigin::Local(LocalTxOrigin::P2p); + let options = TxOptions::default_for(origin); + let fee = Amount::from_atoms(1_000_000).into(); + + let mut storage = MempoolStore::new(Default::default()); + storage.disable_heavy_validity_checks(); + + loop { + let tx_count = storage.txs_by_id.len(); + + let tx = make_minimal_tx(TxInput::from_utxo(fake_source.clone(), tx_count as u32)); + let entry = TxEntry::new(tx, time, origin, options.clone()); + storage.add_transaction(TxEntryWithFee::new(entry, fee)).unwrap(); + + if storage.memory_usage() > MAX_MEMPOOL_SIZE_BYTES { + log::info!("max tx count: {tx_count}",); + break; + } + } + + let tx_ids = storage.txs_by_id.keys().copied().collect::>(); + + for tx_id in tx_ids { + storage.remove_tx(&tx_id, MempoolRemovalReason::Expiry); + } + + assert!(storage.txs_by_id.is_empty()); + + // After all txs have been removed, the only thing that can occupy storage is allocated capacity + // of containers that have it, which at the moment of writing this are the 2 hash tables - + // `txs_by_id` and `seq_nos_by_tx`. + // Note though that this value will not exactly represent what will happen in the actual app, + // because in `cfg(test)` builds AssertDropPolicy is used as StrictDropPolicy, which contains + // an additional bool, which will turn into 8 extra bytes for each bucket of `txs_by_id`, due + // to alignment. + log::info!( + "memory usage after store cleanup: {}", + storage.memory_usage() + ); +} From 818b349de578ba41e4c2ce77cb8a61a69059515e Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Wed, 10 Jun 2026 16:08:03 +0300 Subject: [PATCH 2/2] Mempool: test for hash tables' capacity shrinkage, some cleanup --- mempool/src/pool/tests/orphans.rs | 4 +- mempool/src/pool/tx_pool/collect_txs.rs | 2 +- mempool/src/pool/tx_pool/mod.rs | 18 +- mempool/src/pool/tx_pool/store/mod.rs | 33 +++- mempool/src/pool/tx_pool/tests/accumulator.rs | 5 +- mempool/src/pool/tx_pool/tests/basic.rs | 178 +++++++++++++++--- mempool/src/pool/tx_pool/tests/utils.rs | 32 +++- .../src/pool/tx_pool/tx_verifier/utxo_view.rs | 2 +- 8 files changed, 218 insertions(+), 56 deletions(-) diff --git a/mempool/src/pool/tests/orphans.rs b/mempool/src/pool/tests/orphans.rs index 1b1f4dcbf..f726531b2 100644 --- a/mempool/src/pool/tests/orphans.rs +++ b/mempool/src/pool/tests/orphans.rs @@ -258,9 +258,9 @@ async fn transaction_graph_subset_permutation(#[case] seed: Seed) { log::info!( "Stats: count {}, memory {}, encoded size {}", - mempool.tx_store().txs_by_id.len(), + mempool.tx_store().txs_by_id().len(), mempool.memory_usage(), - mempool.tx_store().txs_by_id.values().map(|e| e.size().get()).sum::(), + mempool.tx_store().txs_by_id().values().map(|e| e.size().get()).sum::(), ); // Check the final state of each transaction in the original sequence diff --git a/mempool/src/pool/tx_pool/collect_txs.rs b/mempool/src/pool/tx_pool/collect_txs.rs index e60627f32..2078c8e7c 100644 --- a/mempool/src/pool/tx_pool/collect_txs.rs +++ b/mempool/src/pool/tx_pool/collect_txs.rs @@ -133,7 +133,7 @@ pub fn collect_txs( // Transaction IDs taken from mempool to fill in the rest of the block let mempool_txids = { // Get transactions from mempool by score - let txids = mempool.store.txs_by_ancestor_score.iter().map(|x| &x.1).rev(); + let txids = mempool.store.txs_by_ancestor_score().iter().map(|x| &x.1).rev(); // Take the appropriate amount of them as determined by the packing strategy txids.take(match packing_strategy { PackingStrategy::FillSpaceFromMempool => usize::MAX, diff --git a/mempool/src/pool/tx_pool/mod.rs b/mempool/src/pool/tx_pool/mod.rs index 45f1184a9..9dbc5aea6 100644 --- a/mempool/src/pool/tx_pool/mod.rs +++ b/mempool/src/pool/tx_pool/mod.rs @@ -172,7 +172,7 @@ impl TxPool { pub fn get_all(&self) -> Vec { self.store - .txs_by_descendant_score + .txs_by_descendant_score() .iter() .map(|(_score, id)| self.store.get_entry(id).expect("entry").transaction().clone()) .collect() @@ -486,7 +486,7 @@ impl TxPool { conflicts_with_descendants: &StoreHashSet>, ) -> Result { let conflicts_with_descendants = conflicts_with_descendants.iter().map(|conflict_id| { - self.store.txs_by_id.get(conflict_id).expect("tx should exist in mempool") + self.store.txs_by_id().get(conflict_id).expect("tx should exist in mempool") }); let total_conflict_fees = conflicts_with_descendants @@ -626,7 +626,7 @@ impl TxPool { let expired_ids = self .store - .txs_by_creation_time + .txs_by_creation_time() .iter() // Note: entries in txs_by_creation_time are sorted by the creation time in ascending order, // so once we find a tx that is not expired, the rest will not be expired either. @@ -663,12 +663,12 @@ impl TxPool { // TODO sort by descendant score, not by fee let removed_id = self .store - .txs_by_descendant_score + .txs_by_descendant_score() .iter() .map(|(_score, entry)| *entry) .next() .expect("pool not empty"); - let removed = self.store.txs_by_id.get(&removed_id).expect("tx with id should exist"); + let removed = self.store.txs_by_id().get(&removed_id).expect("tx with id should exist"); log::debug!( "Mempool trim: Evicting tx {:x} which has a descendant score of {:?} and has size {}", @@ -953,8 +953,8 @@ impl TxPool { in_top_x_mb, &self.mempool_config, &self.rolling_fee_rate.read(), - &self.store.txs_by_descendant_score, - &self.store.txs_by_id, + self.store.txs_by_descendant_score(), + self.store.txs_by_id(), ) } @@ -998,8 +998,8 @@ impl TxPool { num_points, &self.mempool_config, &self.rolling_fee_rate.read(), - &self.store.txs_by_descendant_score, - &self.store.txs_by_id, + self.store.txs_by_descendant_score(), + self.store.txs_by_id(), ) } diff --git a/mempool/src/pool/tx_pool/store/mod.rs b/mempool/src/pool/tx_pool/store/mod.rs index 6922a4d81..4381589b3 100644 --- a/mempool/src/pool/tx_pool/store/mod.rs +++ b/mempool/src/pool/tx_pool/store/mod.rs @@ -124,7 +124,7 @@ pub struct MempoolStore { // and doesn't free the memory when an item is removed - it's only replaced with a tombstone. // Since TxMempoolEntry is relatively big (size_of = 350+ bytes), we'd waste a noticeable // amount of memory without boxing.) - pub txs_by_id: TrackedHashMap, Tracked, StrictDropPolicy>>, + txs_by_id: TrackedHashMap, Tracked, StrictDropPolicy>>, // Mempool entries sorted by descendant score. // We keep this index so that when the mempool grows full, we know which transactions are the @@ -136,22 +136,22 @@ pub struct MempoolStore { // max(fee/size of entry's tx, fee/size with all descendants). // TODO if we wish to follow Bitcoin Core, "size" is not simply the encoded size, but // rather a value that takes into account witness and sigop data (see CTxMemPoolEntry::GetTxSize). - pub txs_by_descendant_score: TrackedTxIdMultiMap, + txs_by_descendant_score: TrackedTxIdMultiMap, // Mempool entries sorted by ancestor score. // This is used to select the most economically attractive transactions for block production. // The ancestor score of an entry is defined as // min(fee/size of entry's tx, fee/size with all ancestors). - pub txs_by_ancestor_score: TrackedTxIdMultiMap, + txs_by_ancestor_score: TrackedTxIdMultiMap, // Entries that have remained in the mempool for a long time (see DEFAULT_MEMPOOL_EXPIRY) are // evicted. To efficiently know which entries to evict, we store the mempool entries sorted by // their creation time, from earliest to latest. - pub txs_by_creation_time: TrackedTxIdMultiMap