Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mempool/src/pool/tests/orphans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ async fn transaction_graph_subset_permutation(#[case] seed: Seed) {

log::info!(
"Stats: count {}, memory {}, encoded size {}",
mempool.tx_store().txs_by_id.len(),
mempool.tx_store().txs_by_id().len(),
mempool.memory_usage(),
mempool.tx_store().txs_by_id.values().map(|e| e.size().get()).sum::<usize>(),
mempool.tx_store().txs_by_id().values().map(|e| e.size().get()).sum::<usize>(),
);

// Check the final state of each transaction in the original sequence
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/pool/tx_pool/collect_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub fn collect_txs<M>(
// Transaction IDs taken from mempool to fill in the rest of the block
let mempool_txids = {
// Get transactions from mempool by score
let txids = mempool.store.txs_by_ancestor_score.iter().map(|x| &x.1).rev();
let txids = mempool.store.txs_by_ancestor_score().iter().map(|x| &x.1).rev();
// Take the appropriate amount of them as determined by the packing strategy
txids.take(match packing_strategy {
PackingStrategy::FillSpaceFromMempool => usize::MAX,
Expand Down
27 changes: 17 additions & 10 deletions mempool/src/pool/tx_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<M> TxPool<M> {

pub fn get_all(&self) -> Vec<SignedTransaction> {
self.store
.txs_by_descendant_score
.txs_by_descendant_score()
.iter()
.map(|(_score, id)| self.store.get_entry(id).expect("entry").transaction().clone())
.collect()
Expand Down Expand Up @@ -486,7 +486,7 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
conflicts_with_descendants: &StoreHashSet<Id<Transaction>>,
) -> Result<Fee, MempoolPolicyError> {
let conflicts_with_descendants = conflicts_with_descendants.iter().map(|conflict_id| {
self.store.txs_by_id.get(conflict_id).expect("tx should exist in mempool")
self.store.txs_by_id().get(conflict_id).expect("tx should exist in mempool")
});

let total_conflict_fees = conflicts_with_descendants
Expand Down Expand Up @@ -626,7 +626,7 @@ impl<M: MemoryUsageEstimator> TxPool<M> {

let expired_ids = self
.store
.txs_by_creation_time
.txs_by_creation_time()
.iter()
// Note: entries in txs_by_creation_time are sorted by the creation time in ascending order,
// so once we find a tx that is not expired, the rest will not be expired either.
Expand All @@ -653,16 +653,22 @@ impl<M: MemoryUsageEstimator> TxPool<M> {

fn trim(&mut self) -> Result<Vec<FeeRate>, MempoolPolicyError> {
let mut removed_fees = Vec::new();
while !self.store.is_empty() && self.memory_usage() > self.max_size.as_bytes() {
loop {
self.store.shrink_capacity_if_needed();

if self.store.is_empty() || self.memory_usage() <= self.max_size.as_bytes() {
break;
}

// TODO sort by descendant score, not by fee
let removed_id = self
.store
.txs_by_descendant_score
.txs_by_descendant_score()
.iter()
.map(|(_score, entry)| *entry)
.next()
.expect("pool not empty");
let removed = self.store.txs_by_id.get(&removed_id).expect("tx with id should exist");
let removed = self.store.txs_by_id().get(&removed_id).expect("tx with id should exist");

log::debug!(
"Mempool trim: Evicting tx {:x} which has a descendant score of {:?} and has size {}",
Expand All @@ -673,6 +679,7 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
removed_fees.push(FeeRate::from_total_tx_fee(removed.fee(), removed.size())?);
self.remove_tx_and_descendants(&removed_id, MempoolRemovalReason::SizeLimit);
}

Ok(removed_fees)
}

Expand Down Expand Up @@ -946,8 +953,8 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
in_top_x_mb,
&self.mempool_config,
&self.rolling_fee_rate.read(),
&self.store.txs_by_descendant_score,
&self.store.txs_by_id,
self.store.txs_by_descendant_score(),
self.store.txs_by_id(),
)
}

Expand Down Expand Up @@ -991,8 +998,8 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
num_points,
&self.mempool_config,
&self.rolling_fee_rate.read(),
&self.store.txs_by_descendant_score,
&self.store.txs_by_id,
self.store.txs_by_descendant_score(),
self.store.txs_by_id(),
)
}

Expand Down
112 changes: 107 additions & 5 deletions mempool/src/pool/tx_pool/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub struct MempoolStore {
// and doesn't free the memory when an item is removed - it's only replaced with a tombstone.
// Since TxMempoolEntry is relatively big (size_of = 350+ bytes), we'd waste a noticeable
// amount of memory without boxing.)
pub txs_by_id: TrackedHashMap<Id<Transaction>, Tracked<Box<TxMempoolEntry>, StrictDropPolicy>>,
txs_by_id: TrackedHashMap<Id<Transaction>, Tracked<Box<TxMempoolEntry>, StrictDropPolicy>>,

// Mempool entries sorted by descendant score.
// We keep this index so that when the mempool grows full, we know which transactions are the
Expand All @@ -136,22 +136,22 @@ pub struct MempoolStore {
// max(fee/size of entry's tx, fee/size with all descendants).
// TODO if we wish to follow Bitcoin Core, "size" is not simply the encoded size, but
// rather a value that takes into account witness and sigop data (see CTxMemPoolEntry::GetTxSize).
pub txs_by_descendant_score: TrackedTxIdMultiMap<DescendantScore>,
txs_by_descendant_score: TrackedTxIdMultiMap<DescendantScore>,

// Mempool entries sorted by ancestor score.
// This is used to select the most economically attractive transactions for block production.
// The ancestor score of an entry is defined as
// min(fee/size of entry's tx, fee/size with all ancestors).
pub txs_by_ancestor_score: TrackedTxIdMultiMap<AncestorScore>,
txs_by_ancestor_score: TrackedTxIdMultiMap<AncestorScore>,

// Entries that have remained in the mempool for a long time (see DEFAULT_MEMPOOL_EXPIRY) are
// evicted. To efficiently know which entries to evict, we store the mempool entries sorted by
// their creation time, from earliest to latest.
pub txs_by_creation_time: TrackedTxIdMultiMap<Time>,
txs_by_creation_time: TrackedTxIdMultiMap<Time>,

// We keep the information of which inputs are spent by entries currently in the mempool.
// This allows us to recognize conflicts (double-spends) and handle them
pub spender_txs: Tracked<BTreeMap<TxDependency, Id<Transaction>>>,
spender_txs: Tracked<BTreeMap<TxDependency, Id<Transaction>>>,

// Track transactions by internal unique sequence number. This is used to recover the order in
// which the transactions have been inserted into the mempool, so they can be re-inserted in
Expand Down Expand Up @@ -243,6 +243,29 @@ impl MempoolStore {
self.mem_tracker.get_usage()
}

pub fn txs_by_id(
&self,
) -> &TrackedHashMap<Id<Transaction>, Tracked<Box<TxMempoolEntry>, StrictDropPolicy>> {
&self.txs_by_id
}

pub fn txs_by_descendant_score(&self) -> &TrackedTxIdMultiMap<DescendantScore> {
&self.txs_by_descendant_score
}

pub fn txs_by_ancestor_score(&self) -> &TrackedTxIdMultiMap<AncestorScore> {
&self.txs_by_ancestor_score
}

pub fn txs_by_creation_time(&self) -> &TrackedTxIdMultiMap<Time> {
&self.txs_by_creation_time
}

#[cfg(test)]
pub fn seq_nos_by_tx(&self) -> &TrackedHashMap<Id<Transaction>, usize> {
&self.seq_nos_by_tx
}

pub fn assert_valid(&self) {
#[cfg(test)]
self.assert_valid_inner()
Expand Down Expand Up @@ -663,8 +686,87 @@ impl MempoolStore {
let entry = self.get_existing_entry(tx_id)?;
entry.collect_cluster(self)
}

/// For internal containers that have capacity, check if the capacity is excessive; shrink
/// the container if it is.
pub fn shrink_capacity_if_needed(&mut self) {
// Note:
// * Hashbrown tables never shrink their capacity automatically.
// * According to the pseudo-test `estimate_max_tx_count_in_store`, the store with the default
// size of 300Mb can fit over 230'000 txs of the smallest possible size. Due to how hashbrown
// tables work (1/8 of all buckets should always be empty, and reallocation doubles the number
// of buckets), `txs_by_id` and `seq_nos_by_tx` may end up with more than 500'000 buckets each.
// Given that the bucket size in each table is 40 bytes (in non-test builds), this results in
// roughly 20Mb of allocated memory per table, which will not go down even if the tables'
// element counts become zero. Since table's entire allocation_size counts towards the
// mempool size, this will effectively reduce the max mempool size by 40Mb.
// * On the other hand, the mempool re-creates its store completely every time a new block
// arrives, so the situation described above can only exist for a few minutes. Still,
// it's better for the store not to depend on such a behavior of its owner code and
// manage the capacities explicitly.

// Implementation notes:
// * table's `capacity` doesn't count the tombstones, so in a degenerate case like the one
// described above it's possible to have a table with a huge allocation size and small
// capacity. So below we don't use capacity when deciding whether to shrink, and estimate
// (roughly) the number of buckets instead.
// * even though `shrink_to` accepts capacity, it'll compare the estimated number of buckets
// (from the passed capacity) with the current one and reallocate/rehash the table if the
// latter is bigger.

fn maybe_shrink<K, V>(
table: &mut TrackedHashMap<K, V>,
mem_tracker: &mut MemUsageTracker,
table_name: &str,
) where
K: Eq + std::hash::Hash,
{
let bucket_size = hash_map_bucket_size(table);
let bucket_count = hash_map_bucket_count_upper_bound(table);

let max_bucket_count = table.len() * HASH_TABLE_MAX_BUCKET_COUNT_FACTOR;
let adjusted_capacity = table.len() * HASH_TABLE_ADJUSTED_CAPACITY_FACTOR;

if bucket_count > max_bucket_count {
let potentially_reclaimable_mem_size =
(bucket_count - adjusted_capacity) * bucket_size;

// Only bother shrinking if the win is noticeable.
if potentially_reclaimable_mem_size >= HASH_TABLE_MIN_RECLAIMABLE_MEM_SIZE {
log::debug!("Shrinking {table_name} to {adjusted_capacity}");
mem_tracker.modify(table, |table, _| table.shrink_to(adjusted_capacity));
}
}
}

maybe_shrink(&mut self.txs_by_id, &mut self.mem_tracker, "txs_by_id");
maybe_shrink(
&mut self.seq_nos_by_tx,
&mut self.mem_tracker,
"seq_nos_by_tx",
);
}
}

pub fn hash_map_bucket_size<K, V>(_: &StoreHashMap<K, V>) -> usize {
std::mem::size_of::<(K, V)>()
}

// Return the upper bound for the number of buckets in the map.
pub fn hash_map_bucket_count_upper_bound<K, V>(map: &StoreHashMap<K, V>) -> usize
where
K: Eq + std::hash::Hash,
{
// Note: the actual number of buckets will be smaller than this, because `allocation_size` also
// includes control bytes and padding.
map.allocation_size() / hash_map_bucket_size(map)
}

// Constants that determine whether store's hash tables should be shrunk and, if yes, to what capacity.
pub const HASH_TABLE_MAX_BUCKET_COUNT_FACTOR: usize = 5;
pub const HASH_TABLE_ADJUSTED_CAPACITY_FACTOR: usize = 2;
pub const HASH_TABLE_MIN_RECLAIMABLE_MEM_SIZE: usize = 10_000;

#[cfg(test)]
impl Drop for MempoolStore {
fn drop(&mut self) {
Expand Down
5 changes: 4 additions & 1 deletion mempool/src/pool/tx_pool/tests/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ async fn collect_transactions(#[case] seed: Seed) -> anyhow::Result<()> {
.unwrap();
let collected_txs = returned_accumulator.unwrap();
let collected_txs = collected_txs.transactions();
log::debug!("ancestor index: {:?}", mempool.store.txs_by_ancestor_score);
log::debug!(
"ancestor index: {:?}",
mempool.store.txs_by_ancestor_score()
);
let expected_num_txs_collected = 6;
assert_eq!(collected_txs.len(), expected_num_txs_collected);
let total_tx_size: usize = collected_txs.iter().map(|tx| tx.encoded_size()).sum();
Expand Down
Loading
Loading