Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 133 additions & 9 deletions src/skillspector/llm_analyzer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
from dataclasses import dataclass, field
from typing import Literal

import openai
from langchain_core.exceptions import OutputParserException
from langchain_core.messages import BaseMessage
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, ValidationError

from skillspector.llm_utils import get_chat_model
from skillspector.logging_config import get_logger
Expand Down Expand Up @@ -219,11 +221,56 @@ def _message_text(response: object) -> str:
- Be precise: report only genuine issues, not speculative ones."""


# Structured-output method. langchain-openai's default is "json_schema", which
# has the model emit the result as ordinary message *content* under a strict
# JSON grammar. On some OpenAI-compatible serving stacks, content-mode JSON
# generation degenerates into a whitespace-repetition loop — the model emits one
# valid finding, then thousands of whitespace tokens until the output cap
# (finish_reason="length"), which the strict parser then discards. This was
# observed across unrelated models (NVIDIA Nemotron, gpt-oss), i.e. it is a
# property of the serving stack's content generation, not any one model. Emitting
# the same data as a *tool call* ("function_calling") avoids the content path and
# eliminates the runaway empirically (0/6 vs 4/6 on a 90-finding meta call). Free
# (non-guided) generation runs away just as badly, so this is specifically about
# tool-call vs content output — not guided-decoding-vs-not.
_STRUCTURED_OUTPUT_METHOD = "function_calling"

# function_calling still occasionally returns tool-call arguments that don't
# satisfy the schema (a fast, retryable ValidationError — not a slow runaway), so
# retry a failed structured call a few times; each attempt is a fresh sample.
# This is the number of retries *after* the first attempt: the structured invoke
# helpers loop over range(_LLM_MAX_RETRIES + 1), i.e. up to 3 attempts in total.
_LLM_MAX_RETRIES = 2

# Structured-output failures that a fresh sample usually fixes. All are caught
# per attempt and retried; on the final attempt they are wrapped in
# StructuredOutputError (see its docstring for why wrapping matters).
_RETRYABLE_STRUCTURED_ERRORS = (
openai.LengthFinishReasonError,
OutputParserException,
ValidationError,
)


# ---------------------------------------------------------------------------
# Base LLM Analyzer
# ---------------------------------------------------------------------------


class StructuredOutputError(RuntimeError):
    """Raised when a structured LLM call still fails after every retry.

    The base class is ``RuntimeError`` on purpose; it must NOT be a
    ``ValueError``. The analyzer nodes re-raise ``ValueError`` (that is how
    credential errors surface) and fall back on any other ``Exception``.
    ``OutputParserException`` and ``ValidationError`` are both ``ValueError``
    subclasses, so letting them escape unwrapped would send them down the
    nodes' re-raise path and crash the scan rather than letting it degrade
    gracefully. ``openai.LengthFinishReasonError`` is not a ``ValueError``
    and would reach the nodes' generic ``except Exception`` anyway; it is
    wrapped as well so that all three failures are handled uniformly.
    """


class LLMAnalyzerBase:
"""Per-file / per-chunk LLM analyzer.

Expand All @@ -250,9 +297,63 @@ def __init__(self, base_prompt: str, model: str):
self._input_budget = get_max_input_tokens(model)
self._llm = get_chat_model(model=model)
self._structured_llm = (
self._llm.with_structured_output(self.response_schema) if self.response_schema else None
self._llm.with_structured_output(self.response_schema, method=_STRUCTURED_OUTPUT_METHOD)
if self.response_schema
else None
)

# -- Structured-output invocation (retry) -------------------------------

@staticmethod
def _check_structured_result(result: object) -> object:
"""Treat a ``None`` structured result as a retryable failure.

The model emitted no tool call at all — common on oversized outputs.
``None`` isn't an exception, so surface it as one so the retry loop
re-samples instead of returning ``None`` to ``parse_response``.
"""
if result is None:
raise OutputParserException("structured call returned no tool call")
return result

@staticmethod
def _handle_structured_failure(attempt: int, exc: Exception) -> None:
    """Decide what a failed structured attempt leads to.

    *attempt* is the zero-based index of the attempt that just failed. If it
    is not the last one (``_LLM_MAX_RETRIES``), a warning is logged and the
    function returns so the caller can try again. On the last attempt *exc*
    is wrapped in :class:`StructuredOutputError` (chained via ``from``) and
    raised.
    """
    can_retry = attempt != _LLM_MAX_RETRIES
    if can_retry:
        logger.warning(
            "structured LLM call failed (attempt %d/%d), retrying: %s",
            attempt + 1,
            _LLM_MAX_RETRIES + 1,
            exc,
        )
        return
    raise StructuredOutputError(str(exc)) from exc

