-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol_attack_sim.py
More file actions
2597 lines (2350 loc) · 108 KB
/
Copy pathprotocol_attack_sim.py
File metadata and controls
2597 lines (2350 loc) · 108 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
"""
Live-protocol-faithful adversarial simulator for the finalized checkpoint path.
This models the live rules documented in:
- docs/spec/CHECKPOINT_DERIVATION_SPEC.md
- docs/spec/AVAILABILITY_STATE_COMPLETENESS.md
- docs/LIVE_PROTOCOL.md
Scope:
- epoch-boundary checkpoint derivation
- finalized-history-driven validator lifecycle at epoch granularity
- operator-native committee derivation
- BPoAR-gated eligibility with live fallback/hysteresis semantics
- deterministic committee selection and proposer ordering
Explicit abstractions:
- validator warmup/cooldown are projected from live block counts into epoch lags
- availability state transitions are driven by deterministic scenario plans rather
than raw retained-prefix/audit-response bytes
- deterministic ticket search is modeled with the live v2 tag and bounded nonce
range, but not by reusing live C++ types directly
"""
from __future__ import annotations
import argparse
import csv
import hashlib
import json
import math
import sys
from dataclasses import asdict, dataclass, field, replace
from functools import cmp_to_key
from pathlib import Path
from typing import Any, Callable, Iterable, Mapping, Sequence
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from scripts.sybil_model import (
BASE_UNITS_PER_COIN,
DEFAULT_TICKET_DIFFICULTY_BITS,
effective_weight as sqrt_effective_weight,
)
# Defaults for the like-named ProtocolParameters fields.
# Sizes and lags are in blocks, bond floors in coins, the bonus cap in basis points.
DEFAULT_EPOCH_SIZE = 32
DEFAULT_WARMUP_BLOCKS = 100
DEFAULT_COOLDOWN_BLOCKS = 100
DEFAULT_MAX_EFFECTIVE_BOND_MULTIPLE = 10
DEFAULT_TICKET_BONUS_CAP_BPS = 1_000
DEFAULT_MIN_BOND_COINS = 100.0
DEFAULT_AVAILABILITY_MIN_BOND_COINS = 100.0
DEFAULT_AVAILABILITY_MIN_ELIGIBLE = 3
DEFAULT_COMMITTEE_SIZE = 4
# NOTE(review): presumably the default scenario length in epochs; its consumer is not visible here.
DEFAULT_EPOCHS = 8
# Default number of nonces tried per operator ticket search (ProtocolParameters.ticket_nonce_limit).
DEFAULT_NONCE_LIMIT = 4096
# tests/fixtures/checkpoint under the parent of the directory that contains this file.
CHECKPOINT_FIXTURE_ROOT = Path(__file__).resolve().parents[1] / "tests" / "fixtures" / "checkpoint"
# Checkpoint derivation modes returned by derive_mode_reason().
MODE_NORMAL = "NORMAL"
MODE_FALLBACK = "FALLBACK"
# Fallback reasons paired with the derivation mode by derive_mode_reason().
REASON_NONE = "NONE"
REASON_INSUFFICIENT = "INSUFFICIENT_ELIGIBLE_OPERATORS"
REASON_STICKY = "HYSTERESIS_RECOVERY_PENDING"
# Operator availability statuses; only STATUS_ACTIVE passes availability_eligible().
STATUS_WARMUP = "WARMUP"
STATUS_ACTIVE = "ACTIVE"
STATUS_PROBATION = "PROBATION"
STATUS_EJECTED = "EJECTED"
# Whitelists enforced by OperatorSpec.validate() and ValidatorSpec.validate().
VALID_AVAILABILITY_STATUSES = {STATUS_WARMUP, STATUS_ACTIVE, STATUS_PROBATION, STATUS_EJECTED}
VALID_JOIN_SOURCES = {"GENESIS", "POST_GENESIS"}
@dataclass(frozen=True)
class RankedCandidate:
    """Immutable per-operator entry fed into committee ranking.

    Instances are built by ``candidate_for_operator`` and ordered by
    ``compare_ranked_candidates``.
    """

    # Hashed id of the operator's representative validator.
    pubkey: bytes
    # Hashed operator id; the comparator falls back to ``pubkey`` when empty.
    selection_id: bytes
    # Sum of the operator's validator bonds, in base units.
    bonded_amount: int
    # ``bonded_amount`` limited to the protocol's effective bond cap.
    capped_bonded_amount: int
    # Weight derived from the capped bond via ``sqrt_effective_weight``.
    effective_weight: int
    # Best (smallest) ticket hash found for the epoch and the nonce producing it.
    ticket_work_hash: bytes
    ticket_nonce: int
    # Ticket bonus in basis points and the cap that applied when it was computed.
    ticket_bonus_bps: int
    ticket_bonus_cap_bps: int
    # Owning actor of the representative validator and its adversarial flag.
    actor_id: str
    adversarial: bool
    # Number of validators aggregated into this candidate.
    validator_count: int
def sha256d(data: bytes) -> bytes:
    """Return the double SHA-256 digest (SHA-256 of the SHA-256) of ``data``."""
    inner_digest = hashlib.sha256(data).digest()
    outer = hashlib.sha256(inner_digest)
    return outer.digest()
def coins_to_units(coins: float) -> int:
    """Convert a coin amount to whole base units, rounding and flooring at zero."""
    rounded_units = int(round(coins * BASE_UNITS_PER_COIN))
    # Negative amounts collapse to zero rather than producing negative units.
    return rounded_units if rounded_units > 0 else 0
def units_to_coins(units: int) -> float:
    """Convert a base-unit amount to a floating-point coin amount."""
    numerator = float(units)
    denominator = float(BASE_UNITS_PER_COIN)
    return numerator / denominator
def ceil_div(a: int, b: int) -> int:
    """Return ``a`` divided by ``b`` rounded up, treating any divisor below 1 as 1.

    The clamped divisor is used in both the rounding term and the division.
    Previously the raw ``b`` was added to the numerator while the clamped
    value divided it, so ``b <= 0`` produced a wrong result (for example
    ``ceil_div(5, 0)`` returned 4 instead of 5).

    Args:
        a: Dividend.
        b: Divisor; values below 1 are clamped to 1.

    Returns:
        The smallest integer ``q`` with ``q * max(1, b) >= a``.
    """
    divisor = max(1, b)
    return (a + divisor - 1) // divisor
def percent(value: float) -> float:
    """Scale a fraction to a percentage rounded to four decimal places."""
    scaled = value * 100.0
    return round(scaled, 4)
def clamp(value: float, lo: float, hi: float) -> float:
    """Limit ``value`` to the range bounded below by ``lo`` and above by ``hi``."""
    # Apply the lower bound first, then the upper bound.
    raised = max(lo, value)
    return min(hi, raised)
