diff --git a/AGENTS.md b/AGENTS.md index 64ead5f..cc3e546 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -64,7 +64,7 @@ uv run python -m unittest discover -s tests ```sh set -euo pipefail -VERSION=0.1.2 +VERSION=0.1.4 BRANCH="release-v${VERSION}" git fetch origin --tags --prune @@ -116,7 +116,7 @@ rm -rf /tmp/src-py-lib-release-check ```sh set -euo pipefail -VERSION=0.1.2 +VERSION=0.1.4 BRANCH="release-v${VERSION}" GH_REPO="sourcegraph/src-py-lib" @@ -140,7 +140,7 @@ gh pr merge "${BRANCH}" --repo "${GH_REPO}" --squash --delete-branch ```sh set -euo pipefail -VERSION=0.1.2 +VERSION=0.1.4 git fetch origin --tags --prune git switch main @@ -154,7 +154,7 @@ git push origin "v${VERSION}" ```sh set -euo pipefail -VERSION=0.1.2 +VERSION=0.1.4 GH_REPO="sourcegraph/src-py-lib" RUN_ID="$( diff --git a/pyproject.toml b/pyproject.toml index a827a60..8c2be08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dev = [ [project] name = "src-py-lib" -version = "0.1.3" +version = "0.1.4" description = "Reusable libraries for Sourcegraph projects" readme = "README.md" requires-python = ">=3.11" diff --git a/src/src_py_lib/__init__.py b/src/src_py_lib/__init__.py index 85a6ace..3518d9e 100644 --- a/src/src_py_lib/__init__.py +++ b/src/src_py_lib/__init__.py @@ -176,6 +176,7 @@ def _script_name() -> str: "load_json_cache", "load_json_subset", "logging", + "logging_context", "logging_settings_from_config", "log", "log_context", diff --git a/src/src_py_lib/clients/sourcegraph.py b/src/src_py_lib/clients/sourcegraph.py index b980fb1..69200b6 100644 --- a/src/src_py_lib/clients/sourcegraph.py +++ b/src/src_py_lib/clients/sourcegraph.py @@ -8,6 +8,7 @@ import queue import time from collections.abc import Iterable, Iterator, Mapping, Sequence +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from typing import Final, cast from urllib.parse import urlsplit @@ -19,6 +20,7 @@ from src_py_lib.utils.logging import ( current_trace_context, new_trace_context, + submit_with_log_context, trace_context_from_traceparent, traceparent_header, ) @@ -244,13 +246,27 @@ def stream_jaeger_trace_summaries( traces: Iterable[SourcegraphTrace] | None = None, *, retry_delays_seconds: Sequence[float] = JAEGER_TRACE_RETRY_DELAYS_SECONDS, + parallelism: int = 8, ) -> Iterator[SourcegraphJaegerTraceSummary]: """Yield compact Jaeger/debug summaries for traced Sourcegraph requests.""" - for trace in self.drain_traces() if traces is None else traces: - yield self.fetch_jaeger_trace_summary( - trace, - retry_delays_seconds=retry_delays_seconds, - ) + if parallelism < 1: + raise ValueError("parallelism must be at least 1") + pending_traces = list(self.drain_traces() if traces is None else traces) + with ThreadPoolExecutor( + max_workers=parallelism, + thread_name_prefix="SourcegraphJaegerTrace", + ) as executor: + futures = [ + submit_with_log_context( + executor, + self.fetch_jaeger_trace_summary, + trace, + retry_delays_seconds=retry_delays_seconds, + ) + for trace in pending_traces + ] + for future in as_completed(futures): + yield future.result() def fetch_jaeger_trace_summary( self, diff --git a/tests/test_logging_http_clients.py b/tests/test_logging_http_clients.py index ef703cd..cdfb6cc 100644 --- a/tests/test_logging_http_clients.py +++ b/tests/test_logging_http_clients.py @@ -8,6 +8,7 @@ import logging import subprocess import tempfile +import threading import unittest from collections.abc import Mapping from contextlib import redirect_stderr, redirect_stdout @@ -36,6 +37,7 @@ from src_py_lib.clients.sourcegraph import ( SourcegraphClient, SourcegraphClientConfig, + SourcegraphTrace, decode_external_service_id, decode_repository_id, encode_repository_id, @@ -1535,6 +1537,50 @@ def handler(request: httpx.Request) -> httpx.Response: self.assertEqual(summaries[0].graphql_operations[0]["operation"], "Viewer") self.assertEqual(summaries[0].errored_spans[0]["description"], "boom") + def test_sourcegraph_streams_jaeger_summaries_in_parallel(self) -> None: + trace_ids = ("1" * 32, "2" * 32, "3" * 32) + requested_trace_ids: list[str] = [] + first_batch_barrier = threading.Barrier(2, timeout=1) + + def handler(request: httpx.Request) -> httpx.Response: + trace_id = request.url.path.rsplit("/", 1)[-1] + requested_trace_ids.append(trace_id) + if trace_id in trace_ids[:2]: + first_batch_barrier.wait() + return httpx.Response( + 200, + json={ + "data": [ + { + "spans": [ + { + "operationName": f"trace {trace_id[0]}", + "duration": 1_000, + "tags": [], + } + ] + } + ] + }, + ) + + client = SourcegraphClient( + "https://sourcegraph.example.com/", + "token", + http=HTTPClient(max_attempts=1, transport=httpx.MockTransport(handler)), + ) + + summaries = list( + client.stream_jaeger_trace_summaries( + [SourcegraphTrace(trace_id) for trace_id in trace_ids], + retry_delays_seconds=(0,), + parallelism=2, + ) + ) + + self.assertCountEqual(requested_trace_ids, trace_ids) + self.assertCountEqual([summary.trace.trace_id for summary in summaries], trace_ids) + def test_graphql_client_paginates_cursor_results(self) -> None: http = RecordingHTTP( [ diff --git a/uv.lock b/uv.lock index 592882f..d26ff3b 100644 --- a/uv.lock +++ b/uv.lock @@ -254,7 +254,7 @@ wheels = [ [[package]] name = "src-py-lib" -version = "0.1.3" +version = "0.1.4" source = { editable = "." } dependencies = [ { name = "httpx" },