def _invoke_structured(self, prompt: str):
    """Call the structured LLM synchronously, re-sampling on failure.

    The retryable failures depend on sampling, so another attempt usually
    succeeds: ``LengthFinishReasonError`` (content-mode whitespace runaway
    hitting the token cap), ``OutputParserException`` (unparseable tool call
    or no tool call at all), and ``ValidationError`` (tool-call args that
    miss the schema). Up to ``_LLM_MAX_RETRIES + 1`` attempts are made.
    """
    total_attempts = _LLM_MAX_RETRIES + 1
    for attempt_index in range(total_attempts):
        try:
            # Both the call and the None-check can raise a retryable error,
            # so both stay inside the try.
            raw_result = self._structured_llm.invoke(prompt)
            return self._check_structured_result(raw_result)
        except _RETRYABLE_STRUCTURED_ERRORS as failure:
            self._handle_structured_failure(attempt_index, failure)
    # The failure handler raises on the last attempt, so this is only an
    # explicit backstop that keeps the method from silently returning None.
    raise StructuredOutputError("structured call exhausted retries")

async def _ainvoke_structured(self, prompt: str):
    """Call the structured LLM asynchronously, re-sampling on failure.

    Makes up to ``_LLM_MAX_RETRIES + 1`` attempts. Each attempt awaits
    ``ainvoke`` and rejects a ``None`` result; any retryable structured
    error is passed to the failure handler, which logs and returns on
    earlier attempts and raises :class:`StructuredOutputError` on the last.
    """
    total_attempts = _LLM_MAX_RETRIES + 1
    for attempt_index in range(total_attempts):
        try:
            raw_result = await self._structured_llm.ainvoke(prompt)
            return self._check_structured_result(raw_result)
        except _RETRYABLE_STRUCTURED_ERRORS as failure:
            self._handle_structured_failure(attempt_index, failure)
    # Backstop so the coroutine never silently returns None.
    raise StructuredOutputError("structured call exhausted retries")

# -- Batching -----------------------------------------------------------

def _estimate_extra_overhead(self, findings: list[Finding]) -> int:
Expand Down Expand Up @@ -350,6 +451,12 @@ def run_batches(
The element type of the inner list depends on the subclass: the default
:meth:`parse_response` returns :class:`Finding` objects; subclasses may
return dicts or other types.

A batch whose structured call fails after all retries
(:class:`StructuredOutputError`) is logged and skipped rather than
aborting the whole run, so the batches that did succeed still
contribute their results. Other exceptions (e.g. credential
``ValueError``) propagate.
"""
results: list[tuple[Batch, list]] = []
for batch in batches:
Expand All @@ -360,10 +467,14 @@ def run_batches(
estimate_tokens(prompt),
len(batch.findings),
)
if self._structured_llm:
response = self._structured_llm.invoke(prompt)
else:
response = _message_text(self._llm.invoke(prompt))
try:
if self._structured_llm:
response = self._invoke_structured(prompt)
else:
response = _message_text(self._llm.invoke(prompt))
except StructuredOutputError as exc:
logger.warning("Skipping %s: %s", batch.file_label, exc)
continue
logger.debug("LLM response for %s", batch.file_label)
parsed = self.parse_response(response, batch)
results.append((batch, parsed))
Expand All @@ -382,7 +493,11 @@ async def arun_batches(
*max_concurrency* LLM requests in parallel. Both cross-file and
cross-chunk batches are parallelized in a single gather call.

The return type mirrors :meth:`run_batches`.
The return type mirrors :meth:`run_batches`. As there, a batch whose
structured call fails after all retries (:class:`StructuredOutputError`)
is logged and dropped from the results instead of aborting the whole
gather, so the successful batches still contribute. Other exceptions
propagate.
"""
sem = asyncio.Semaphore(max_concurrency)

Expand All @@ -396,13 +511,22 @@ async def _process(batch: Batch) -> tuple[Batch, list]:
len(batch.findings),
)
if self._structured_llm:
response = await self._structured_llm.ainvoke(prompt)
response = await self._ainvoke_structured(prompt)
else:
response = _message_text(await self._llm.ainvoke(prompt))
logger.debug("LLM response for %s", batch.file_label)
return (batch, self.parse_response(response, batch))

return list(await asyncio.gather(*[_process(b) for b in batches]))
gathered = await asyncio.gather(*[_process(b) for b in batches], return_exceptions=True)
results: list[tuple[Batch, list]] = []
for batch, outcome in zip(batches, gathered, strict=True):
if isinstance(outcome, StructuredOutputError):
logger.warning("Skipping %s: %s", batch.file_label, outcome)
continue
if isinstance(outcome, BaseException):
raise outcome
results.append(outcome)
return results

# -- Convenience --------------------------------------------------------

Expand Down
Loading