@dataclass(frozen=True)
class ProtocolParameters:
    """Immutable bundle of protocol knobs used by the simulator.

    Block-denominated lags are projected to epochs by ``warmup_epochs`` and
    ``cooldown_epochs``; coin-denominated floors are exposed in base units by
    the ``*_units`` properties.
    """

    epoch_size: int = DEFAULT_EPOCH_SIZE
    committee_size: int = DEFAULT_COMMITTEE_SIZE
    min_eligible: int = DEFAULT_AVAILABILITY_MIN_ELIGIBLE
    dynamic_min_bond_coins: float = DEFAULT_MIN_BOND_COINS
    availability_min_bond_coins: float = DEFAULT_AVAILABILITY_MIN_BOND_COINS
    validator_warmup_blocks: int = DEFAULT_WARMUP_BLOCKS
    validator_cooldown_blocks: int = DEFAULT_COOLDOWN_BLOCKS
    max_effective_bond_multiple: int = DEFAULT_MAX_EFFECTIVE_BOND_MULTIPLE
    ticket_bonus_cap_bps: int = DEFAULT_TICKET_BONUS_CAP_BPS
    ticket_difficulty_bits: int = DEFAULT_TICKET_DIFFICULTY_BITS
    ticket_nonce_limit: int = DEFAULT_NONCE_LIMIT

    def validate(self) -> None:
        """Raise ``ValueError`` for the first parameter outside its allowed range."""

        def reject(failed: bool, message: str) -> None:
            # Checks run strictly in call order, so the first violation wins.
            if failed:
                raise ValueError(message)

        reject(self.epoch_size <= 0, "epoch_size must be positive")
        reject(self.committee_size <= 0, "committee_size must be positive")
        reject(self.min_eligible <= 0, "min_eligible must be positive")
        reject(self.dynamic_min_bond_coins <= 0, "dynamic_min_bond_coins must be positive")
        reject(self.availability_min_bond_coins <= 0, "availability_min_bond_coins must be positive")
        reject(
            self.validator_warmup_blocks < 0 or self.validator_cooldown_blocks < 0,
            "warmup/cooldown blocks must be non-negative",
        )
        reject(self.max_effective_bond_multiple <= 0, "max_effective_bond_multiple must be positive")
        reject(self.ticket_bonus_cap_bps < 0, "ticket_bonus_cap_bps must be non-negative")
        reject(self.ticket_difficulty_bits < 0, "ticket_difficulty_bits must be non-negative")
        reject(self.ticket_nonce_limit <= 0, "ticket_nonce_limit must be positive")

    @property
    def dynamic_min_bond_units(self) -> int:
        """Dynamic minimum bond expressed in base units."""
        coins = self.dynamic_min_bond_coins
        return coins_to_units(coins)

    @property
    def availability_min_bond_units(self) -> int:
        """Availability minimum bond expressed in base units."""
        coins = self.availability_min_bond_coins
        return coins_to_units(coins)

    @property
    def warmup_epochs(self) -> int:
        """Warmup block count rounded up to whole epochs."""
        blocks = self.validator_warmup_blocks
        return ceil_div(blocks, self.epoch_size)

    @property
    def cooldown_epochs(self) -> int:
        """Cooldown block count rounded up to whole epochs."""
        blocks = self.validator_cooldown_blocks
        return ceil_div(blocks, self.epoch_size)

    @property
    def effective_bond_cap_units(self) -> int:
        """Largest bond, in base units, that still counts toward effective weight."""
        floor_units = self.dynamic_min_bond_units
        return floor_units * self.max_effective_bond_multiple
@dataclass(frozen=True)
class ActorSpec:
    """Economic actor that owns operators and validators in a scenario."""

    actor_id: str
    # True when the actor belongs to the adversarial coalition.
    adversarial: bool = False

    def validate(self) -> None:
        """Raise ``ValueError`` when ``actor_id`` is empty."""
        if self.actor_id:
            return
        raise ValueError("actor_id must be non-empty")
@dataclass(frozen=True)
class ValidatorSpec:
    """Scenario description of one validator and its join/exit schedule.

    The schedule can be given at epoch granularity (``join_epoch`` /
    ``exit_epoch``) or as explicit heights (``join_height`` / ``exit_height``).
    """

    validator_id: str
    operator_id: str
    actor_id: str
    bond_coins: float
    join_epoch: int = 0
    exit_epoch: int | None = None
    join_height: int | None = None
    exit_height: int | None = None
    join_source: str = "GENESIS"

    def validate(self) -> None:
        """Raise ``ValueError`` for the first inconsistent field, in a fixed order."""
        required_ids = (
            (self.validator_id, "validator_id must be non-empty"),
            (self.operator_id, "operator_id must be non-empty"),
            (self.actor_id, "actor_id must be non-empty"),
        )
        for identifier, message in required_ids:
            if not identifier:
                raise ValueError(message)

        if self.bond_coins <= 0:
            raise ValueError("validator bond must be positive")

        # Epoch-granularity schedule.
        if self.join_epoch < 0:
            raise ValueError("join_epoch must be non-negative")
        if self.exit_epoch is not None and self.exit_epoch < self.join_epoch:
            raise ValueError("exit_epoch cannot be earlier than join_epoch")

        # Height-granularity schedule; each height is optional.
        join_height = self.join_height
        exit_height = self.exit_height
        if join_height is not None and join_height < 0:
            raise ValueError("join_height must be non-negative")
        if exit_height is not None and exit_height < 0:
            raise ValueError("exit_height must be non-negative")
        if join_height is not None and self.join_source == "GENESIS" and join_height != 0:
            raise ValueError("genesis validators must have join_height 0 when provided")
        if exit_height is not None and join_height is not None and exit_height < join_height:
            raise ValueError("exit_height cannot be earlier than join_height")

        if self.join_source not in VALID_JOIN_SOURCES:
            raise ValueError(f"unsupported join_source: {self.join_source}")
@dataclass(frozen=True)
class OperatorSpec:
    """Scenario description of an operator and its per-epoch availability plan.

    The ``*_by_epoch`` mappings hold overrides keyed by epoch number.
    """

    operator_id: str
    actor_id: str
    default_status: str = STATUS_ACTIVE
    status_by_epoch: dict[int, str] = field(default_factory=dict)
    score_ok_by_epoch: dict[int, bool] = field(default_factory=dict)
    bond_ok_by_epoch: dict[int, bool] = field(default_factory=dict)

    def validate(self) -> None:
        """Raise ``ValueError`` for empty ids, unknown statuses or non-positive epoch keys."""
        if not self.operator_id:
            raise ValueError("operator_id must be non-empty")
        if not self.actor_id:
            raise ValueError("actor_id must be non-empty")
        if self.default_status not in VALID_AVAILABILITY_STATUSES:
            raise ValueError(f"unsupported default_status: {self.default_status}")

        # Status overrides: each entry is checked for its key first, then its value.
        for epoch, status in self.status_by_epoch.items():
            if epoch <= 0:
                raise ValueError("status_by_epoch keys must be positive epochs")
            if status not in VALID_AVAILABILITY_STATUSES:
                raise ValueError(f"unsupported status value: {status}")

        # Boolean overrides only need their epoch keys checked.
        keyed_overrides = (
            (self.score_ok_by_epoch, "score_ok_by_epoch keys must be positive epochs"),
            (self.bond_ok_by_epoch, "bond_ok_by_epoch keys must be positive epochs"),
        )
        for overrides, message in keyed_overrides:
            if any(epoch <= 0 for epoch in overrides):
                raise ValueError(message)
