[dss_bench] Tool to generate automatic graphs for q/s based on various parameters #1519
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,3 +131,5 @@ go | |
|
|
||
| # setuptools_scm | ||
| _version.py | ||
|
|
||
| dss_bench_out | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| # dss_bench | ||
|
|
||
| Benchmark DSS performance as a function of deployment parameters. | ||
|
|
||
| For every **context** variant x **test**, it: cleans the local ecosystem, | ||
|
|
||
| runs `make start-locally` with the right env, runs the test against all deployed | ||
| DSS at once, and records q/s + latency percentiles. Output is one plot per | ||
|
|
||
| test, q/s and latency vs context. | ||
|
|
||
| ## Layout | ||
|
|
||
| - `config.py` - global settings (node count, image, datastore type, duration, processes). | ||
| - `environment.py` - wraps `make start-locally` / `down-locally`, resolves DSS targets. | ||
| - `auth.py` - mints Dummy OAuth tokens (audience = DSS hostname). | ||
| - `driver.py` - runs `processes` processes per DSS for `duration`, times each call. | ||
| - `measure.py` - aggregates into total q/s + median/p95 latency. | ||
| - `plot.py` - one PNG per test (q/s + latency, one line per comparison arm). | ||
| - `arms.py` - optional comparison arms (image vs image, or datastore vs datastore). | ||
|
Member
Shouldn't arms be a property of BenchTest? I would expect someone to describe them as arms of a test.
Member
Author
Mmm no, the arms are global across all BenchTests, not specific to a particular test. An Arm applies to the global context (e.g. two different images), not to two variants of a test. |
||
| - `run.py` - CLI matrix runner. | ||
| - `contexts/` - one file per context (e.g. inter-USS latency swept from 0 to 100 ms). | ||
| - `tests/` - one file per test. | ||
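As a rough illustration of the aggregation step that `measure.py` is described as performing, here is a minimal sketch. `measure.py` itself is not shown in this diff, so the function names (`percentile`, `summarize`), the nearest-rank percentile method, and the choice to count only successful calls toward q/s are all assumptions made for illustration.

```python
import math
import statistics


def percentile(sorted_ms: list[float], pct: float) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct / 100.0 * len(sorted_ms)))
    return sorted_ms[rank - 1]


def summarize(
    latencies_ms: list[float],
    error_latencies_ms: list[float],
    duration_s: float,
) -> dict:
    """Hypothetical aggregation: q/s from successes, latency from all calls."""
    # Failed calls stay in the latency sample so that slow failures
    # (e.g. timeouts) are not silently dropped from the percentiles.
    all_ms = sorted(latencies_ms + error_latencies_ms)
    return {
        "qps": len(latencies_ms) / duration_s,
        "median_ms": statistics.median(all_ms) if all_ms else None,
        "p95_ms": percentile(all_ms, 95) if all_ms else None,
        "errors": len(error_latencies_ms),
    }


summary = summarize([10.0, 20.0, 30.0, 40.0], [1000.0], duration_s=2.0)
print(summary)
```

With only five samples, the p95 is simply the slowest call, which is why short runs give noisy tail latencies.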
|
|
||
| ## Run | ||
|
|
||
| ```bash | ||
| make image # build the DSS/monitoring images once | ||
|
|
||
| uv run python -m monitoring.dss_bench.run --processes 8 --duration 30 | ||
| ``` | ||
|
|
||
| ## Compare (optional) | ||
|
|
||
| Two arms, overlaid on every plot. Omit both flags for no comparison. | ||
|
|
||
| ```bash | ||
| # two PRs / images | ||
| uv run python -m monitoring.dss_bench.run --compare-images interuss/dss:pr-A interuss/dss:pr-B | ||
| # two datastores | ||
| uv run python -m monitoring.dss_bench.run --compare-dbs crdb raft | ||
| ``` | ||
|
|
||
| The bench runs on the host: DSS nodes are reached at `http://localhost:80NN` | ||
| and tokens carry the matching `dss<j>.uss<i>.localutm` audience, so no extra | ||
| container or network is needed. | ||
|
|
||
| ## Add a context or test | ||
|
|
||
| Drop a new file in `contexts/` (subclass `Context`) or `tests/` (subclass `BenchTest`). | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| """Optional comparison arms: run the whole matrix under several configs and | ||
| overlay them on the plots. Default is a single arm (no comparison).""" | ||
|
|
||
| import dataclasses | ||
| from dataclasses import dataclass, field | ||
|
|
||
| from monitoring.dss_bench.config import GlobalConfig | ||
|
|
||
|
|
||
| @dataclass | ||
| class Arm: | ||
| label: str | ||
| overrides: dict = field(default_factory=dict) | ||
|
|
||
| def apply(self, cfg: GlobalConfig) -> GlobalConfig: | ||
| return dataclasses.replace(cfg, **self.overrides) | ||
|
|
||
|
|
||
| def single(cfg: GlobalConfig) -> list[Arm]: | ||
| return [Arm(label="baseline")] | ||
|
|
||
|
|
||
| def compare_images(img_a: str, img_b: str) -> list[Arm]: | ||
| return [ | ||
| Arm(label=img_a, overrides={"dss_image": img_a}), | ||
| Arm(label=img_b, overrides={"dss_image": img_b}), | ||
| ] | ||
|
|
||
|
|
||
| def compare_datastores(db_a: str, db_b: str) -> list[Arm]: | ||
| return [ | ||
| Arm(label=db_a, overrides={"db_type": db_a}), | ||
| Arm(label=db_b, overrides={"db_type": db_b}), | ||
| ] |
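To make the behavior of `Arm.apply` concrete, the following self-contained sketch uses a trimmed stand-in for `GlobalConfig` (only two fields, chosen for illustration) and shows that `dataclasses.replace` returns a modified copy and leaves the base config untouched.

```python
import dataclasses
from dataclasses import dataclass, field


@dataclass
class GlobalConfig:
    """Trimmed stand-in for the real config, for illustration only."""

    dss_image: str = "interuss/dss:v0.22.0"
    db_type: str = "crdb"


@dataclass
class Arm:
    label: str
    overrides: dict = field(default_factory=dict)

    def apply(self, cfg: GlobalConfig) -> GlobalConfig:
        # replace() builds a new instance; cfg itself is not mutated.
        return dataclasses.replace(cfg, **self.overrides)


base = GlobalConfig()
arms = [
    Arm(label="crdb", overrides={"db_type": "crdb"}),
    Arm(label="raft", overrides={"db_type": "raft"}),
]
configs = {arm.label: arm.apply(base) for arm in arms}
print(configs["raft"].db_type, base.db_type)
```

One consequence of this design is that a misspelled override key fails fast: `dataclasses.replace` raises `TypeError` for a name that is not a field of the dataclass.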
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| """Mint a JWT from the local Dummy OAuth server.""" | ||
|
Member
Why not reuse an AuthAdapter and only hardcode the AuthSpec? |
||
|
|
||
| import requests | ||
|
|
||
|
|
||
| def issue_token(endpoint: str, sub: str, audience: str, scopes: list[str]) -> str: | ||
| params = { | ||
| "grant_type": "client_credentials", | ||
| "scope": " ".join(scopes), | ||
| "intended_audience": audience, | ||
| "issuer": "dummy", | ||
| "sub": sub, | ||
| } | ||
| resp = requests.get(endpoint, params=params, timeout=10) | ||
| resp.raise_for_status() | ||
| return resp.json()["access_token"] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| """Global, parameterizable settings for a benchmark run.""" | ||
|
|
||
| from dataclasses import dataclass | ||
|
|
||
|
|
||
| @dataclass | ||
|
Member
For classes we would likely want to (de)serialize, ImplicitDict is the preferred choice and it seems nearly certain users will want to adjust these values.
Member
Author
Yes, but the values are adjusted through flags and this is the internal class that stores them, not something that is meant to be shared or deserialized? |
||
| class GlobalConfig: | ||
| # Local ecosystem sizing (consumed by `make start-locally`). | ||
| num_uss: int = 3 | ||
| num_nodes: int = 3 | ||
| dss_image: str = "interuss/dss:v0.22.0" | ||
| db_type: str = "crdb" # crdb | ybdb | raft | ||
| intra_netem: str = "delay 600us 40us 25% distribution normal loss 0.0005%" | ||
| inter_netem: str = "delay 36ms 40ms 50% distribution paretonormal loss 0.25% 15%" | ||
|
|
||
| # Load profile. | ||
| duration_s: float = 120.0 | ||
| processes: int = 4 # parallel processes calling action(), PER DSS | ||
|
|
||
| # Dummy OAuth reachable from the host. | ||
| oauth_token_endpoint: str = "http://localhost:8085/token" | ||
| oauth_sub: str = "uss1" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| """Auto-discover Context subclasses defined in this package.""" | ||
|
|
||
| import importlib | ||
| import inspect | ||
| import pkgutil | ||
|
|
||
| from monitoring.dss_bench.contexts.base import Context | ||
|
|
||
|
|
||
| def discover() -> dict[str, type[Context]]: | ||
| found: dict[str, type[Context]] = {} | ||
| for info in pkgutil.iter_modules(__path__): | ||
| if info.name == "base": | ||
| continue | ||
| module = importlib.import_module(f"{__name__}.{info.name}") | ||
| for _, obj in inspect.getmembers(module, inspect.isclass): | ||
| if ( | ||
| issubclass(obj, Context) | ||
| and obj is not Context | ||
| and obj.__module__ == module.__name__ | ||
| ): | ||
| found[obj.name] = obj | ||
| return found |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| """Base class for experiments that sweep across a range of values. A Sweep yields contexts for each value in the range, and contexts are sets of environment variables that are used in `make start-locally` before a measurement.""" | ||
|
|
||
| from dataclasses import dataclass | ||
|
|
||
|
|
||
| @dataclass | ||
| class Context: | ||
| label: str | ||
| """Label of the characteristic of interest in this context, e.g. '50ms'""" | ||
| env: dict[str, str] | ||
| """Additional environment variable values for `make start-locally`""" | ||
|
|
||
|
|
||
| class Sweep: | ||
| name: str = "base" | ||
| """<description of what this is, and especially how its content should differ from variable_label (why not just use variable_label?)>""" | ||
|
|
||
| variable_label: str = "context" | ||
| """Label of the value being swept, useful for labeling the axis in which the contexts are displayed""" | ||
|
|
||
| def variants(self) -> list[Context]: | ||
| raise NotImplementedError |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| """Sweep inter-USS network latency from 0 to 100 ms in 10 ms steps. | ||
| Jitter is 5% of the base delay; distribution/loss keep the deployment default. | ||
| """ | ||
|
|
||
| import re | ||
|
|
||
| from monitoring.dss_bench.contexts.base import Context, Variant | ||
|
|
||
|
|
||
| class ByLatency(Context): | ||
| name = "inter_latency" | ||
|
|
||
| axis_label = "inter-USS netem delay" | ||
|
|
||
| def __init__( | ||
| self, | ||
| delays: list[str] | None = None, | ||
| jitter_frac: float = 0.05, | ||
| correlation: str = "50%", | ||
| distribution: str = "paretonormal", | ||
| loss: str = "0.25% 15%", | ||
| ): | ||
| self.delays = delays or [ | ||
| "0ms", | ||
| "10ms", | ||
| "20ms", | ||
| "30ms", | ||
| "40ms", | ||
| "50ms", | ||
| "60ms", | ||
| "70ms", | ||
| "80ms", | ||
| "90ms", | ||
| "100ms", | ||
| ] | ||
| self.jitter_frac = jitter_frac | ||
| self.correlation = correlation | ||
| self.distribution = distribution | ||
| self.loss = loss | ||
|
|
||
| def _line(self, delay: str) -> str: | ||
|
|
||
| # netem rejects `distribution` when base delay is 0; keep only loss there. | ||
| m = re.match(r"([\d.]+)\s*([a-z]*)", delay) | ||
| value = float(m.group(1)) if m else 0.0 | ||
|
|
||
| if not m or value == 0: | ||
| return f"loss {self.loss}" | ||
|
|
||
| jitter = f"{value * self.jitter_frac:g}{m.group(2)}" | ||
|
|
||
| return ( | ||
| f"delay {delay} {jitter} {self.correlation} " | ||
| f"distribution {self.distribution} loss {self.loss}" | ||
| ) | ||
|
|
||
| def variants(self) -> list[Variant]: | ||
| return [ | ||
| Variant(label=d, env={"INTER_USS_NETEM_CONF": self._line(d)}) | ||
| for d in self.delays | ||
| ] |
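As a worked example of the netem line construction above, the sketch below re-implements `_line` as a free function (the name `netem_line` is introduced here for illustration; the defaults mirror the constructor arguments) and prints the result for a zero and a non-zero delay.

```python
import re


def netem_line(
    delay: str,
    jitter_frac: float = 0.05,
    correlation: str = "50%",
    distribution: str = "paretonormal",
    loss: str = "0.25% 15%",
) -> str:
    """Standalone copy of the `_line` logic, for illustration."""
    m = re.match(r"([\d.]+)\s*([a-z]*)", delay)
    value = float(m.group(1)) if m else 0.0
    if not m or value == 0:
        # Zero base delay: emit only the loss clause.
        return f"loss {loss}"
    # Jitter is a fraction of the base delay, in the same unit.
    jitter = f"{value * jitter_frac:g}{m.group(2)}"
    return (
        f"delay {delay} {jitter} {correlation} "
        f"distribution {distribution} loss {loss}"
    )


print(netem_line("0ms"))
# loss 0.25% 15%
print(netem_line("50ms"))
# delay 50ms 2.5ms 50% distribution paretonormal loss 0.25% 15%
```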
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,76 @@ | ||||||
| """Run a BenchTest with `cfg.processes` processes PER DSS, in parallel across | ||||||
| all DSS, for `cfg.duration_s`. Each process is one sequential caller; q/s | ||||||
| emerges from the number of processes.""" | ||||||
|
|
||||||
| import time | ||||||
| from multiprocessing import Process, Queue | ||||||
|
|
||||||
| import requests | ||||||
|
|
||||||
| from monitoring.dss_bench.auth import issue_token | ||||||
| from monitoring.dss_bench.config import GlobalConfig | ||||||
| from monitoring.dss_bench.tests.base import BenchTest | ||||||
|
|
||||||
|
|
||||||
| def _worker( | ||||||
| test: BenchTest, | ||||||
| target: tuple[str, str], | ||||||
| cfg: GlobalConfig, | ||||||
| q: Queue, | ||||||
| ) -> None: | ||||||
| base_url, audience = target | ||||||
| token = issue_token(cfg.oauth_token_endpoint, cfg.oauth_sub, audience, test.scopes) | ||||||
| session = requests.Session() | ||||||
| session.headers["Authorization"] = f"Bearer {token}" | ||||||
|
Member
It seems like we should leverage the existing auth infrastructure rather than writing from scratch |
||||||
|
|
||||||
| try: | ||||||
| test.setup(session, base_url) | ||||||
| except Exception: | ||||||
|
Member
This will prevent even the user from cancelling execution with KeyboardInterrupt; it seems like we should be much narrower in the exceptions we catch. What exceptions would we want to accept and continue for here? Wouldn't we expect the setup to work, and want to stop a test as probably invalid if the setup wasn't successful?
Member
Author
It doesn't prevent the user from cancelling execution: KeyboardInterrupt is not a subclass of Exception. However yes, letting the test run when setup fails is probably wrong; I switched to an early return. I let the |
||||||
| pass | ||||||
|
|
||||||
| latencies_ms: list[float] = [] | ||||||
| error_latencies_ms: list[float] = [] | ||||||
| done = 0 | ||||||
| end = time.monotonic() + cfg.duration_s | ||||||
| while time.monotonic() < end: | ||||||
| t0 = time.monotonic() | ||||||
| try: | ||||||
| test.action(session, base_url) | ||||||
| latencies_ms.append((time.monotonic() - t0) * 1000.0) | ||||||
| done += 1 | ||||||
| except Exception: | ||||||
|
Member
This seems like an overbroad catch; could we just use query_and_describe to catch the right exceptions in the right circumstances and then check whether the query succeeded?
Member
Author
We could restrict the catch, but the idea is to stay broad so that other potential errors (wrong data returned, etc.) are caught too.
|
||||||
| # Keep how long the failed call took (e.g. a ~10s timeout) instead | ||||||
| # of dropping it: discarding slow failures biases percentiles down. | ||||||
| error_latencies_ms.append((time.monotonic() - t0) * 1000.0) | ||||||
|
|
||||||
| try: | ||||||
| test.teardown(session, base_url) | ||||||
| except Exception: | ||||||
| pass | ||||||
|
|
||||||
| q.put((base_url, latencies_ms, error_latencies_ms)) | ||||||
|
|
||||||
|
|
||||||
| def run_test( | ||||||
| test: BenchTest, targets: list[tuple[str, str]], cfg: GlobalConfig | ||||||
|
Member
It's hard to figure out what "targets" is without tracing through the code; let's just make a simple data structure so it's super clear:
    @dataclass
    class Target:
        base_url: str
        audience: str
Suggested change
...but, it doesn't seem like carrying |
||||||
| ) -> dict[str, dict]: | ||||||
| """Return {base_url: {"latencies": [...ms], "error_latencies": [...ms]}}.""" | ||||||
| q: Queue = Queue() | ||||||
| procs = [] | ||||||
| for target in targets: | ||||||
| for _ in range(cfg.processes): | ||||||
| p = Process(target=_worker, args=(test, target, cfg, q)) | ||||||
| p.start() | ||||||
| procs.append(p) | ||||||
|
|
||||||
| results: dict[str, dict] = { | ||||||
| url: {"latencies": [], "error_latencies": []} for url, _ in targets | ||||||
| } | ||||||
| for _ in procs: | ||||||
| url, lat, err_lat = q.get() | ||||||
| results[url]["latencies"].extend(lat) | ||||||
| results[url]["error_latencies"].extend(err_lat) | ||||||
| for p in procs: | ||||||
| p.join() | ||||||
|
|
||||||
| return results | ||||||
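On the question raised in review of whether `except Exception` blocks Ctrl-C: a small standalone check, independent of the bench code, shows that `KeyboardInterrupt` derives from `BaseException` and therefore passes through such a handler. The helper name `swallow` is hypothetical.

```python
def swallow(exc: BaseException) -> str:
    """Raise exc inside a broad `except Exception` handler."""
    try:
        raise exc
    except Exception:
        # Only Exception subclasses end up here.
        return "swallowed"


# KeyboardInterrupt sits next to Exception under BaseException.
print(issubclass(KeyboardInterrupt, Exception))
print(swallow(ValueError("bad payload")))

try:
    swallow(KeyboardInterrupt())
except KeyboardInterrupt:
    print("KeyboardInterrupt propagated past `except Exception`")
```

This only settles the interrupt question; whether a failed `setup` should abort the worker is a separate design decision.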
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| """Drive `make start-locally` / `make down-locally` and resolve DSS targets. | ||
|
|
||
| Each DSS node is published on the host at port 80<NN> where NN is the | ||
| 2-digit global node index, and validates JWTs whose audience equals its | ||
| hostname dss<j>.uss<i>.localutm. We therefore hit http://localhost:80NN | ||
|
|
||
| while minting tokens for that audience. | ||
| """ | ||
|
|
||
| import os | ||
| import subprocess | ||
| from pathlib import Path | ||
|
|
||
| from monitoring.dss_bench.config import GlobalConfig | ||
|
|
||
| REPO_ROOT = Path(__file__).resolve().parents[2] | ||
|
|
||
|
|
||
| def _env(cfg: GlobalConfig, extra: dict[str, str]) -> dict[str, str]: | ||
| env = dict(os.environ) | ||
| env.update( | ||
| { | ||
| "NUM_USS": str(cfg.num_uss), | ||
| "NUM_NODES": str(cfg.num_nodes), | ||
| "DSS_IMAGE": cfg.dss_image, | ||
| "DB_TYPE": cfg.db_type, | ||
| "INTRA_USS_NETEM_CONF": cfg.intra_netem, | ||
| "INTER_USS_NETEM_CONF": cfg.inter_netem, | ||
| } | ||
| ) | ||
| env.update(extra) | ||
| return env | ||
|
|
||
|
|
||
| def up(cfg: GlobalConfig, extra: dict[str, str]) -> None: | ||
| subprocess.run( | ||
| ["make", "start-locally"], cwd=REPO_ROOT, env=_env(cfg, extra), check=True | ||
| ) | ||
|
|
||
|
|
||
| def down(cfg: GlobalConfig) -> None: | ||
| subprocess.run( | ||
| ["make", "clean-locally"], cwd=REPO_ROOT, env=_env(cfg, {}), check=False | ||
| ) | ||
|
|
||
|
|
||
| def dss_targets(cfg: GlobalConfig) -> list[tuple[str, str]]: | ||
| """Return (base_url, jwt_audience) for every deployed DSS node.""" | ||
| targets = [] | ||
| for i in range(1, cfg.num_uss + 1): | ||
| for j in range(1, cfg.num_nodes + 1): | ||
| node_idx = (i - 1) * cfg.num_nodes + j | ||
| url = f"http://localhost:80{node_idx:02d}" | ||
| audience = f"dss{j}.uss{i}.localutm" | ||
| targets.append((url, audience)) | ||
| return targets | ||
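The review suggestion to replace the `(base_url, audience)` tuples with a small `Target` dataclass could look roughly like the sketch below. It takes the two counts as plain arguments instead of a `GlobalConfig` so that it runs on its own; that signature and the `frozen=True` choice are assumptions, not part of the PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Target:
    base_url: str
    audience: str


def dss_targets(num_uss: int, num_nodes: int) -> list[Target]:
    """Return one Target per deployed DSS node (same mapping as above)."""
    targets = []
    for i in range(1, num_uss + 1):
        for j in range(1, num_nodes + 1):
            # Global node index, used as the two-digit port suffix.
            node_idx = (i - 1) * num_nodes + j
            targets.append(
                Target(
                    base_url=f"http://localhost:80{node_idx:02d}",
                    audience=f"dss{j}.uss{i}.localutm",
                )
            )
    return targets


for t in dss_targets(num_uss=2, num_nodes=2):
    print(t.base_url, t.audience)
```

Callers would then read `target.base_url` and `target.audience` rather than unpacking by position, and a frozen dataclass stays hashable and picklable for use with `multiprocessing`.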