From e25f243aa9b5ba75e2c501fa473f285103736c94 Mon Sep 17 00:00:00 2001 From: will-exaforce Date: Thu, 18 Jun 2026 12:57:56 -0500 Subject: [PATCH] Make meta-analyzer structured-output failures degrade per batch Harden the LLM meta-analyzer against unreliable served structured output and tighten the model-output normalization contract. Reliability / correctness: - run_batches / arun_batches skip a batch whose structured call fails after retries (StructuredOutputError) instead of aborting the whole pass; other exceptions (e.g. credential ValueError) still propagate. arun_batches uses gather(return_exceptions=True) so one failure no longer discards sibling results. - apply_filter distinguishes "evaluated but not confirmed" (drop) from "never evaluated" because its batch failed (preserve un-enriched), at batch granularity. A security tool must not silently drop an unreviewed finding. - Drop the lenient truncation-salvage parser: a truncated stringified findings array now fails cleanly -> retried -> batch skipped -> findings preserved, which is safer than salvaging a partial array and dropping the rest. Normalization: - Replace fragile substring impact/intent coercion with a shared token-scan + negation guard (_coerce_to_enum). Fixes 'unsafe'->benign, 'non-critical'-> critical, 'medium-low'->low, and 'follow-up'->low, while still covering phrases like 'high impact'. Ambiguous compounds bias to the higher level. Cleanup: - Remove dead OverallAssessment / overall_assessment (never consumed); shrinks the model's required output, reducing truncation surface. - Dedup the sync/async retry methods via shared helpers and add an explicit terminal raise so they can never silently return None. - Use dataclasses.replace when splitting batches; correct the StructuredOutputError docstring (LengthFinishReasonError is not a ValueError). Tests cover failed-batch preservation, run-loop resilience, non-structured error propagation, and the coercion regressions. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/skillspector/llm_analyzer_base.py | 142 ++++++++++++- src/skillspector/nodes/meta_analyzer.py | 272 ++++++++++++++++++++---- tests/nodes/test_llm_analyzer_base.py | 186 +++++++++++++++- 3 files changed, 538 insertions(+), 62 deletions(-) diff --git a/src/skillspector/llm_analyzer_base.py b/src/skillspector/llm_analyzer_base.py index aa3e7e9..79619f9 100644 --- a/src/skillspector/llm_analyzer_base.py +++ b/src/skillspector/llm_analyzer_base.py @@ -32,8 +32,10 @@ from dataclasses import dataclass, field from typing import Literal +import openai +from langchain_core.exceptions import OutputParserException from langchain_core.messages import BaseMessage -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ValidationError from skillspector.llm_utils import get_chat_model from skillspector.logging_config import get_logger @@ -219,11 +221,56 @@ def _message_text(response: object) -> str: - Be precise: report only genuine issues, not speculative ones.""" +# Structured-output method. langchain-openai's default is "json_schema", which +# has the model emit the result as ordinary message *content* under a strict +# JSON grammar. On some OpenAI-compatible serving stacks, content-mode JSON +# generation degenerates into a whitespace-repetition loop — the model emits one +# valid finding, then thousands of whitespace tokens until the output cap +# (finish_reason="length"), which the strict parser then discards. This was +# observed across unrelated models (NVIDIA Nemotron, gpt-oss), i.e. it is a +# property of the serving stack's content generation, not any one model. Emitting +# the same data as a *tool call* ("function_calling") avoids the content path and +# eliminates the runaway empirically (0/6 vs 4/6 on a 90-finding meta call). Free +# (non-guided) generation runs away just as badly, so this is specifically about +# tool-call vs content output — not guided-decoding-vs-not. +_STRUCTURED_OUTPUT_METHOD = "function_calling" + +# function_calling still occasionally returns tool-call arguments that don't +# satisfy the schema (a fast, retryable ValidationError — not a slow runaway), so +# retry a failed structured call a few times; each attempt is a fresh sample. +_LLM_MAX_RETRIES = 2 + +# Structured-output failures that a fresh sample usually fixes. All are caught +# per attempt and retried; on the final attempt they are wrapped in +# StructuredOutputError (see its docstring for why wrapping matters). +_RETRYABLE_STRUCTURED_ERRORS = ( + openai.LengthFinishReasonError, + OutputParserException, + ValidationError, +) + + # --------------------------------------------------------------------------- # Base LLM Analyzer # --------------------------------------------------------------------------- +class StructuredOutputError(RuntimeError): + """A structured LLM call failed after all retries. + + Deliberately a ``RuntimeError`` (NOT a ``ValueError``): the analyzer nodes + re-raise ``ValueError`` (to surface credential errors) but fall back on any + other ``Exception``. Two of the underlying failures — ``OutputParserException`` + and ``ValidationError`` — *are* ``ValueError`` subclasses, so raising them + directly would be re-raised by the nodes and crash the scan instead of + degrading gracefully. (``openai.LengthFinishReasonError`` is *not* a + ``ValueError`` and would already fall through to the nodes' generic + ``except Exception``, but it is wrapped here too for uniform handling.) + Wrapping breaks the ``ValueError`` re-raise path so a failed structured + call degrades instead of crashing. + """ + + class LLMAnalyzerBase: """Per-file / per-chunk LLM analyzer. @@ -250,9 +297,63 @@ def __init__(self, base_prompt: str, model: str): self._input_budget = get_max_input_tokens(model) self._llm = get_chat_model(model=model) self._structured_llm = ( - self._llm.with_structured_output(self.response_schema) if self.response_schema else None + self._llm.with_structured_output(self.response_schema, method=_STRUCTURED_OUTPUT_METHOD) + if self.response_schema + else None + ) + + # -- Structured-output invocation (retry) ------------------------------- + + @staticmethod + def _check_structured_result(result: object) -> object: + """Treat a ``None`` structured result as a retryable failure. + + The model emitted no tool call at all — common on oversized outputs. + ``None`` isn't an exception, so surface it as one so the retry loop + re-samples instead of returning ``None`` to ``parse_response``. + """ + if result is None: + raise OutputParserException("structured call returned no tool call") + return result + + @staticmethod + def _handle_structured_failure(attempt: int, exc: Exception) -> None: + """On the final attempt wrap-and-raise; otherwise log and let the caller retry.""" + if attempt == _LLM_MAX_RETRIES: + raise StructuredOutputError(str(exc)) from exc + logger.warning( + "structured LLM call failed (attempt %d/%d), retrying: %s", + attempt + 1, + _LLM_MAX_RETRIES + 1, + exc, ) + def _invoke_structured(self, prompt: str): + """Invoke the structured LLM, retrying on a failed structured response. + + Failures are sampling-dependent, so a fresh attempt usually succeeds: + ``LengthFinishReasonError`` (content-mode whitespace runaway hitting the + token cap), ``OutputParserException`` (unparseable tool call or no tool + call at all), or ``ValidationError`` (tool-call args that miss the schema). + """ + for attempt in range(_LLM_MAX_RETRIES + 1): + try: + return self._check_structured_result(self._structured_llm.invoke(prompt)) + except _RETRYABLE_STRUCTURED_ERRORS as exc: + self._handle_structured_failure(attempt, exc) + # _handle_structured_failure always raises on the final attempt; this is + # an explicit guard so the method never silently returns None. + raise StructuredOutputError("structured call exhausted retries") + + async def _ainvoke_structured(self, prompt: str): + """Async counterpart of :meth:`_invoke_structured`.""" + for attempt in range(_LLM_MAX_RETRIES + 1): + try: + return self._check_structured_result(await self._structured_llm.ainvoke(prompt)) + except _RETRYABLE_STRUCTURED_ERRORS as exc: + self._handle_structured_failure(attempt, exc) + raise StructuredOutputError("structured call exhausted retries") + # -- Batching ----------------------------------------------------------- def _estimate_extra_overhead(self, findings: list[Finding]) -> int: @@ -350,6 +451,12 @@ def run_batches( The element type of the inner list depends on the subclass: the default :meth:`parse_response` returns :class:`Finding` objects; subclasses may return dicts or other types. + + A batch whose structured call fails after all retries + (:class:`StructuredOutputError`) is logged and skipped rather than + aborting the whole run, so the batches that did succeed still + contribute their results. Other exceptions (e.g. credential + ``ValueError``) propagate. """ results: list[tuple[Batch, list]] = [] for batch in batches: @@ -360,10 +467,14 @@ def run_batches( estimate_tokens(prompt), len(batch.findings), ) - if self._structured_llm: - response = self._structured_llm.invoke(prompt) - else: - response = _message_text(self._llm.invoke(prompt)) + try: + if self._structured_llm: + response = self._invoke_structured(prompt) + else: + response = _message_text(self._llm.invoke(prompt)) + except StructuredOutputError as exc: + logger.warning("Skipping %s: %s", batch.file_label, exc) + continue logger.debug("LLM response for %s", batch.file_label) parsed = self.parse_response(response, batch) results.append((batch, parsed)) @@ -382,7 +493,11 @@ async def arun_batches( *max_concurrency* LLM requests in parallel. Both cross-file and cross-chunk batches are parallelized in a single gather call. - The return type mirrors :meth:`run_batches`. + The return type mirrors :meth:`run_batches`. As there, a batch whose + structured call fails after all retries (:class:`StructuredOutputError`) + is logged and dropped from the results instead of aborting the whole + gather, so the successful batches still contribute. Other exceptions + propagate. """ sem = asyncio.Semaphore(max_concurrency) @@ -396,13 +511,22 @@ async def _process(batch: Batch) -> tuple[Batch, list]: len(batch.findings), ) if self._structured_llm: - response = await self._structured_llm.ainvoke(prompt) + response = await self._ainvoke_structured(prompt) else: response = _message_text(await self._llm.ainvoke(prompt)) logger.debug("LLM response for %s", batch.file_label) return (batch, self.parse_response(response, batch)) - return list(await asyncio.gather(*[_process(b) for b in batches])) + gathered = await asyncio.gather(*[_process(b) for b in batches], return_exceptions=True) + results: list[tuple[Batch, list]] = [] + for batch, outcome in zip(batches, gathered, strict=True): + if isinstance(outcome, StructuredOutputError): + logger.warning("Skipping %s: %s", batch.file_label, outcome) + continue + if isinstance(outcome, BaseException): + raise outcome + results.append(outcome) + return results # -- Convenience -------------------------------------------------------- diff --git a/src/skillspector/nodes/meta_analyzer.py b/src/skillspector/nodes/meta_analyzer.py index 8f2b541..909d83b 100644 --- a/src/skillspector/nodes/meta_analyzer.py +++ b/src/skillspector/nodes/meta_analyzer.py @@ -24,6 +24,8 @@ import asyncio import json +import re +from dataclasses import replace from typing import Literal from pydantic import BaseModel, Field, field_validator @@ -49,6 +51,86 @@ # --------------------------------------------------------------------------- +# Prefixes that invert/diminish a level word ('non-critical', 'not safe'); when +# present, token matching can't be trusted, so we fall back to the safe default. +_ENUM_NEGATIONS = frozenset({"non", "not", "no", "anti"}) + + +def _coerce_to_enum( + value: object, aliases: dict[str, str], default: str, ranked: tuple[str, ...] +) -> object: + """Map an out-of-enum string to a canonical level; default when unmappable. + + Non-strings pass through unchanged (Pydantic reports the type error). For + strings: + + 1. An exact (stripped/lowercased) hit in *aliases* wins. + 2. Otherwise the value is split into word tokens; any *alias* token's + canonical value is a candidate, and the highest-ranked candidate is + returned (*ranked* is ordered most- to least-severe, so 'medium-high' + -> high and 'medium-low' -> medium — ambiguity biases higher, which is + safer for a security tool). + 3. A negation/diminutive token ('non-critical', 'not safe') voids token + matching and falls back to *default*. + + Token matching (not substring) is deliberate: substrings mis-mapped 'unsafe' + -> benign (contains 'safe'), 'non-critical' -> critical, and 'follow-up' + -> low. Tokenizing avoids all three while still handling phrases like + 'high impact' that an exact table would miss. + """ + if not isinstance(value, str): + return value + t = value.strip().lower() + if t in aliases: + return aliases[t] + tokens = [tok for tok in re.split(r"[^a-z]+", t) if tok] + if not any(tok in _ENUM_NEGATIONS for tok in tokens): + hits = [aliases[tok] for tok in tokens if tok in aliases] + if hits: + return min(hits, key=ranked.index) + logger.debug("coerced out-of-enum value %r -> %r (default)", value, default) + return default + + +# Canonical levels, ordered most- to least-severe; used to bias ambiguous +# multi-word values toward the higher level. +_IMPACT_RANK: tuple[str, ...] = ("critical", "high", "medium", "low") +_INTENT_RANK: tuple[str, ...] = ("malicious", "negligent", "benign") + +# Single-word synonyms -> canonical level. Compounds ('medium-high') are handled +# by the token scan in _coerce_to_enum, so only single words need listing. +_IMPACT_ALIASES: dict[str, str] = { + "critical": "critical", + "catastrophic": "critical", + "high": "high", + "severe": "high", + "major": "high", + "medium": "medium", + "moderate": "medium", + "med": "medium", + "low": "low", + "minor": "low", + "informational": "low", + "info": "low", + "negligible": "low", + "none": "low", +} +_INTENT_ALIASES: dict[str, str] = { + "malicious": "malicious", + "malware": "malicious", + "intentional": "malicious", + "deliberate": "malicious", + "negligent": "negligent", + "negligence": "negligent", + "careless": "negligent", + "accidental": "negligent", + "unintentional": "negligent", + "benign": "benign", + "safe": "benign", + "harmless": "benign", +} + + class MetaAnalyzerFinding(BaseModel): """A single finding evaluated by the meta-analyzer LLM (filter/enrich mode).""" @@ -73,30 +155,50 @@ class MetaAnalyzerFinding(BaseModel): explanation: str = Field(default="", description="Why this is dangerous (2-3 sentences)") remediation: str = Field(default="", description="How to fix the issue (actionable steps)") + @field_validator("impact", mode="before") + @classmethod + def _coerce_impact(cls, v: object) -> object: + """Coerce out-of-enum impact values (LLMs emit e.g. 'medium-high', + 'Critical', 'informational') to a canonical level; default 'medium'.""" + return _coerce_to_enum(v, _IMPACT_ALIASES, "medium", _IMPACT_RANK) -class OverallAssessment(BaseModel): - """Overall risk assessment for the analyzed file.""" - - risk_level: str = Field(description="Overall risk level: LOW, MEDIUM, HIGH, or CRITICAL") - summary: str = Field(description="Brief summary of findings") + @field_validator("intent", mode="before") + @classmethod + def _coerce_intent(cls, v: object) -> object: + """Coerce out-of-enum intent values to the closest of + malicious/negligent/benign; default 'negligent'.""" + return _coerce_to_enum(v, _INTENT_ALIASES, "negligent", _INTENT_RANK) class MetaAnalyzerResult(BaseModel): - """Top-level structured response from the meta-analyzer LLM.""" + """Top-level structured response from the meta-analyzer LLM. + + Only ``findings`` is modelled: it is the sole field the node consumes. A + smaller required output is also less likely to truncate, which is the whole + point of the batching/retry machinery below. + """ findings: list[MetaAnalyzerFinding] = Field(default_factory=list) - overall_assessment: OverallAssessment | None = None - @field_validator("overall_assessment", mode="before") + @field_validator("findings", mode="before") @classmethod - def _parse_stringified_assessment(cls, v: object) -> object: - """LLMs sometimes return nested objects as JSON strings.""" - if isinstance(v, str): - try: - return json.loads(v) - except (json.JSONDecodeError, TypeError): - return None - return v + def _parse_stringified_findings(cls, v: object) -> object: + """LLMs (esp. via tool-calling) sometimes return the findings array as a + JSON-encoded string rather than a native list. Decode it. + + A *truncated* string fails to parse here; we deliberately let that raise + (it surfaces as a retryable failure, and if it keeps failing the whole + batch is skipped). Skipping preserves the batch's findings un-enriched + via :meth:`LLMMetaAnalyzer.apply_filter`, which is strictly safer for a + security tool than salvaging a partial array and silently dropping the + findings the model never finished emitting.""" + if not isinstance(v, str): + return v + parsed = json.loads(v) # truncated -> JSONDecodeError -> retried, then preserved + if isinstance(parsed, list): + return parsed + # A lone object (one finding, not wrapped in an array) is still valid. + return [parsed] if isinstance(parsed, dict) else [] # --------------------------------------------------------------------------- @@ -193,30 +295,37 @@ def _format_findings_for_prompt(findings: list[Finding]) -> str: return "\n".join(lines) +def _fallback_finding(f: Finding) -> Finding: + """Pass *f* through with a default remediation but no LLM enrichment. + + Used both when the LLM is unavailable/failed and, within :meth:`apply_filter`, + for findings the LLM never returned a verdict on (a truncated or failed batch) + — preserving them rather than silently dropping an unreviewed finding. + """ + return Finding( + rule_id=f.rule_id, + message=f.message, + severity=f.severity, + confidence=f.confidence, + file=f.file, + start_line=f.start_line, + end_line=f.end_line, + remediation=f.remediation or get_remediation(f.rule_id), + tags=f.tags, + context=f.context, + matched_text=f.matched_text, + category=getattr(f, "category", None), + pattern=getattr(f, "pattern", None), + finding=getattr(f, "finding", None), + explanation=getattr(f, "explanation", None), + code_snippet=getattr(f, "code_snippet", None) or f.context, + intent=None, + ) + + def _fallback_filtered(findings: list[Finding]) -> list[Finding]: """Apply default remediation to all findings (pass-through with defaults).""" - return [ - Finding( - rule_id=f.rule_id, - message=f.message, - severity=f.severity, - confidence=f.confidence, - file=f.file, - start_line=f.start_line, - end_line=f.end_line, - remediation=f.remediation or get_remediation(f.rule_id), - tags=f.tags, - context=f.context, - matched_text=f.matched_text, - category=getattr(f, "category", None), - pattern=getattr(f, "pattern", None), - finding=getattr(f, "finding", None), - explanation=getattr(f, "explanation", None), - code_snippet=getattr(f, "code_snippet", None) or f.context, - intent=None, - ) - for f in findings - ] + return [_fallback_finding(f) for f in findings] # --------------------------------------------------------------------------- @@ -224,6 +333,19 @@ def _fallback_filtered(findings: list[Finding]) -> list[Finding]: # --------------------------------------------------------------------------- +# Max findings enriched per LLM call. Served models are unreliable producing a +# large structured result in one tool call: on big outputs they either emit no +# tool call at all, or stringify the findings array and truncate that string +# (unparseable). Smaller batches keep the result short enough to complete +# reliably (observed: 90→none, 10→occasional truncation, small→reliable). +# A batch that still fails after retries is skipped individually (run loops +# don't abort on one failure) and its findings are preserved un-enriched by +# apply_filter, so more batches no longer means a higher chance of losing the +# whole pass — 10 balances per-call reliability against re-sent input cost +# (each batch re-sends the file content; see the cost note in get_batches). +_MAX_FINDINGS_PER_BATCH = 10 + + class LLMMetaAnalyzer(LLMAnalyzerBase): """Per-file LLM filter/enrichment of static findings. @@ -236,6 +358,40 @@ class LLMMetaAnalyzer(LLMAnalyzerBase): def __init__(self, model: str): super().__init__(base_prompt=PER_FILE_ANALYSIS_PROMPT, model=model) + def get_batches( + self, + file_paths: list[str], + file_cache: dict[str, str], + findings: list[Finding] | None = None, + ) -> list[Batch]: + """Split each file's findings into groups of ``_MAX_FINDINGS_PER_BATCH``. + + The base batcher splits by *input* size, so a file with many static + findings becomes one call that must emit a large tool-call result. Served + models can't reliably produce a big structured result in one shot — they + return no tool call at all on very large outputs. Bounding findings per + call keeps each tool call small and reliable; ``apply_filter`` re-merges + across batches by (file, rule_id, line). + + Cost note: each sub-batch re-sends the full file content and prompt, so + a file with N findings costs ~ceil(N / _MAX_FINDINGS_PER_BATCH)x the + input tokens of a single call. _MAX_FINDINGS_PER_BATCH trades that + re-sent input against per-call output reliability. + """ + batches = super().get_batches(file_paths, file_cache, findings) + bounded: list[Batch] = [] + for batch in batches: + if len(batch.findings) <= _MAX_FINDINGS_PER_BATCH: + bounded.append(batch) + continue + for i in range(0, len(batch.findings), _MAX_FINDINGS_PER_BATCH): + # replace() carries every other Batch field forward, so a new + # field added to Batch isn't silently dropped for split batches. + bounded.append( + replace(batch, findings=batch.findings[i : i + _MAX_FINDINGS_PER_BATCH]) + ) + return bounded + def _estimate_extra_overhead(self, findings: list[Finding]) -> int: if not findings: return 0 @@ -271,7 +427,7 @@ def apply_filter( findings: list[Finding], batch_results: list[tuple[Batch, list[dict[str, object]]]], ) -> list[Finding]: - """Keep only LLM-confirmed findings, enriched with explanation / remediation. + """Keep LLM-confirmed findings (enriched) and findings the LLM never saw. Uses granular ``(file, rule_id, start_line, end_line)`` keying when the LLM provides a ``start_line``, so multiple findings with the same @@ -279,12 +435,33 @@ def apply_filter( is included in the key when provided but falls back to ``None`` so callers that omit it still match. Falls back to coarse ``(file, rule_id)`` keying for LLM responses that omit ``start_line``. + + Three outcomes per static finding: + + * **confirmed** (the LLM returned ``is_vulnerability`` with confidence + >= 0.6) — kept, enriched with explanation / remediation. + * **evaluated but not confirmed** (the finding's batch was processed + successfully and the LLM did not confirm it) — dropped. + * **never evaluated** (the finding's batch failed after all retries and + was skipped, so the LLM never saw it) — preserved via + :func:`_fallback_finding` rather than silently dropped, since a + security tool must not discard an unreviewed finding. + + "Evaluated" is tracked at *batch* granularity: a finding counts as + evaluated iff its batch appears in *batch_results* (i.e. that batch's + structured call succeeded). The per-response items can't be used for + this, because the model doesn't reliably emit an explicit verdict for + every finding — so a missing item means "not confirmed", not "not seen". """ _enrichment = tuple[str, str, float] confirmed_granular: dict[tuple[str, str, int, int | None], _enrichment] = {} confirmed_coarse: dict[tuple[str, str], _enrichment] = {} + # Findings whose batch was processed successfully (the LLM saw them). + evaluated_keys: set[tuple[str, str, int, int | None]] = set() for batch, llm_items in batch_results: + for bf in batch.findings: + evaluated_keys.add((bf.file, bf.rule_id, bf.start_line, bf.end_line)) for item in llm_items: pattern_id = item.get("pattern_id") if not pattern_id or not item.get("is_vulnerability", False): @@ -322,7 +499,10 @@ def apply_filter( expl, rem, conf = confirmed_granular[start_only_key] elif coarse_key in confirmed_coarse: expl, rem, conf = confirmed_coarse[coarse_key] + elif exact_key in evaluated_keys: + continue # LLM saw this finding's batch and did not confirm it -> drop else: + result.append(_fallback_finding(f)) # batch failed/skipped -> preserve continue result.append( Finding( @@ -359,11 +539,15 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse: When ``use_llm`` is *True* and an LLM API key is configured (see ``llm_utils._resolve_llm_credentials``), each file that has at least one finding gets its own LLM call (or multiple calls if the file is too - large for the model's input budget). Findings are matched back by - ``(file, rule_id)`` so enrichment is precise. - - Falls back to default remediations when ``use_llm`` is *False* or when - an LLM call fails. + large for the model's input budget or has many findings). Findings are + matched back by ``(file, rule_id, line)`` so enrichment is precise. + + Failures degrade gracefully at the smallest scope that still works: an + individual batch whose structured call fails after retries is skipped and + its findings are preserved un-enriched (see :meth:`LLMMetaAnalyzer.apply_filter`), + rather than discarding the whole pass. The blanket fallback to default + remediations applies only when ``use_llm`` is *False* or an unexpected + error escapes the run loop. """ findings: list[Finding] = state.get("findings", []) if not findings: diff --git a/tests/nodes/test_llm_analyzer_base.py b/tests/nodes/test_llm_analyzer_base.py index c1fabca..82b921a 100644 --- a/tests/nodes/test_llm_analyzer_base.py +++ b/tests/nodes/test_llm_analyzer_base.py @@ -20,6 +20,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest +from langchain_core.exceptions import OutputParserException from langchain_core.messages import AIMessage from skillspector.llm_analyzer_base import ( @@ -671,14 +672,48 @@ def test_confidence_validation(self) -> None: ) def test_intent_validation(self) -> None: - with pytest.raises(ValueError): - MetaAnalyzerFinding( + # Out-of-enum intent values are coerced to a safe default rather than + # raising, so a flaky model response doesn't fail the whole batch. + f = MetaAnalyzerFinding( + pattern_id="E1", + is_vulnerability=True, + confidence=0.5, + intent="unknown", + impact="high", + ) + assert f.intent == "negligent" + + def test_intent_coercion_does_not_downgrade_unsafe_to_benign(self) -> None: + # Regression: substring matching used to map 'unsafe' (contains 'safe') + # to 'benign' — the least suspicious intent. It must not. + f = MetaAnalyzerFinding( + pattern_id="E1", + is_vulnerability=True, + confidence=0.5, + intent="unsafe", + impact="high", + ) + assert f.intent == "negligent" + + def test_impact_coercion_biases_higher_and_avoids_false_escalation(self) -> None: + def impact_of(value: str) -> str: + return MetaAnalyzerFinding( pattern_id="E1", is_vulnerability=True, confidence=0.5, - intent="unknown", - impact="high", - ) + intent="negligent", + impact=value, + ).impact + + assert impact_of("medium-high") == "high" # ambiguous -> bias higher + assert impact_of("medium-low") == "medium" # ambiguous -> bias higher + assert impact_of("Critical") == "critical" # case-insensitive + assert impact_of("informational") == "low" + assert impact_of("non-critical") == "medium" # not falsely escalated to critical + assert impact_of("high impact") == "high" # phrase covered by token scan + assert impact_of("low risk") == "low" + assert impact_of("follow-up") == "medium" # 'low' substring must not match + assert impact_of("nonsense") == "medium" # unmappable -> default def test_empty_findings(self) -> None: result = MetaAnalyzerResult(findings=[]) @@ -1002,11 +1037,16 @@ def test_multiple_findings_same_file(self) -> None: assert len(result) == 2 @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) - def test_empty_batch_results(self) -> None: + def test_empty_batch_results_preserves_unevaluated_findings(self) -> None: + # No batch results means no batch was evaluated by the LLM (e.g. every + # batch failed after retries). Such findings must be preserved (un-enriched), + # not silently dropped — a security tool must not discard unreviewed findings. analyzer = LLMMetaAnalyzer(model=self.MODEL) findings = [self._make_finding("a.py", "E1")] result = analyzer.apply_filter(findings, []) - assert len(result) == 0 + assert len(result) == 1 + assert result[0].rule_id == "E1" + assert result[0].intent is None # fallback enrichment, not LLM-confirmed @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) def test_granular_keying_filters_per_instance(self) -> None: @@ -1116,12 +1156,40 @@ def test_end_line_used_when_provided(self) -> None: }, ] result = analyzer.apply_filter(findings, [(batch, llm_items)]) - # exact match for f_long; f_short has no exact match, falls back to start_only (None end_line) - # start_only key not in confirmed_granular, so f_short is not confirmed + # f_long matches the confirmed (5, 10) key exactly and is kept. f_short + # (5, 5) is not confirmed, but its batch was evaluated successfully, so + # it is dropped (not preserved as unreviewed). assert len(result) == 1 assert result[0].end_line == 10 assert result[0].explanation == "Long block is dangerous" + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + def test_findings_in_failed_batch_are_preserved(self) -> None: + """A finding whose batch is missing from batch_results (its call failed + after retries and was skipped) is preserved un-enriched, while findings + in a successful sibling batch are still filtered normally.""" + analyzer = LLMMetaAnalyzer(model=self.MODEL) + f_ok = self._make_finding("a.py", "E1", line=10) + f_failed = self._make_finding("a.py", "E2", line=20) + ok_batch = Batch(file_path="a.py", content="code", findings=[f_ok]) + # f_failed's batch never made it into batch_results (skipped on failure). + llm_items = [ + { + "pattern_id": "E1", + "start_line": 10, + "is_vulnerability": True, + "confidence": 0.9, + "explanation": "Confirmed", + "remediation": "Fix", + "_file": "a.py", + } + ] + result = analyzer.apply_filter([f_ok, f_failed], [(ok_batch, llm_items)]) + by_rule = {f.rule_id: f for f in result} + assert set(by_rule) == {"E1", "E2"} + assert by_rule["E1"].explanation == "Confirmed" # enriched + assert by_rule["E2"].intent is None # preserved via fallback, not evaluated + # --------------------------------------------------------------------------- # LLMMetaAnalyzer.run_batches (mocked LLM) @@ -1166,6 +1234,46 @@ def test_run_batches_propagates_value_error(self, mock_get_model: MagicMock) -> with pytest.raises(ValueError, match="API key"): LLMMetaAnalyzer(model=self.MODEL) + @patch(MOCK_PATCH_TARGET) + def test_run_batches_skips_failed_batch_keeps_successes( + self, mock_get_model: MagicMock + ) -> None: + """A batch that fails after all retries is skipped; the others still run.""" + ok_result = MetaAnalyzerResult( + findings=[ + MetaAnalyzerFinding( + pattern_id="E1", + is_vulnerability=True, + confidence=0.9, + intent="malicious", + impact="high", + ) + ], + ) + + def fake_invoke(prompt: str) -> MetaAnalyzerResult: + if "code b" in prompt: + raise OutputParserException("no tool call") # retryable; persists -> skipped + return ok_result + + mock_llm = MagicMock() + mock_structured = MagicMock() + mock_get_model.return_value = mock_llm + mock_llm.with_structured_output.return_value = mock_structured + mock_structured.invoke.side_effect = fake_invoke + + analyzer = LLMMetaAnalyzer(model=self.MODEL) + f1 = Finding(rule_id="E1", message="test", file="a.py", start_line=1) + f2 = Finding(rule_id="E2", message="test", file="b.py", start_line=1) + batches = [ + Batch(file_path="a.py", content="code a", findings=[f1]), + Batch(file_path="b.py", content="code b", findings=[f2]), + ] + results = analyzer.run_batches(batches, metadata_text="Name: skill") + # Only the successful batch is returned; the failing one is dropped, not raised. + assert len(results) == 1 + assert results[0][0].file_path == "a.py" + # --------------------------------------------------------------------------- # LLMMetaAnalyzer.arun_batches (async parallel execution) @@ -1206,6 +1314,66 @@ async def test_arun_batches_calls_ainvoke_per_batch(self, mock_get_model: MagicM assert mock_structured.ainvoke.call_count == 2 assert len(results) == 2 + @patch(MOCK_PATCH_TARGET) + async def test_arun_batches_skips_failed_batch_keeps_successes( + self, mock_get_model: MagicMock + ) -> None: + """One batch failing after retries does not abort the whole gather.""" + ok_result = MetaAnalyzerResult( + findings=[ + MetaAnalyzerFinding( + pattern_id="E1", + is_vulnerability=True, + confidence=0.9, + intent="malicious", + impact="high", + ) + ], + ) + + async def fake_ainvoke(prompt: str) -> MetaAnalyzerResult: + if "code b" in prompt: + raise OutputParserException("no tool call") + return ok_result + + mock_llm = MagicMock() + mock_structured = MagicMock() + mock_get_model.return_value = mock_llm + mock_llm.with_structured_output.return_value = mock_structured + mock_structured.ainvoke = AsyncMock(side_effect=fake_ainvoke) + + analyzer = LLMMetaAnalyzer(model=self.MODEL) + f1 = Finding(rule_id="E1", message="test", file="a.py", start_line=1) + f2 = Finding(rule_id="E2", message="test", file="b.py", start_line=1) + batches = [ + Batch(file_path="a.py", content="code a", findings=[f1]), + Batch(file_path="b.py", content="code b", findings=[f2]), + ] + results = await analyzer.arun_batches(batches, metadata_text="Name: skill") + assert len(results) == 1 + assert results[0][0].file_path == "a.py" + + @patch(MOCK_PATCH_TARGET) + async def test_arun_batches_propagates_non_structured_error( + self, mock_get_model: MagicMock + ) -> None: + """A non-StructuredOutputError (e.g. a credential ValueError) still propagates.""" + + async def fake_ainvoke(prompt: str) -> MetaAnalyzerResult: + raise ValueError("No LLM API key configured.") + + mock_llm = MagicMock() + mock_structured = MagicMock() + mock_get_model.return_value = mock_llm + mock_llm.with_structured_output.return_value = mock_structured + mock_structured.ainvoke = AsyncMock(side_effect=fake_ainvoke) + + analyzer = LLMMetaAnalyzer(model=self.MODEL) + f1 = Finding(rule_id="E1", message="test", file="a.py", start_line=1) + batches = [Batch(file_path="a.py", content="code a", findings=[f1])] + with pytest.raises(ValueError, match="API key"): + await analyzer.arun_batches(batches, metadata_text="Name: skill") + @patch(MOCK_PATCH_TARGET) async def test_arun_batches_results_compatible_with_apply_filter( self,