@dataclass(frozen=True)
class SimulationScenario:
    """Complete input for one simulation run: protocol, actors, operators, validators."""

    name: str
    description: str
    epochs: int
    protocol: ProtocolParameters
    actors: tuple[ActorSpec, ...]
    operators: tuple[OperatorSpec, ...]
    validators: tuple[ValidatorSpec, ...]
    seed_label: str = "finalis-protocol-sim"
    strategy_family: str = "baseline"
    baseline_scenario: str | None = None
    hypothetical_knobs: tuple[str, ...] = ()
    threshold_controls: dict[str, Any] = field(default_factory=dict)

    def validate(self) -> None:
        """Validate the scenario and every nested spec.

        Raises:
            ValueError: on an empty name, non-positive epoch count, duplicate
                ids, or a validator/operator that references an unknown
                actor or operator. Nested ``validate`` calls may raise too.
        """
        if not self.name:
            raise ValueError("scenario name must be non-empty")
        if self.epochs <= 0:
            raise ValueError("epochs must be positive")
        self.protocol.validate()

        # Ids must be unique within each collection.
        known_actors = {entry.actor_id for entry in self.actors}
        if len(self.actors) != len(known_actors):
            raise ValueError("duplicate actor_id")
        known_operators = {entry.operator_id for entry in self.operators}
        if len(self.operators) != len(known_operators):
            raise ValueError("duplicate operator_id")
        known_validators = {entry.validator_id for entry in self.validators}
        if len(self.validators) != len(known_validators):
            raise ValueError("duplicate validator_id")

        for actor in self.actors:
            actor.validate()

        # Cross-references are checked after each spec's own validation.
        for operator in self.operators:
            operator.validate()
            if operator.actor_id not in known_actors:
                raise ValueError(f"operator {operator.operator_id} references unknown actor {operator.actor_id}")

        for validator in self.validators:
            validator.validate()
            if validator.actor_id not in known_actors:
                raise ValueError(f"validator {validator.validator_id} references unknown actor {validator.actor_id}")
            if validator.operator_id not in known_operators:
                raise ValueError(f"validator {validator.validator_id} references unknown operator {validator.operator_id}")
@dataclass(frozen=True)
class EpochOperatorResult:
    """Immutable per-operator record for a single simulated epoch.

    NOTE(review): the code that populates these fields lies outside this
    view; the grouping comments below are inferred from the field names only.
    """

    # Identity of the operator and its owning actor.
    operator_id: str
    actor_id: str
    adversarial: bool
    # Bond and weight figures (presumably base units, as in RankedCandidate).
    bonded_units: int
    capped_bonded_units: int
    effective_weight: int
    # Validator counts behind this operator.
    validator_count: int
    base_eligible_validator_count: int
    # Availability inputs and the resulting eligibility verdict.
    availability_status: str
    availability_score_ok: bool
    availability_bond_ok: bool
    availability_eligible: bool
    # Selection outcome for the epoch.
    selected: bool
    proposer: bool
@dataclass(frozen=True)
class EpochResult:
    """Immutable record of one simulated epoch's derivation outcome.

    NOTE(review): the code that populates these fields lies outside this
    view; the grouping comments below are inferred from the field names only.
    """

    epoch: int
    # Derivation mode and fallback bookkeeping (presumably from derive_mode_reason).
    derivation_mode: str
    fallback_reason: str
    fallback_sticky: bool
    eligible_operator_count: int
    min_eligible_operators: int
    # Selected committee and proposer; no proposer is represented as None.
    committee_operator_ids: tuple[str, ...]
    proposer_operator_id: str | None
    # Coalition shares and committee concentration metrics.
    coalition_committee_share: float
    coalition_proposer_share: float
    committee_hhi: float
    committee_top1_share: float
    committee_top3_share: float
    # Per-operator detail rows for the epoch.
    operators: tuple[EpochOperatorResult, ...]
    # Lifecycle changes recorded for the epoch.
    join_activations: tuple[str, ...]
    exits_removed: tuple[str, ...]
@dataclass(frozen=True)
class ScenarioSummary:
    """Immutable aggregate result of running one scenario.

    NOTE(review): the aggregation logic lies outside this view; the grouping
    comments below are inferred from the field names only.
    """

    # Scenario identification and the protocol parameters used.
    scenario: str
    strategy_family: str
    baseline_scenario: str | None
    epochs: int
    protocol: dict[str, Any]
    # Coalition footprint and influence.
    coalition_bond_share: float
    coalition_operator_share: float
    average_coalition_committee_share: float
    proposer_share: float
    committee_share_delta_vs_bond_share: float
    split_amplification_ratio: float
    # Fallback behaviour over the run.
    fallback_epochs: int
    fallback_rate: float
    sticky_fallback_epochs: int
    sticky_fallback_rate: float
    fallback_entry_count: int
    average_fallback_duration: float
    max_fallback_duration: int
    average_recovery_time: float
    # Committee concentration metrics.
    average_hhi: float
    average_top1_share: float
    average_top3_share: float
    max_operator_committee_share: float
    # Validator activation latency, keyed by a string id.
    activation_latency_epochs: dict[str, int]
    average_activation_latency: float
    # Eligibility churn and threshold-proximity counters.
    eligibility_churn_events: int
    epochs_at_exact_threshold: int
    epochs_below_threshold: int
    epochs_at_recovery_threshold: int
    sticky_fallback_entry_count: int
    recovery_from_sticky_count: int
    marginal_operator_committee_share: float
    marginal_operator_eligibility_churn: int
    # Filtering and blocking statistics.
    operators_filtered_by_bond_floor: int
    operators_filtered_by_availability: int
    bond_threshold_binding_rate: float
    warmup_blocking_rate: float
    cooldown_blocking_rate: float
    # One dictionary per simulated epoch.
    per_epoch: list[dict[str, Any]]
@dataclass(frozen=True)
class CandidateProfile:
    """Named set of parameter overrides that can be expanded into ``ProtocolParameters``."""

    name: str
    committee_size: int
    min_eligible: int
    dynamic_min_bond_coins: float
    availability_min_bond_coins: float
    validator_warmup_blocks: int
    validator_cooldown_blocks: int

    def protocol(self) -> ProtocolParameters:
        """Build ``ProtocolParameters`` from this profile; other fields keep their defaults."""
        forwarded = (
            "committee_size",
            "min_eligible",
            "dynamic_min_bond_coins",
            "availability_min_bond_coins",
            "validator_warmup_blocks",
            "validator_cooldown_blocks",
        )
        overrides = {field_name: getattr(self, field_name) for field_name in forwarded}
        return ProtocolParameters(**overrides)
def actor_id_bytes(actor_id: str) -> bytes:
    """Return the double-SHA-256 digest of the ``actor:``-prefixed actor id."""
    tagged = f"actor:{actor_id}"
    return sha256d(tagged.encode("utf-8"))
def operator_id_bytes(operator_id: str) -> bytes:
    """Return the double-SHA-256 digest of the ``operator:``-prefixed operator id."""
    tagged = f"operator:{operator_id}"
    return sha256d(tagged.encode("utf-8"))
def validator_pubkey_bytes(validator_id: str) -> bytes:
    """Return the double-SHA-256 digest of the ``validator:``-prefixed validator id."""
    tagged = f"validator:{validator_id}"
    return sha256d(tagged.encode("utf-8"))
def epoch_seed(seed_label: str, epoch: int) -> bytes:
    """Return the deterministic seed for ``epoch`` under scenario label ``seed_label``."""
    material = f"epoch-seed:{seed_label}:{epoch}"
    return sha256d(material.encode("utf-8"))
def u64le(value: int) -> bytes:
    """Encode ``value`` as an unsigned 8-byte little-endian integer."""
    number = int(value)
    return number.to_bytes(length=8, byteorder="little", signed=False)
def varbytes(data: bytes) -> bytes:
    """Return ``data`` prefixed with a variable-width length header.

    Lengths below 0xFD use a single byte; larger lengths use a marker byte
    (0xFD, 0xFE or 0xFF) followed by a 2-, 4- or 8-byte little-endian length.
    """
    size = len(data)
    if size < 0xFD:
        header = bytes([size])
    elif size <= 0xFFFF:
        header = b"\xFD" + size.to_bytes(2, "little")
    elif size <= 0xFFFFFFFF:
        header = b"\xFE" + size.to_bytes(4, "little")
    else:
        header = b"\xFF" + size.to_bytes(8, "little")
    return header + data
def leading_zero_bits(hash_bytes: bytes) -> int:
    """Count the zero bits before the first set bit, reading most significant bit first."""
    counted = 0
    for octet in hash_bytes:
        if octet:
            # A non-zero byte contributes its own leading zeros and ends the scan.
            return counted + (8 - octet.bit_length())
        counted += 8
    return counted
def ticket_pow_bonus_bps_from_zero_bits(zero_bits: int, difficulty_bits: int, cap_bps: int) -> int:
    """Return the ticket bonus in basis points for a hash with ``zero_bits`` leading zeros.

    Hashes below the difficulty earn nothing. Otherwise the bonus is
    ``500 + 400 * isqrt(surplus + 1)``, limited to ``cap_bps``.
    """
    surplus = zero_bits - difficulty_bits
    if surplus < 0:
        return 0
    # surplus is non-negative here, so surplus + 1 is a valid isqrt argument.
    steps = math.isqrt(surplus + 1)
    uncapped = 500 + steps * 400
    return min(cap_bps, uncapped)
def best_operator_ticket(operator_id: str, epoch: int, params: ProtocolParameters, seed_label: str) -> tuple[bytes, int, int]:
    """Search the bounded nonce range for the operator's best epoch ticket.

    Every nonce in ``range(params.ticket_nonce_limit)`` is hashed together
    with the v2 ticket tag, the epoch, the epoch seed and the operator id.
    The smallest hash wins; equal hashes resolve to the lowest nonce.

    Args:
        operator_id: Operator whose ticket is searched.
        epoch: Epoch number bound into the payload and the seed.
        params: Supplies the nonce limit, difficulty and bonus cap.
        seed_label: Scenario label mixed into the epoch seed.

    Returns:
        ``(work_hash, nonce, bonus_bps)`` for the winning ticket.

    Raises:
        ValueError: if ``params.ticket_nonce_limit`` is not positive. This was
            previously an ``assert``, which is stripped under ``python -O``.
    """
    if params.ticket_nonce_limit <= 0:
        raise ValueError("ticket_nonce_limit must be positive")

    # Everything except the nonce is invariant across the search; build it once.
    payload_prefix = (
        b"SC-EPOCH-TICKET-V2"
        + u64le(epoch)
        + epoch_seed(seed_label, epoch)
        + operator_id_bytes(operator_id)
    )
    # Tuple ordering picks the smallest hash and, on equal hashes, the lowest
    # nonce, which matches a first-strictly-smaller linear scan.
    best_hash, best_nonce = min(
        (sha256d(payload_prefix + u64le(nonce)), nonce)
        for nonce in range(params.ticket_nonce_limit)
    )
    zero_bits = leading_zero_bits(best_hash)
    bonus = ticket_pow_bonus_bps_from_zero_bits(
        zero_bits, params.ticket_difficulty_bits, params.ticket_bonus_cap_bps
    )
    return best_hash, best_nonce, bonus
def representative_validator(validators: Sequence[ValidatorSpec]) -> ValidatorSpec:
    """Return the validator with the smallest ``(hashed pubkey, validator_id)`` pair."""

    def ordering(candidate: ValidatorSpec) -> tuple[bytes, str]:
        return validator_pubkey_bytes(candidate.validator_id), candidate.validator_id

    ordered = sorted(validators, key=ordering)
    return ordered[0]
def availability_status_for_epoch(spec: OperatorSpec, epoch: int) -> str:
    """Return the operator's status for ``epoch``, using ``default_status`` when no override exists."""
    fallback = spec.default_status
    overrides = spec.status_by_epoch
    return overrides.get(epoch, fallback)
def availability_score_ok_for_epoch(spec: OperatorSpec, epoch: int) -> bool:
    """Return the operator's score flag for ``epoch``; epochs without an override count as passing."""
    overrides = spec.score_ok_by_epoch
    return overrides.get(epoch, True)
def epoch_start_height(epoch: int, params: ProtocolParameters) -> int:
    """Return the first height of ``epoch``; epoch 1 starts at height 1."""
    completed_epochs = epoch - 1
    return completed_epochs * params.epoch_size + 1
def lifecycle_active_for_epoch(validator: ValidatorSpec, epoch: int, params: ProtocolParameters) -> bool:
    """Report whether ``validator`` is lifecycle-active at the boundary of ``epoch``.

    Two schedules are supported:

    * Height path, used when ``join_height`` or ``exit_height`` is set. The
      epoch's first height must have reached the activation height (join
      height plus warmup blocks, or 1 for genesis validators) and must still
      be below exit height plus cooldown blocks.
    * Epoch path, used otherwise. The same rule with the lags projected to
      whole epochs via ``params.warmup_epochs`` / ``params.cooldown_epochs``.

    Args:
        validator: Validator whose schedule is evaluated.
        epoch: Epoch number being derived.
        params: Supplies epoch size and warmup/cooldown lags.

    Returns:
        True when the validator has activated and has not yet been removed.
    """
    is_genesis = validator.join_source == "GENESIS"

    if validator.join_height is not None or validator.exit_height is not None:
        boundary_height = epoch_start_height(epoch, params)
        if is_genesis:
            activation_height = 1
        else:
            # Explicit None test: a recorded join_height of 0 is a real value
            # and must not be replaced by the epoch-derived fallback, which
            # the previous ``join_height or ...`` expression did.
            if validator.join_height is not None:
                joined_height = validator.join_height
            else:
                joined_height = epoch_start_height(validator.join_epoch, params)
            activation_height = joined_height + params.validator_warmup_blocks
        if boundary_height < activation_height:
            return False
        if validator.exit_height is None:
            return True
        removal_height = validator.exit_height + params.validator_cooldown_blocks
        return boundary_height < removal_height

    activation_epoch = 1 if is_genesis else validator.join_epoch + params.warmup_epochs
    if epoch < activation_epoch:
        return False
    if validator.exit_epoch is None:
        return True
    removal_epoch = validator.exit_epoch + params.cooldown_epochs
    return epoch < removal_epoch
def base_eligible_for_epoch(validator: ValidatorSpec, epoch: int, params: ProtocolParameters) -> bool:
    """Report whether ``validator`` is lifecycle-active and sufficiently bonded for ``epoch``.

    Genesis validators only need a positive bond; all others must meet the
    dynamic minimum bond.
    """
    if not lifecycle_active_for_epoch(validator, epoch, params):
        return False
    bond_units = coins_to_units(validator.bond_coins)
    joined_at_genesis = validator.join_source == "GENESIS"
    return bond_units > 0 if joined_at_genesis else bond_units >= params.dynamic_min_bond_units
def availability_eligible(status: str, score_ok: bool, bond_ok: bool) -> bool:
    """Report whether an operator is ACTIVE and passes both the score and the bond check."""
    if status != STATUS_ACTIVE:
        return False
    return score_ok and bond_ok
def derive_mode_reason(previous_mode: str, eligible_count: int, min_eligible: int) -> tuple[str, str]:
    """Return the ``(mode, reason)`` pair for an epoch, applying recovery hysteresis.

    From NORMAL, the mode stays NORMAL while ``eligible_count`` is at least
    ``min_eligible``. From any other mode, recovery needs one more eligible
    operator than that; sitting exactly at ``min_eligible`` keeps the
    derivation in FALLBACK with the sticky reason.
    """
    recovering = previous_mode != MODE_NORMAL
    # Leaving fallback requires exceeding the entry threshold by one.
    required = min_eligible + 1 if recovering else min_eligible
    if eligible_count >= required:
        return MODE_NORMAL, REASON_NONE
    if recovering and eligible_count == min_eligible:
        return MODE_FALLBACK, REASON_STICKY
    return MODE_FALLBACK, REASON_INSUFFICIENT
def candidate_for_operator(
    operator_id: str,
    validators: Sequence[ValidatorSpec],
    actor_map: Mapping[str, ActorSpec],
    epoch: int,
    params: ProtocolParameters,
    seed_label: str,
) -> RankedCandidate:
    """Aggregate an operator's validators into one ``RankedCandidate`` for ``epoch``.

    Bonds are summed and capped, the representative validator supplies the
    pubkey and the owning actor, and the epoch ticket supplies the bonus.
    """
    representative = representative_validator(validators)
    bonded_units = sum(coins_to_units(member.bond_coins) for member in validators)
    capped_units = min(bonded_units, params.effective_bond_cap_units)
    work_hash, nonce, bonus_bps = best_operator_ticket(operator_id, epoch, params, seed_label)
    owner = actor_map[representative.actor_id]

    fields = {
        "pubkey": validator_pubkey_bytes(representative.validator_id),
        "selection_id": operator_id_bytes(operator_id),
        "bonded_amount": bonded_units,
        "capped_bonded_amount": capped_units,
        "effective_weight": sqrt_effective_weight(capped_units),
        "ticket_work_hash": work_hash,
        "ticket_nonce": nonce,
        "ticket_bonus_bps": bonus_bps,
        "ticket_bonus_cap_bps": params.ticket_bonus_cap_bps,
        "actor_id": representative.actor_id,
        "adversarial": owner.adversarial,
        "validator_count": len(validators),
    }
    return RankedCandidate(**fields)
def finalized_committee_candidate_hash(seed: bytes, selection_id: bytes) -> bytes:
    """Return the committee-selection hash of a candidate under ``seed``."""
    preimage = b"SC-COMMITTEE-V3" + seed + selection_id
    return sha256d(preimage)
def hash64_prefix(hash_bytes: bytes) -> int:
    """Interpret the first eight bytes of ``hash_bytes`` as a big-endian unsigned integer."""
    leading = hash_bytes[:8]
    return int.from_bytes(leading, byteorder="big")
def finalized_committee_candidate_strength(candidate: RankedCandidate) -> int:
    """Return the candidate's positive selection strength.

    Strength is the effective weight scaled by ``10_000`` plus a ticket bonus
    that is first capped and then divided by ``1 + isqrt(bonded coins)``.
    """
    # A zero effective weight falls back to the weight of the uncapped bond.
    weight = candidate.effective_weight or sqrt_effective_weight(candidate.bonded_amount)
    capped_bonus = min(candidate.ticket_bonus_bps, candidate.ticket_bonus_cap_bps)
    whole_coins = max(1, candidate.bonded_amount // BASE_UNITS_PER_COIN)
    damping = 1 + math.isqrt(whole_coins)
    damped_bonus = capped_bonus // damping
    strength = weight * (10_000 + damped_bonus)
    return max(1, strength)
def compare_ranked_candidates(a: RankedCandidate, b: RankedCandidate, seed: bytes) -> int:
    """Three-way comparator that orders committee candidates under ``seed``.

    Returns -1 when ``a`` sorts before ``b``, 1 when it sorts after, and 0
    only when every compared attribute is equal. The primary key is the
    ratio of the candidate's 64-bit hash prefix to its strength (smaller
    sorts first); the remaining comparisons are deterministic tie-breakers.
    """
    # An empty selection_id falls back to the pubkey as selection identity.
    a_selection_id = a.selection_id if a.selection_id else a.pubkey
    b_selection_id = b.selection_id if b.selection_id else b.pubkey
    a_hash = finalized_committee_candidate_hash(seed, a_selection_id)
    b_hash = finalized_committee_candidate_hash(seed, b_selection_id)
    a_hash64 = hash64_prefix(a_hash)
    b_hash64 = hash64_prefix(b_hash)
    # Cross-multiplication compares a_hash64 / strength(a) with
    # b_hash64 / strength(b) in exact integer arithmetic. Strengths are at
    # least 1, so the inequality direction is preserved.
    lhs = a_hash64 * finalized_committee_candidate_strength(b)
    rhs = b_hash64 * finalized_committee_candidate_strength(a)
    if lhs != rhs:
        return -1 if lhs < rhs else 1
    # Tie-breakers where the smaller value sorts first: full hash,
    # selection identity, pubkey.
    if a_hash != b_hash:
        return -1 if a_hash < b_hash else 1
    if a_selection_id != b_selection_id:
        return -1 if a_selection_id < b_selection_id else 1
    if a.pubkey != b.pubkey:
        return -1 if a.pubkey < b.pubkey else 1
    # Tie-breakers where the larger value sorts first: effective weight,
    # capped bond, raw bond, ticket bonus.
    if a.effective_weight != b.effective_weight:
        return -1 if a.effective_weight > b.effective_weight else 1
    if a.capped_bonded_amount != b.capped_bonded_amount:
        return -1 if a.capped_bonded_amount > b.capped_bonded_amount else 1
    if a.bonded_amount != b.bonded_amount:
        return -1 if a.bonded_amount > b.bonded_amount else 1
    if a.ticket_bonus_bps != b.ticket_bonus_bps:
        return -1 if a.ticket_bonus_bps > b.ticket_bonus_bps else 1
    # Ticket details, smaller value first: work hash, nonce, bonus cap.
    if a.ticket_work_hash != b.ticket_work_hash:
        return -1 if a.ticket_work_hash < b.ticket_work_hash else 1
    if a.ticket_nonce != b.ticket_nonce:
        return -1 if a.ticket_nonce < b.ticket_nonce else 1
    if a.ticket_bonus_cap_bps != b.ticket_bonus_cap_bps:
        return -1 if a.ticket_bonus_cap_bps < b.ticket_bonus_cap_bps else 1
    # Last resort: the raw selection_id, without the pubkey fallback.
    return -1 if a.selection_id < b.selection_id else (1 if a.selection_id > b.selection_id else 0)
def rank_finalized_committee_candidates(candidates: Sequence[RankedCandidate], seed: bytes) -> list[RankedCandidate]:
    """Return ``candidates`` as a new list ordered by ``compare_ranked_candidates`` under ``seed``."""

    def seeded_compare(left: RankedCandidate, right: RankedCandidate) -> int:
        return compare_ranked_candidates(left, right, seed)

    return sorted(candidates, key=cmp_to_key(seeded_compare))
def select_finalized_committee(candidates: Sequence[RankedCandidate], seed: bytes, committee_size: int) -> list[RankedCandidate]:
    """Return the top-ranked candidates, at most ``committee_size`` of them."""
    ordered = rank_finalized_committee_candidates(candidates, seed)
    seats = min(committee_size, len(ordered))
    return ordered[:seats]
def compute_committee_root(committee: Sequence[RankedCandidate]) -> bytes:
    """Return the digest that commits to the ordered committee.

    Each member contributes a length-prefixed record of its pubkey, ticket
    work hash and little-endian ticket nonce.
    """
    records = b"".join(
        varbytes(member.pubkey + member.ticket_work_hash + u64le(member.ticket_nonce))
        for member in committee
    )
    return sha256d(b"SELFCOIN_COMMITTEE_V1" + records)
def compute_proposer_seed(epoch_anchor: bytes, height: int, committee_root: bytes) -> bytes:
    """Return the proposer-ordering seed for ``height`` within the anchored epoch."""
    preimage = b"SELFCOIN_PROPOSER_V1" + epoch_anchor + u64le(height) + committee_root
    return sha256d(preimage)
def proposer_schedule(committee: Sequence[RankedCandidate], epoch_anchor: bytes, height: int) -> list[RankedCandidate]:
    """Return the committee ordered by proposer priority for ``height``.

    Members are sorted by the hash of the proposer seed and their pubkey,
    with the pubkey itself as tie-breaker.
    """
    root = compute_committee_root(committee)
    seed = compute_proposer_seed(epoch_anchor, height, root)

    def priority(member: RankedCandidate) -> tuple[bytes, bytes]:
        return sha256d(seed + member.pubkey), member.pubkey

    return sorted(committee, key=priority)
def proposer_share_by_actor(committee: Sequence[RankedCandidate], proposer: RankedCandidate | None, candidate_actor: Mapping[bytes, str]) -> dict[str, float]:
    """Return each actor's share of the weight credited to quorum signers and the proposer.

    The quorum is the first ``2 * len(committee) // 3 + 1`` members by
    pubkey. Each signer credits its effective weight to its actor, and the
    proposer, if given, credits its weight once more. The result is empty
    for an empty committee or a zero total.
    """
    if not committee:
        return {}
    quorum_size = (2 * len(committee)) // 3 + 1
    contributors = sorted(committee, key=lambda member: member.pubkey)[:quorum_size]
    if proposer is not None:
        # The proposer is credited in addition to any signer credit it earned.
        contributors.append(proposer)

    weight_by_actor: dict[str, int] = {}
    for member in contributors:
        owner = candidate_actor[member.pubkey]
        weight_by_actor[owner] = weight_by_actor.get(owner, 0) + member.effective_weight

    combined = sum(weight_by_actor.values())
    if combined == 0:
        return {}
    return {owner: weight / combined for owner, weight in weight_by_actor.items()}
def concentration_hhi(shares: Iterable[float]) -> float:
    """Return the sum of squared shares (a Herfindahl-Hirschman style index)."""
    squared = (portion * portion for portion in shares)
    return sum(squared)
def top_k_share(shares: Iterable[float], k: int) -> float:
    """Return the combined share of the ``k`` largest entries in ``shares``."""
    descending = sorted(shares, reverse=True)
    leaders = descending[:k]
    return sum(leaders)
def operator_share_map(committee_operator_ids: Sequence[str]) -> dict[str, float]:
    """Return each operator's fraction of committee seats, keyed in first-seen order."""
    # An empty committee divides by 1, which yields an empty mapping.
    seat_count = max(1, len(committee_operator_ids))
    tallies = dict.fromkeys(committee_operator_ids, 0)
    for operator_id in committee_operator_ids:
        tallies[operator_id] += 1
    return {operator_id: seats / seat_count for operator_id, seats in tallies.items()}
def run_scenario(scenario: SimulationScenario) -> ScenarioSummary:
    """Simulate ``scenario`` one epoch at a time and aggregate the outcome.

    After ``scenario.validate()``, epochs ``1..scenario.epochs`` are processed
    in order. Each epoch:

    1. updates the validator lifecycle counters (warm-up and cool-down checks,
       first activations, removals);
    2. evaluates availability eligibility per operator and records one
       ``EpochOperatorResult`` for every operator;
    3. derives the mode and reason with ``derive_mode_reason`` from the
       previous mode, the eligible-operator count and ``params.min_eligible``;
    4. builds the candidate pool, selects the committee with
       ``select_finalized_committee`` and takes the first entry of
       ``proposer_schedule`` as the proposer;
    5. appends an ``EpochResult`` to the history.

    Args:
        scenario: The scenario to run; it supplies the protocol parameters,
            actors, operators, validators, epoch count, seed label and
            threshold controls read below.

    Returns:
        A ``ScenarioSummary`` with the scenario-level shares, rates and
        counters plus the ``asdict`` form of every ``EpochResult``.

    Raises:
        Whatever ``scenario.validate()`` raises; nothing is caught here.
    """
    scenario.validate()
    params = scenario.protocol
    # Lookup tables keyed by id.
    actor_map = {actor.actor_id: actor for actor in scenario.actors}
    operator_map = {operator.operator_id: operator for operator in scenario.operators}
    validators_by_operator: dict[str, list[ValidatorSpec]] = {}
    for validator in scenario.validators:
        validators_by_operator.setdefault(validator.operator_id, []).append(validator)
    # Static coalition shares over the whole scenario (not per epoch).
    coalition_bond_units = sum(
        coins_to_units(v.bond_coins) for v in scenario.validators if actor_map[v.actor_id].adversarial
    )
    # Floor of 1 avoids a division by zero when no bond is present.
    total_bond_units = max(1, sum(coins_to_units(v.bond_coins) for v in scenario.validators))
    coalition_bond_share = coalition_bond_units / total_bond_units
    coalition_operator_share = (
        sum(1 for operator in scenario.operators if actor_map[operator.actor_id].adversarial) / max(1, len(scenario.operators))
    )
    # The first epoch is derived as if the preceding mode had been fallback.
    previous_mode = MODE_FALLBACK
    committee_history: list[EpochResult] = []
    seen_active: dict[str, bool] = {validator.validator_id: False for validator in scenario.validators}
    activation_latency: dict[str, int] = {}
    # Lengths of completed (and, after the loop, trailing) fallback streaks.
    fallback_durations: list[int] = []
    current_fallback_duration = 0
    recovery_times: list[int] = []
    pending_recovery = False
    previous_eligible_operators: set[str] = set()
    eligibility_churn_events = 0
    epochs_at_exact_threshold = 0
    epochs_below_threshold = 0
    epochs_at_recovery_threshold = 0
    sticky_fallback_entry_count = 0
    recovery_from_sticky_count = 0
    previous_sticky = False
    # Operator ids singled out via threshold_controls["marginal_operator_ids"].
    marginal_operator_ids = set(str(item) for item in scenario.threshold_controls.get("marginal_operator_ids", []))
    marginal_operator_committee_share_sum = 0.0
    marginal_operator_eligibility_churn = 0
    previous_marginal_eligible: set[str] = set()
    operators_filtered_by_bond_floor = 0
    operators_filtered_by_availability = 0
    epochs_with_bond_binding = 0
    warmup_blocked_checks = 0
    warmup_total_checks = 0
    cooldown_blocked_checks = 0
    cooldown_total_checks = 0
    for epoch in range(1, scenario.epochs + 1):
        availability_eligible_ops: set[str] = set()
        candidate_pool: list[tuple[str, RankedCandidate]] = []
        operator_results: list[EpochOperatorResult] = []
        join_activations: list[str] = []
        exits_removed: list[str] = []
        bond_binding_this_epoch = False
        # --- Validator lifecycle bookkeeping for this epoch ---
        for validator in scenario.validators:
            now_active = lifecycle_active_for_epoch(validator, epoch, params)
            if validator.join_source == "POST_GENESIS":
                # One warm-up check per post-genesis validator per epoch.
                warmup_total_checks += 1
                if not now_active:
                    joined_height = validator.join_height if validator.join_height is not None else epoch_start_height(validator.join_epoch, params)
                    boundary_height = epoch_start_height(epoch, params)
                    # Blocked: inactive and the epoch boundary is still inside the warm-up window.
                    if boundary_height < joined_height + params.validator_warmup_blocks:
                        warmup_blocked_checks += 1
            if validator.exit_epoch is not None or validator.exit_height is not None:
                # One cool-down check per exiting validator per epoch.
                cooldown_total_checks += 1
                if now_active:
                    exit_height = validator.exit_height if validator.exit_height is not None else epoch_start_height(validator.exit_epoch or 0, params)
                    boundary_height = epoch_start_height(epoch, params)
                    # Counted: still active while the boundary lies in [exit_height, exit_height + cooldown).
                    if boundary_height < exit_height + params.validator_cooldown_blocks and boundary_height >= exit_height:
                        cooldown_blocked_checks += 1
            if now_active and not seen_active[validator.validator_id]:
                # First epoch in which this validator is active.
                seen_active[validator.validator_id] = True
                if validator.join_source == "POST_GENESIS":
                    activation_latency[validator.validator_id] = epoch - validator.join_epoch
                    join_activations.append(validator.validator_id)
            if not now_active and seen_active[validator.validator_id]:
                # Recorded in every epoch where a previously active validator is inactive.
                exits_removed.append(validator.validator_id)
        # --- Pass 1: availability eligibility and per-operator result rows ---
        for operator_id, spec in operator_map.items():
            base_eligible_validators = [
                validator
                for validator in validators_by_operator.get(operator_id, [])
                if base_eligible_for_epoch(validator, epoch, params)
            ]
            # Rebinds the function-level total_bond_units; coalition_bond_share
            # was already computed from the earlier value. Here the sum covers
            # the operator's lifecycle-active validators.
            total_bond_units = sum(
                coins_to_units(validator.bond_coins)
                for validator in validators_by_operator.get(operator_id, [])
                if lifecycle_active_for_epoch(validator, epoch, params)
            )
            status = availability_status_for_epoch(spec, epoch)
            score_ok = availability_score_ok_for_epoch(spec, epoch)
            # A per-epoch entry in bond_ok_by_epoch takes precedence over the computed bond test.
            bond_ok = spec.bond_ok_by_epoch.get(epoch, total_bond_units >= params.availability_min_bond_units)
            is_availability_eligible = availability_eligible(status, score_ok, bond_ok)
            if is_availability_eligible:
                availability_eligible_ops.add(operator_id)
            # selected / proposer are filled in after the committee is known.
            operator_results.append(
                EpochOperatorResult(
                    operator_id=operator_id,
                    actor_id=spec.actor_id,
                    adversarial=actor_map[spec.actor_id].adversarial,
                    bonded_units=total_bond_units,
                    capped_bonded_units=min(total_bond_units, params.effective_bond_cap_units),
                    effective_weight=sqrt_effective_weight(min(total_bond_units, params.effective_bond_cap_units))
                    if total_bond_units > 0
                    else 0,
                    validator_count=len(validators_by_operator.get(operator_id, [])),
                    base_eligible_validator_count=len(base_eligible_validators),
                    availability_status=status,
                    availability_score_ok=score_ok,
                    availability_bond_ok=bond_ok,
                    availability_eligible=is_availability_eligible,
                    selected=False,
                    proposer=False,
                )
            )
        # --- Mode derivation and threshold / churn counters ---
        # sorted() only feeds len() here, so this is the size of the eligible set.
        eligible_count = len(sorted(availability_eligible_ops))
        mode, reason = derive_mode_reason(previous_mode, eligible_count, params.min_eligible)
        fallback_sticky = mode == MODE_FALLBACK and reason == REASON_STICKY
        if eligible_count == params.min_eligible:
            epochs_at_exact_threshold += 1
        if eligible_count < params.min_eligible:
            epochs_below_threshold += 1
        if eligible_count == params.min_eligible + 1:
            epochs_at_recovery_threshold += 1
        if fallback_sticky and not previous_sticky:
            sticky_fallback_entry_count += 1
        if previous_sticky and mode == MODE_NORMAL:
            recovery_from_sticky_count += 1
        current_eligible_operators = set(availability_eligible_ops)
        # Churn = operators that entered or left the eligible set since the previous epoch.
        eligibility_churn_events += len(current_eligible_operators.symmetric_difference(previous_eligible_operators))
        current_marginal_eligible = {operator_id for operator_id in current_eligible_operators if operator_id in marginal_operator_ids}
        marginal_operator_eligibility_churn += len(current_marginal_eligible.symmetric_difference(previous_marginal_eligible))
        previous_eligible_operators = current_eligible_operators
        previous_marginal_eligible = current_marginal_eligible
        # --- Pass 2: build the candidate pool ---
        for operator_id, spec in operator_map.items():
            base_eligible_validators = [
                validator
                for validator in validators_by_operator.get(operator_id, [])
                if base_eligible_for_epoch(validator, epoch, params)
            ]
            lifecycle_active_validators = [
                validator for validator in validators_by_operator.get(operator_id, []) if lifecycle_active_for_epoch(validator, epoch, params)
            ]
            # Active validators exist but none is base-eligible.
            if lifecycle_active_validators and not base_eligible_validators:
                operators_filtered_by_bond_floor += 1
                bond_binding_this_epoch = True
            if not base_eligible_validators:
                continue
            operator_status = availability_status_for_epoch(spec, epoch)
            score_ok = availability_score_ok_for_epoch(spec, epoch)
            # Unlike pass 1, the bond here is summed over base-eligible validators.
            total_bond_units = sum(coins_to_units(validator.bond_coins) for validator in base_eligible_validators)
            bond_ok = spec.bond_ok_by_epoch.get(epoch, total_bond_units >= params.availability_min_bond_units)
            operator_availability_eligible = availability_eligible(operator_status, score_ok, bond_ok)
            # The availability filter is applied only in MODE_NORMAL.
            if mode == MODE_NORMAL and not operator_availability_eligible:
                operators_filtered_by_availability += 1
                continue
            candidate_pool.append(
                (
                    operator_id,
                    candidate_for_operator(operator_id, base_eligible_validators, actor_map, epoch, params, scenario.seed_label),
                )
            )
        # --- Committee and proposer selection ---
        # Candidates are handed to selection in operator_id order.
        candidates = [candidate for _, candidate in sorted(candidate_pool, key=lambda item: item[0])]
        committee = select_finalized_committee(candidates, epoch_seed(scenario.seed_label, epoch), params.committee_size)
        # The epoch seed doubles as the epoch anchor and the epoch number as the height.
        schedule = proposer_schedule(committee, epoch_seed(scenario.seed_label, epoch), epoch)
        proposer = schedule[0] if schedule else None
        candidate_actor: dict[bytes, str] = {}
        selection_to_operator: dict[bytes, str] = {}
        for operator_id, candidate in candidate_pool:
            rep = representative_validator(
                [
                    validator
                    for validator in validators_by_operator.get(operator_id, [])
                    if base_eligible_for_epoch(validator, epoch, params)
                ]
            )
            candidate_actor[candidate.pubkey] = rep.actor_id
            selection_to_operator[candidate.selection_id] = operator_id
        committee_operator_ids = tuple(selection_to_operator[c.selection_id] for c in committee)
        # Each committee seat contributes an equal 1/len(committee) to its actor.
        committee_actor_shares: dict[str, float] = {}
        for candidate in committee:
            actor_id = candidate_actor[candidate.pubkey]
            committee_actor_shares[actor_id] = committee_actor_shares.get(actor_id, 0.0) + 1.0 / max(1, len(committee))
        proposer_actor_shares = proposer_share_by_actor(committee, proposer, candidate_actor)
        coalition_committee_share = sum(
            share for actor_id, share in committee_actor_shares.items() if actor_map[actor_id].adversarial
        )
        coalition_proposer_share = sum(
            share for actor_id, share in proposer_actor_shares.items() if actor_map[actor_id].adversarial
        )
        operator_shares = operator_share_map(committee_operator_ids)
        marginal_operator_committee_share_sum += sum(
            share for operator_id, share in operator_shares.items() if operator_id in marginal_operator_ids
        )
        # NOTE(review): operator_result_index is not read anywhere else in this function.
        operator_result_index = {item.operator_id: item for item in operator_results}
        selected_ops = set(committee_operator_ids)
        proposer_op = selection_to_operator.get(proposer.selection_id) if proposer is not None else None
        # Re-emit the operator rows in operator_id order with selected / proposer set.
        operator_results = [
            replace(item, selected=item.operator_id in selected_ops, proposer=item.operator_id == proposer_op)
            for item in sorted(operator_results, key=lambda op: op.operator_id)
        ]
        # --- Fallback streak tracking ---
        if mode == MODE_FALLBACK:
            current_fallback_duration += 1
            pending_recovery = True
        else:
            if current_fallback_duration > 0:
                # A fallback streak just ended; record its length.
                fallback_durations.append(current_fallback_duration)
                current_fallback_duration = 0
            if pending_recovery:
                # NOTE(review): the value appended is always 1, so
                # average_recovery_time can only be 1.0 or 0.0 - confirm intended.
                recovery_times.append(1)
                pending_recovery = False
        committee_history.append(
            EpochResult(
                epoch=epoch,
                derivation_mode=mode,
                fallback_reason=reason,
                fallback_sticky=fallback_sticky,
                eligible_operator_count=eligible_count,
                min_eligible_operators=params.min_eligible,
                committee_operator_ids=committee_operator_ids,
                proposer_operator_id=proposer_op,
                coalition_committee_share=coalition_committee_share,
                coalition_proposer_share=coalition_proposer_share,
                committee_hhi=concentration_hhi(operator_shares.values()),
                committee_top1_share=top_k_share(operator_shares.values(), 1),
                committee_top3_share=top_k_share(operator_shares.values(), 3),
                operators=tuple(operator_results),
                join_activations=tuple(sorted(join_activations)),
                exits_removed=tuple(sorted(exits_removed)),
            )
        )
        previous_mode = mode
        previous_sticky = fallback_sticky
        if bond_binding_this_epoch:
            epochs_with_bond_binding += 1
    # A fallback streak still open after the last epoch is counted as well.
    if current_fallback_duration > 0:
        fallback_durations.append(current_fallback_duration)
    fallback_epochs = sum(1 for epoch in committee_history if epoch.derivation_mode == MODE_FALLBACK)
    sticky_epochs = sum(1 for epoch in committee_history if epoch.fallback_sticky)
    # An entry is a fallback epoch that is first in the history or follows a non-fallback epoch.
    fallback_entry_count = sum(
        1
        for index, epoch in enumerate(committee_history)
        if epoch.derivation_mode == MODE_FALLBACK and (index == 0 or committee_history[index - 1].derivation_mode != MODE_FALLBACK)
    )
    max_operator_committee_share = 0.0
    for epoch in committee_history:
        shares = operator_share_map(epoch.committee_operator_ids).values()
        # Skip epochs with an empty committee; max() of nothing would raise.
        if shares:
            max_operator_committee_share = max(max_operator_committee_share, max(shares))
    # NOTE(review): several averages below divide by scenario.epochs without a
    # guard; presumably scenario.validate() rejects epochs == 0 - confirm.
    return ScenarioSummary(
        scenario=scenario.name,
        strategy_family=scenario.strategy_family,
        baseline_scenario=scenario.baseline_scenario,
        epochs=scenario.epochs,
        protocol=asdict(params),
        coalition_bond_share=coalition_bond_share,
        coalition_operator_share=coalition_operator_share,
        average_coalition_committee_share=sum(epoch.coalition_committee_share for epoch in committee_history) / scenario.epochs,
        proposer_share=sum(epoch.coalition_proposer_share for epoch in committee_history) / scenario.epochs,
        committee_share_delta_vs_bond_share=(
            sum(epoch.coalition_committee_share for epoch in committee_history) / scenario.epochs
        )
        - coalition_bond_share,
        split_amplification_ratio=(
            (sum(epoch.coalition_committee_share for epoch in committee_history) / scenario.epochs) / coalition_bond_share
            if coalition_bond_share > 0
            else 0.0
        ),
        fallback_epochs=fallback_epochs,
        fallback_rate=fallback_epochs / scenario.epochs,
        sticky_fallback_epochs=sticky_epochs,
        sticky_fallback_rate=sticky_epochs / scenario.epochs,
        fallback_entry_count=fallback_entry_count,
        average_fallback_duration=(sum(fallback_durations) / len(fallback_durations)) if fallback_durations else 0.0,
        max_fallback_duration=max(fallback_durations) if fallback_durations else 0,
        average_recovery_time=(sum(recovery_times) / len(recovery_times)) if recovery_times else 0.0,
        average_hhi=sum(epoch.committee_hhi for epoch in committee_history) / scenario.epochs,
        average_top1_share=sum(epoch.committee_top1_share for epoch in committee_history) / scenario.epochs,
        average_top3_share=sum(epoch.committee_top3_share for epoch in committee_history) / scenario.epochs,
        max_operator_committee_share=max_operator_committee_share,
        activation_latency_epochs=activation_latency,
        average_activation_latency=(
            sum(activation_latency.values()) / len(activation_latency) if activation_latency else 0.0
        ),
        eligibility_churn_events=eligibility_churn_events,
        epochs_at_exact_threshold=epochs_at_exact_threshold,
        epochs_below_threshold=epochs_below_threshold,
        epochs_at_recovery_threshold=epochs_at_recovery_threshold,
        sticky_fallback_entry_count=sticky_fallback_entry_count,
        recovery_from_sticky_count=recovery_from_sticky_count,
        marginal_operator_committee_share=marginal_operator_committee_share_sum / scenario.epochs,
        marginal_operator_eligibility_churn=marginal_operator_eligibility_churn,
        operators_filtered_by_bond_floor=operators_filtered_by_bond_floor,
        operators_filtered_by_availability=operators_filtered_by_availability,
        bond_threshold_binding_rate=epochs_with_bond_binding / max(1, scenario.epochs),
        warmup_blocking_rate=warmup_blocked_checks / max(1, warmup_total_checks),
        cooldown_blocking_rate=cooldown_blocked_checks / max(1, cooldown_total_checks),
        per_epoch=[asdict(epoch) for epoch in committee_history],
    )
def compare_summaries(summaries: Sequence[ScenarioSummary]) -> dict[str, Any]:
rows = []
for summary in summaries:
rows.append(
{
"scenario": summary.scenario,
"strategy_family": summary.strategy_family,
"committee_share_pct": percent(summary.average_coalition_committee_share),
"proposer_share_pct": percent(summary.proposer_share),