
Agents API Reference 🧠

This section provides complete technical documentation of the dv-agentic-system multi-agent ecosystem. These specialized agents collaborate to perform specification analysis, code generation, simulation execution, log triage, and final coverage analysis.


Base Class & Configuration

All specialized sub-agents inherit from BaseAgent and are configured via AgentConfig.

base

AgentConfig dataclass

Configuration for an Agent.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `str` | Unique identifier for the agent. |
| `budget` | `int` | Maximum number of iterations allowed. |
| `environment` | `Literal["internal", "external"]` | Execution context, either `"internal"` (local) or `"external"` (remote). |

Source code in src/dv_agentic/agents/base.py
@dataclass
class AgentConfig:
    """Configuration for an Agent.

    Attributes:
        name: Unique identifier for the agent.
        budget: Maximum number of iterations allowed.
        environment: Execution context, either "internal" (local) or "external" (remote).
    """

    name: str
    budget: int = 10
    environment: Literal["internal", "external"] = "internal"

    def __post_init__(self) -> None:
        """Validate configuration parameters."""
        if not self.name:
            raise ValueError("Agent name cannot be empty.")
        if self.budget <= 0:
            raise ValueError(f"Agent budget must be positive, got {self.budget}.")
        if self.environment not in ("internal", "external"):
            raise ValueError(f"Invalid environment: {self.environment}")
__post_init__()

Validate configuration parameters.

Source code in src/dv_agentic/agents/base.py
def __post_init__(self) -> None:
    """Validate configuration parameters."""
    if not self.name:
        raise ValueError("Agent name cannot be empty.")
    if self.budget <= 0:
        raise ValueError(f"Agent budget must be positive, got {self.budget}.")
    if self.environment not in ("internal", "external"):
        raise ValueError(f"Invalid environment: {self.environment}")
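A quick usage sketch of the validation behavior documented above. The dataclass below mirrors `AgentConfig` so the snippet is self-contained; in practice you would import it from `dv_agentic.agents.base`.

```python
from dataclasses import dataclass
from typing import Literal

# Self-contained mirror of AgentConfig (see source above) for illustration.
@dataclass
class AgentConfig:
    name: str
    budget: int = 10
    environment: Literal["internal", "external"] = "internal"

    def __post_init__(self) -> None:
        if not self.name:
            raise ValueError("Agent name cannot be empty.")
        if self.budget <= 0:
            raise ValueError(f"Agent budget must be positive, got {self.budget}.")
        if self.environment not in ("internal", "external"):
            raise ValueError(f"Invalid environment: {self.environment}")

# Valid configuration: defaults fill in the environment.
cfg = AgentConfig(name="log_analyzer", budget=5)
print(cfg.environment)  # internal

# Invalid configuration: validation fires at construction time.
try:
    AgentConfig(name="", budget=5)
except ValueError as e:
    print(e)  # Agent name cannot be empty.
```

Because validation runs in `__post_init__`, a misconfigured agent fails fast at construction rather than mid-run.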

BaseAgent

Bases: ABC

Abstract base class for all Agents in the UVM system.

Source code in src/dv_agentic/agents/base.py
class BaseAgent(abc.ABC):
    """Abstract base class for all Agents in the UVM system."""

    def __init__(self, config: AgentConfig):
        """Initialize the agent with a configuration."""
        if not isinstance(config, AgentConfig):
            raise TypeError("config must be an instance of AgentConfig")
        self.config = config
        self.iteration = 0

    @abc.abstractmethod
    async def run(self, task_input: str) -> str:
        """Execute the agent's core logic.

        Args:
            task_input: The input string describing the task.

        Returns:
            A string representing the result or next steps.
        """

    async def check_budget(self) -> bool:
        """Check if the agent still has remaining budget to continue iterations.

        Note: Subclasses should prefer calling ``step()`` which both checks
        the budget and increments the iteration counter.
        """
        if self.iteration < 0:
            raise RuntimeError(f"Invalid iteration state: {self.iteration}")
        return self.iteration < self.config.budget

    async def step(self) -> bool:
        """Advance agent by one iteration.

        Returns:
            True if budget remains, False otherwise.
        """
        if await self.check_budget():
            self.iteration += 1
            return True
        return False
__init__(config)

Initialize the agent with a configuration.

Source code in src/dv_agentic/agents/base.py
def __init__(self, config: AgentConfig):
    """Initialize the agent with a configuration."""
    if not isinstance(config, AgentConfig):
        raise TypeError("config must be an instance of AgentConfig")
    self.config = config
    self.iteration = 0
check_budget() async

Check if the agent still has remaining budget to continue iterations.

Note: Subclasses should prefer calling step() which both checks the budget and increments the iteration counter.

Source code in src/dv_agentic/agents/base.py
async def check_budget(self) -> bool:
    """Check if the agent still has remaining budget to continue iterations.

    Note: Subclasses should prefer calling ``step()`` which both checks
    the budget and increments the iteration counter.
    """
    if self.iteration < 0:
        raise RuntimeError(f"Invalid iteration state: {self.iteration}")
    return self.iteration < self.config.budget
run(task_input) abstractmethod async

Execute the agent's core logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_input` | `str` | The input string describing the task. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | A string representing the result or next steps. |

Source code in src/dv_agentic/agents/base.py
@abc.abstractmethod
async def run(self, task_input: str) -> str:
    """Execute the agent's core logic.

    Args:
        task_input: The input string describing the task.

    Returns:
        A string representing the result or next steps.
    """
step() async

Advance agent by one iteration.

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if budget remains, False otherwise. |

Source code in src/dv_agentic/agents/base.py
async def step(self) -> bool:
    """Advance agent by one iteration.

    Returns:
        True if budget remains, False otherwise.
    """
    if await self.check_budget():
        self.iteration += 1
        return True
    return False
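A minimal sketch of the intended subclassing pattern: call `step()` once per work unit so the budget check and the counter increment stay in sync. `EchoAgent` is a hypothetical subclass, and the `AgentConfig`/`BaseAgent` stubs below are trimmed mirrors of the classes documented above.

```python
import abc
import asyncio
from dataclasses import dataclass

# Trimmed mirrors of the documented classes, for a self-contained example.
@dataclass
class AgentConfig:
    name: str
    budget: int = 10

class BaseAgent(abc.ABC):
    def __init__(self, config: AgentConfig):
        self.config = config
        self.iteration = 0

    @abc.abstractmethod
    async def run(self, task_input: str) -> str: ...

    async def check_budget(self) -> bool:
        return self.iteration < self.config.budget

    async def step(self) -> bool:
        # Checks the budget AND advances the counter in one call.
        if await self.check_budget():
            self.iteration += 1
            return True
        return False

class EchoAgent(BaseAgent):
    """Hypothetical subclass: loops until the budget is exhausted."""
    async def run(self, task_input: str) -> str:
        results = []
        while await self.step():
            results.append(f"iter {self.iteration}: {task_input}")
        return "\n".join(results)

out = asyncio.run(EchoAgent(AgentConfig(name="echo", budget=3)).run("ping"))
print(out)
```

With `budget=3` the loop body executes exactly three times; a fourth `step()` returns `False` and the agent terminates cleanly.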

Orchestrator Agent

The OrchestratorAgent coordinates all task routing, schedules sub-agents, and manages loop safety guardrails.

orchestrator

Orchestrator agent.

Routes tasks to the appropriate workflow and coordinates sub-agent handoffs.

Workflow model

The LLM acts as the decision maker; Python executes the decisions.

Each turn
  1. LLM receives the accumulated history and returns a structured decision.
  2. Python parses: WORKFLOW, ACTION, INPUT, HUMAN_REVIEW.
  3. Python dispatches the action to the appropriate sub-agent.
  4. Sub-agent result is appended to history as a new user message.
  5. Repeat until ACTION is done / escalate, or budget is exhausted.
Dynamic escalation

When the Orchestrator dispatches `run_log_analyzer` it also tracks the `failure_subtype` field in the returned `FailureSummary` (from `dv_agentic.agents.log_analyzer`). If the subtype shifts between consecutive log-analyzer calls (e.g. `missing_timescale` → `unmatched_block`), the Orchestrator escalates immediately instead of iterating further. A shifting error space indicates that each fix is revealing a new root cause rather than converging, which means additional iterations are unlikely to produce a passing simulation and the token budget is better spent on human diagnosis.
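The escalation check can be sketched as follows. The regex mirrors `_FAILURE_SUBTYPE_RE` from the orchestrator source below; the helper function names are illustrative, not part of the public API.

```python
import re

# Mirrors OrchestratorAgent._FAILURE_SUBTYPE_RE: matches the
# "failure_subtype : <token>" line in a FailureSummary string.
FAILURE_SUBTYPE_RE = re.compile(r"failure_subtype\s+:\s+(\S+)", re.IGNORECASE)

def extract_subtype(output: str) -> str:
    m = FAILURE_SUBTYPE_RE.search(output)
    return m.group(1) if m else "unknown"

def subtype_shifted(history: list[str], output: str) -> bool:
    """Return True when the new subtype differs from the last recorded one."""
    current = extract_subtype(output)
    if history and history[-1] != current:
        return True  # error space is shifting: escalate instead of iterating
    history.append(current)
    return False

history: list[str] = []
print(subtype_shifted(history, "failure_subtype : missing_timescale"))  # False
print(subtype_shifted(history, "failure_subtype : unmatched_block"))    # True
```

The first observation seeds the history; only a *change* between consecutive observations triggers escalation, so a stable-but-failing subtype still gets further fix iterations.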

Valid actions

run_code_generator, run_sim_controller, run_log_analyzer, run_coverage_analyst, run_bug_classifier, run_spec_analyst, run_reporter, done, escalate

Expected LLM response format:

### Decision
WORKFLOW: 1
ACTION: run_code_generator
INPUT: Generate a targeted sequence for axi_burst bin

### Human Review Required
NO
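Parsing that format can be sketched with the same regexes the orchestrator uses (they appear in the source listing below):

```python
import re

# Regexes mirror OrchestratorAgent's class attributes.
VALID_ACTIONS = {
    "run_code_generator", "run_sim_controller", "run_log_analyzer",
    "run_coverage_analyst", "run_bug_classifier", "run_spec_analyst",
    "run_reporter", "done", "escalate",
}
WORKFLOW_RE = re.compile(r"WORKFLOW\s*[:\-]?\s*([123])", re.IGNORECASE)
ACTION_RE = re.compile(r"ACTION\s*:\s*(" + "|".join(VALID_ACTIONS) + r")", re.IGNORECASE)
INPUT_RE = re.compile(r"INPUT\s*:(.*?)(?=\n[A-Z_]+\s*:|\n###|\Z)", re.DOTALL)
HUMAN_RE = re.compile(r"Human\s+Review\s+Required\s*\n(YES|NO)", re.IGNORECASE)

response = """### Decision
WORKFLOW: 1
ACTION: run_code_generator
INPUT: Generate a targeted sequence for axi_burst bin

### Human Review Required
NO
"""

workflow = WORKFLOW_RE.search(response).group(1)
action = ACTION_RE.search(response).group(1).lower()
input_text = INPUT_RE.search(response).group(1).strip()
m = HUMAN_RE.search(response)
human_review = bool(m) and m.group(1).upper() == "YES"
print(workflow, action, human_review)  # 1 run_code_generator False
```

Note the lookahead in `INPUT_RE`: the input text runs until the next `KEY:` line or `###` heading, so multi-line inputs are captured intact.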

OrchestratorAgent

Bases: BaseAgent

Routes tasks and coordinates sub-agents across Workflows 1, 2, and 3.

Each budget unit corresponds to one LLM routing call + one sub-agent dispatch. Sub-agents consume their own budgets independently.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `AgentConfig` | Agent configuration. `budget` caps orchestration cycles. | *required* |
| `llm` | `BaseLLMClient` | LLM client used for routing decisions. | *required* |
| `sub_agents` | `dict[str, BaseAgent] \| None` | Mapping from agent key to agent instance, e.g. `{"code_generator": CodeGeneratorAgent(...), ...}`. Missing keys are handled gracefully. | `None` |
| `project_config` | `ProjectContext \| None` | Optional context for PromptLoader enrichment. | `None` |
| `session` | `SessionState \| None` | Optional session state. | `None` |
| `prompts_dir` | `str \| Path \| None` | Directory containing `orchestrator.md`. | `None` |
Source code in src/dv_agentic/agents/orchestrator.py
class OrchestratorAgent(BaseAgent):
    """Routes tasks and coordinates sub-agents across Workflows 1, 2, and 3.

    Each budget unit corresponds to one LLM routing call + one sub-agent
    dispatch.  Sub-agents consume their own budgets independently.

    Args:
        config: Agent configuration.  ``budget`` caps orchestration cycles.
        llm: LLM client used for routing decisions.
        sub_agents: Mapping from agent key to agent instance, e.g.
            ``{"code_generator": CodeGeneratorAgent(...), ...}``.
            Missing keys are handled gracefully.
        project_config: Optional context for PromptLoader enrichment.
        session: Optional session state.
        prompts_dir: Directory containing ``orchestrator.md``.
    """

    VALID_ACTIONS: frozenset[str] = frozenset(
        {
            "run_code_generator",
            "run_sim_controller",
            "run_log_analyzer",
            "run_coverage_analyst",
            "run_bug_classifier",
            "run_spec_analyst",
            "run_reporter",
            "done",
            "escalate",
        }
    )

    _AGENT_KEY: ClassVar[dict[str, str]] = {
        "run_code_generator": "code_generator",
        "run_sim_controller": "sim_controller",
        "run_log_analyzer": "log_analyzer",
        "run_coverage_analyst": "coverage_analyst",
        "run_bug_classifier": "bug_classifier",
        "run_spec_analyst": "spec_analyst",
        "run_reporter": "reporter",
    }

    _WORKFLOW_RE = re.compile(r"WORKFLOW\s*[:\-]?\s*([123])", re.IGNORECASE)
    _ACTION_RE = re.compile(r"ACTION\s*:\s*(" + "|".join(VALID_ACTIONS) + r")", re.IGNORECASE)
    _INPUT_RE = re.compile(r"INPUT\s*:(.*?)(?=\n[A-Z_]+\s*:|\n###|\Z)", re.DOTALL)
    _HUMAN_RE = re.compile(
        r"Human\s+Review\s+Required\s*\n(YES|NO)(.*?)(?=\n###|\Z)",
        re.IGNORECASE | re.DOTALL,
    )
    _FAILURE_SUBTYPE_RE = re.compile(r"failure_subtype\s+:\s+(\S+)", re.IGNORECASE)

    def __init__(
        self,
        config: AgentConfig,
        llm: BaseLLMClient,
        sub_agents: dict[str, BaseAgent] | None = None,
        project_config: ProjectContext | None = None,
        session: SessionState | None = None,
        prompts_dir: str | Path | None = None,
    ) -> None:
        super().__init__(config)
        self.llm = llm
        self.sub_agents: dict[str, BaseAgent] = sub_agents or {}
        self.project_config = project_config
        self.session = session
        self.prompts_dir = prompts_dir

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str) -> str:
        """Execute the full agentic verification flow.

        Args:
            task_input: Natural language description of the verification task.

        Returns:
            A human-readable final summary report.
        """
        if not task_input or not isinstance(task_input, str):
            raise ValueError("task_input must be a non-empty string")

        task_id = self._extract_task_id(task_input)
        system_prompt = self._load_system_prompt()

        if not system_prompt:
            raise RuntimeError("System prompt must not be empty")
        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        history: list[dict[str, str]] = [{"role": "user", "content": task_input}]

        workflow = "unknown"
        steps: list[str] = []
        # Dynamic escalation: track failure_subtype from consecutive log-analyzer calls.
        # Populated only when action == "run_log_analyzer"; not reset between iterations
        # so we can compare the current subtype with the previous one.
        _failure_subtype_history: list[str] = []

        while await self.step():
            response = await self.llm.complete(system_prompt, history, max_tokens=1000)
            history.append({"role": "assistant", "content": response})

            decision = self._parse_decision(response)
            if decision.workflow != "unknown":
                workflow = decision.workflow

            logger.info(
                "Orchestrator iter=%d action=%s workflow=%s human_review=%s",
                self.iteration,
                decision.action,
                workflow,
                decision.human_review,
            )

            if decision.human_review:
                return OrchestratorResult(
                    task_id=task_id,
                    workflow=workflow,
                    final_status="escalated",
                    steps=steps,
                    requires_human_review=True,
                    human_review_reason=decision.human_review_reason,
                ).to_str()

            if decision.action == "done":
                return OrchestratorResult(
                    task_id=task_id,
                    workflow=workflow,
                    final_status="done",
                    steps=steps,
                ).to_str()

            if decision.action == "escalate":
                return OrchestratorResult(
                    task_id=task_id,
                    workflow=workflow,
                    final_status="escalated",
                    steps=steps,
                    requires_human_review=True,
                    human_review_reason="LLM requested escalation.",
                ).to_str()

            # Dispatch to sub-agent
            sub_result = await self._dispatch(decision.action, decision.input_text)

            # ----------------------------------------------------------------
            # Dynamic escalation: detect shifting failure subtypes.
            # When run_log_analyzer returns a different failure_subtype than the
            # previous call we know each fix is revealing a new root-cause rather
            # than converging.  Continuing to iterate would burn token budget
            # without making progress — escalate immediately.
            # ----------------------------------------------------------------
            if decision.action == "run_log_analyzer":
                current_subtype = self._extract_failure_subtype(sub_result)
                if _failure_subtype_history and _failure_subtype_history[-1] != current_subtype:
                    prev = _failure_subtype_history[-1]
                    reason = (
                        f"Failure type shifted from '{prev}' to '{current_subtype}' "
                        f"across iterations — iterating is unlikely to converge. "
                        f"Manual diagnosis required."
                    )
                    logger.warning(
                        "Orchestrator: failure subtype shifted %s -> %s at iter=%d; escalating",
                        prev,
                        current_subtype,
                        self.iteration,
                    )
                    steps.append(f"run_log_analyzer: {sub_result[:120].strip()}")
                    return OrchestratorResult(
                        task_id=task_id,
                        workflow=workflow,
                        final_status="escalated",
                        steps=steps,
                        requires_human_review=True,
                        human_review_reason=reason,
                    ).to_str()
                _failure_subtype_history.append(current_subtype)

            step_label = f"{decision.action}: {sub_result[:120].strip()}"
            steps.append(step_label)

            # Feed result back to LLM for the next routing decision
            history.append(
                {
                    "role": "user",
                    "content": (
                        f"Result of {decision.action}:\n{sub_result}\n\n"
                        "Based on this result, what is the next action?"
                    ),
                }
            )

        return OrchestratorResult(
            task_id=task_id,
            workflow=workflow,
            final_status="budget_exhausted",
            steps=steps,
            requires_human_review=True,
            human_review_reason=f"Budget exhausted after {self.iteration} iterations.",
        ).to_str()

    # ------------------------------------------------------------------
    # Private — dispatch
    # ------------------------------------------------------------------

    async def _dispatch(self, action: str, input_text: str) -> str:
        """Call the sub-agent associated with *action*.

        Args:
            action: One of ``VALID_ACTIONS`` (excluding ``done``/``escalate``).
            input_text: Input forwarded to the sub-agent's ``run()`` method.

        Returns:
            The sub-agent's output string, or an informative error message
            if the agent is not configured.
        """
        if not action or not isinstance(action, str):
            raise ValueError("action must be a non-empty string")
        if not isinstance(input_text, str):
            raise TypeError("input_text must be a string")

        if action not in self.VALID_ACTIONS:
            raise ValueError(f"Action '{action}' is not valid")

        key = self._AGENT_KEY.get(action)
        if not key:
            return f"No sub-agent mapping for action '{action}'."

        agent = self.sub_agents.get(key)
        if not agent:
            return f"Sub-agent '{key}' is not configured in this orchestrator. Skipping."

        logger.info("Orchestrator dispatching to '%s'", key)
        try:
            return await agent.run(input_text)
        except Exception as exc:
            # Catch all sub-agent exceptions to prevent the orchestrator from crashing.
            # Errors are logged and returned as a string for LLM feedback.
            logger.exception("Sub-agent '%s' raised an exception", key)
            return f"Sub-agent '{key}' failed: {exc}"

    # ------------------------------------------------------------------
    # Private — parsing
    # ------------------------------------------------------------------

    def _parse_decision(self, response: str) -> _Decision:
        if not response or not isinstance(response, str):
            raise ValueError("LLM response must be a non-empty string")

        workflow = "unknown"
        m = self._WORKFLOW_RE.search(response)
        if m:
            workflow = m.group(1)

        action = "escalate"  # safe default
        m = self._ACTION_RE.search(response)
        if m:
            action = m.group(1).lower()

        input_text = ""
        m = self._INPUT_RE.search(response)
        if m:
            input_text = m.group(1).strip()

        human_review = False
        human_review_reason = ""
        m = self._HUMAN_RE.search(response)
        if m:
            human_review = m.group(1).upper() == "YES"
            human_review_reason = m.group(2).strip() if m.group(2) else ""

        decision = _Decision(
            workflow=workflow,
            action=action,
            input_text=input_text,
            human_review=human_review,
            human_review_reason=human_review_reason,
        )

        # Rule 5: Post-condition assertions
        assert decision.workflow in ("1", "2", "3", "unknown")
        assert decision.action in self.VALID_ACTIONS
        return decision

    def _extract_failure_subtype(self, log_analyzer_output: str) -> str:
        """Parse the ``failure_subtype`` field from a :class:`FailureSummary` string.

        Args:
            log_analyzer_output: The string returned by ``LogAnalyzerAgent.run()``.

        Returns:
            The subtype token (e.g. ``"missing_timescale"``), or ``"unknown"``
            if the field is absent (e.g. when the sub-agent itself errored).
        """
        m = self._FAILURE_SUBTYPE_RE.search(log_analyzer_output)
        return m.group(1) if m else "unknown"

    def _load_system_prompt(self) -> str:
        try:
            loader = PromptLoader(
                prompts_dir=self.prompts_dir,
                project_config=self.project_config,
                session=self.session,
            )
            return loader.load("orchestrator")
        except (FileNotFoundError, RuntimeError) as exc:
            logger.warning("PromptLoader unavailable (%s); using fallback.", exc)
            return (
                "You are an orchestration agent for hardware verification. "
                "Given a task, determine the workflow (1, 2, or 3) and the next action.\n"
                "Always respond in this format:\n"
                "### Decision\n"
                "WORKFLOW: 1\n"
                "ACTION: run_code_generator\n"
                "INPUT: <what to pass to the agent>\n\n"
                "### Human Review Required\n"
                "NO\n"
            )

    @staticmethod
    def _extract_task_id(text: str) -> str:
        m = re.search(r"task[_\s]id\s*[:\s]+([a-zA-Z0-9_\-]+)", text, re.IGNORECASE)
        return m.group(1) if m else "orchestrator_task"
run(task_input) async

Execute the full agentic verification flow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_input` | `str` | Natural language description of the verification task. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | A human-readable final summary report. |

Source code in src/dv_agentic/agents/orchestrator.py
async def run(self, task_input: str) -> str:
    """Execute the full agentic verification flow.

    Args:
        task_input: Natural language description of the verification task.

    Returns:
        A human-readable final summary report.
    """
    if not task_input or not isinstance(task_input, str):
        raise ValueError("task_input must be a non-empty string")

    task_id = self._extract_task_id(task_input)
    system_prompt = self._load_system_prompt()

    if not system_prompt:
        raise RuntimeError("System prompt must not be empty")
    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    history: list[dict[str, str]] = [{"role": "user", "content": task_input}]

    workflow = "unknown"
    steps: list[str] = []
    # Dynamic escalation: track failure_subtype from consecutive log-analyzer calls.
    # Populated only when action == "run_log_analyzer"; not reset between iterations
    # so we can compare the current subtype with the previous one.
    _failure_subtype_history: list[str] = []

    while await self.step():
        response = await self.llm.complete(system_prompt, history, max_tokens=1000)
        history.append({"role": "assistant", "content": response})

        decision = self._parse_decision(response)
        if decision.workflow != "unknown":
            workflow = decision.workflow

        logger.info(
            "Orchestrator iter=%d action=%s workflow=%s human_review=%s",
            self.iteration,
            decision.action,
            workflow,
            decision.human_review,
        )

        if decision.human_review:
            return OrchestratorResult(
                task_id=task_id,
                workflow=workflow,
                final_status="escalated",
                steps=steps,
                requires_human_review=True,
                human_review_reason=decision.human_review_reason,
            ).to_str()

        if decision.action == "done":
            return OrchestratorResult(
                task_id=task_id,
                workflow=workflow,
                final_status="done",
                steps=steps,
            ).to_str()

        if decision.action == "escalate":
            return OrchestratorResult(
                task_id=task_id,
                workflow=workflow,
                final_status="escalated",
                steps=steps,
                requires_human_review=True,
                human_review_reason="LLM requested escalation.",
            ).to_str()

        # Dispatch to sub-agent
        sub_result = await self._dispatch(decision.action, decision.input_text)

        # ----------------------------------------------------------------
        # Dynamic escalation: detect shifting failure subtypes.
        # When run_log_analyzer returns a different failure_subtype than the
        # previous call we know each fix is revealing a new root-cause rather
        # than converging.  Continuing to iterate would burn token budget
        # without making progress — escalate immediately.
        # ----------------------------------------------------------------
        if decision.action == "run_log_analyzer":
            current_subtype = self._extract_failure_subtype(sub_result)
            if _failure_subtype_history and _failure_subtype_history[-1] != current_subtype:
                prev = _failure_subtype_history[-1]
                reason = (
                    f"Failure type shifted from '{prev}' to '{current_subtype}' "
                    f"across iterations — iterating is unlikely to converge. "
                    f"Manual diagnosis required."
                )
                logger.warning(
                    "Orchestrator: failure subtype shifted %s -> %s at iter=%d; escalating",
                    prev,
                    current_subtype,
                    self.iteration,
                )
                steps.append(f"run_log_analyzer: {sub_result[:120].strip()}")
                return OrchestratorResult(
                    task_id=task_id,
                    workflow=workflow,
                    final_status="escalated",
                    steps=steps,
                    requires_human_review=True,
                    human_review_reason=reason,
                ).to_str()
            _failure_subtype_history.append(current_subtype)

        step_label = f"{decision.action}: {sub_result[:120].strip()}"
        steps.append(step_label)

        # Feed result back to LLM for the next routing decision
        history.append(
            {
                "role": "user",
                "content": (
                    f"Result of {decision.action}:\n{sub_result}\n\n"
                    "Based on this result, what is the next action?"
                ),
            }
        )

    return OrchestratorResult(
        task_id=task_id,
        workflow=workflow,
        final_status="budget_exhausted",
        steps=steps,
        requires_human_review=True,
        human_review_reason=f"Budget exhausted after {self.iteration} iterations.",
    ).to_str()

OrchestratorResult dataclass

Structured output from `OrchestratorAgent`.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `task_id` | `str` | Unique identifier for the orchestrated task. |
| `workflow` | `str` | The detected workflow category (`"1"`, `"2"`, or `"3"`). |
| `final_status` | `str` | Termination state (`"done"`, `"escalated"`, or `"budget_exhausted"`). |
| `steps` | `list[str]` | List of summary strings for each sub-agent dispatch. |
| `requires_human_review` | `bool` | True if the process stopped for manual intervention. |
| `human_review_reason` | `str` | Explanation for why review is required. |

Source code in src/dv_agentic/agents/orchestrator.py
@dataclass
class OrchestratorResult:
    """Structured output from :class:`OrchestratorAgent`.

    Attributes:
        task_id: Unique identifier for the orchestrated task.
        workflow: The detected workflow category ("1", "2", or "3").
        final_status: Termination state ("done", "escalated", or "budget_exhausted").
        steps: List of summary strings for each sub-agent dispatch.
        requires_human_review: True if the process stopped for manual intervention.
        human_review_reason: Explanation for why review is required.
    """

    task_id: str
    workflow: str
    final_status: str  # "done" | "escalated" | "budget_exhausted"
    steps: list[str] = field(default_factory=list)
    requires_human_review: bool = False
    human_review_reason: str = ""

    def to_str(self) -> str:
        lines = [
            "### Orchestrator Result",
            f"task_id      : {self.task_id}",
            f"workflow     : {self.workflow}",
            f"final_status : {self.final_status}",
            f"steps_taken  : {len(self.steps)}",
            f"human_review : {'YES' if self.requires_human_review else 'NO'}",
        ]
        if self.human_review_reason:
            lines.append(f"review_reason: {self.human_review_reason}")
        if self.steps:
            lines.append("steps        :")
            for s in self.steps:
                lines.append(f"  - {s}")
        return "\n".join(lines)
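To illustrate the report text `to_str()` produces, the snippet below mirrors the dataclass above (self-contained copy; import the real one from `dv_agentic.agents.orchestrator`) and renders a hypothetical escalated run:

```python
from dataclasses import dataclass, field

# Self-contained mirror of OrchestratorResult (see source above).
@dataclass
class OrchestratorResult:
    task_id: str
    workflow: str
    final_status: str  # "done" | "escalated" | "budget_exhausted"
    steps: list[str] = field(default_factory=list)
    requires_human_review: bool = False
    human_review_reason: str = ""

    def to_str(self) -> str:
        lines = [
            "### Orchestrator Result",
            f"task_id      : {self.task_id}",
            f"workflow     : {self.workflow}",
            f"final_status : {self.final_status}",
            f"steps_taken  : {len(self.steps)}",
            f"human_review : {'YES' if self.requires_human_review else 'NO'}",
        ]
        if self.human_review_reason:
            lines.append(f"review_reason: {self.human_review_reason}")
        if self.steps:
            lines.append("steps        :")
            for s in self.steps:
                lines.append(f"  - {s}")
        return "\n".join(lines)

# Hypothetical escalated run (task_id and step text are made up).
report = OrchestratorResult(
    task_id="t42",
    workflow="2",
    final_status="escalated",
    steps=["run_log_analyzer: UVM_ERROR summary ..."],
    requires_human_review=True,
    human_review_reason="Failure type shifted across iterations.",
).to_str()
print(report)
```

The optional `review_reason` and `steps` sections only appear when they carry information, keeping the report compact for clean `done` runs.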

Code Generator Agent

The CodeGeneratorAgent generates stimulus sequences, testbenches, and checkers utilizing UVM/pyuvm constructs.

code_generator

SV/UVM testbench code generation agent.

Scope boundary

This agent operates exclusively on testbench files (sequences, scoreboards, coverage groups, monitors, drivers, agents, env). RTL source files are strictly read-only — this agent must never create or modify them.

The `allowed_dirs` constructor parameter enforces this at write time. When set, any file path whose top-level directory is not in the whitelist raises a `ValueError` before a byte is written to disk. `..` traversal is always blocked regardless of `allowed_dirs`.

Workflow
  1. Load the enriched code_generator system prompt via PromptLoader.
  2. Send the task description as the first user message.
  3. Parse the LLM response for ### Compile Confidence.
  4. HIGH or MEDIUM → extract code, write files, return report.
  5. LOW or UNKNOWN → append open questions as a follow-up user message, repeat from step 3.
  6. Stop when confidence passes or AgentConfig.budget is exhausted.
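The confidence gate in steps 3 through 6 can be sketched as below. The section-splitting logic here is simplified and illustrative; the agent's own parsing may differ, but `PASS_CONFIDENCE` and `_CONFIDENCE_RE` mirror the class attributes shown in the source below.

```python
import re

# Mirror CodeGeneratorAgent's confidence attributes (see source below).
PASS_CONFIDENCE = frozenset({"HIGH", "MEDIUM"})
CONFIDENCE_RE = re.compile(r"\b(HIGH|MEDIUM|LOW)\b", re.IGNORECASE)

def confidence_passes(response: str) -> bool:
    # Simplified: look only inside the "### Compile Confidence" section.
    _, sep, tail = response.partition("### Compile Confidence")
    if not sep:
        return False  # section absent -> treat as UNKNOWN, keep iterating
    m = CONFIDENCE_RE.search(tail)
    return bool(m) and m.group(1).upper() in PASS_CONFIDENCE

print(confidence_passes("### Compile Confidence\nHIGH"))  # True
print(confidence_passes("### Compile Confidence\nLOW"))   # False
print(confidence_passes("no confidence section here"))    # False
```

Treating a missing section the same as LOW/UNKNOWN is the conservative choice: the loop keeps asking follow-up questions rather than writing files on ambiguous output.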

CodeGeneratorAgent

Bases: BaseAgent

Generates and modifies SV/UVM testbench code through multi-turn LLM dialogue.

Terminates when the LLM reports HIGH or MEDIUM compile confidence or when AgentConfig.budget iterations are exhausted.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `AgentConfig` | Agent configuration. | *required* |
| `llm` | `BaseLLMClient` | Any `BaseLLMClient`. | *required* |
| `project_config` | `ProjectContext \| None` | Optional project context for PromptLoader enrichment. | `None` |
| `session` | `SessionState \| None` | Optional session state injected into the system prompt. | `None` |
| `prompts_dir` | `Path \| str \| None` | Directory containing `code_generator.md`. | `None` |
| `workspace_dir` | `str` | Root directory where generated files are written. | `'.'` |
| `allowed_dirs` | `frozenset[str] \| set[str] \| None` | Whitelist of top-level directory names the agent may write into. `None` (default) disables the check; use `DEFAULT_TB_ALLOWED_DIRS` in production. `..` traversal is always blocked regardless of this setting. | `None` |
Source code in src/dv_agentic/agents/code_generator.py
class CodeGeneratorAgent(BaseAgent):
    """Generates and modifies SV/UVM **testbench** code through multi-turn LLM dialogue.

    Terminates when the LLM reports HIGH or MEDIUM compile confidence or when
    ``AgentConfig.budget`` iterations are exhausted.

    Args:
        config: Agent configuration.
        llm: Any :class:`BaseLLMClient`.
        project_config: Optional project context for PromptLoader enrichment.
        session: Optional session state injected into the system prompt.
        prompts_dir: Directory containing ``code_generator.md``.
        workspace_dir: Root directory where generated files are written.
        allowed_dirs: Whitelist of top-level directory names the agent may
            write into.  ``None`` (default) disables the check — use
            :data:`DEFAULT_TB_ALLOWED_DIRS` in production.  ``..`` traversal
            is always blocked regardless of this setting.
    """

    #: Confidence levels that signal a passing self-review.
    PASS_CONFIDENCE: frozenset[str] = frozenset({"HIGH", "MEDIUM"})

    _SECTION_RE = re.compile(r"^###\s+(.+)$", re.MULTILINE)
    _CODE_BLOCK_RE = re.compile(r"```(?:\w+)?\n(.*?)```", re.DOTALL)
    _CONFIDENCE_RE = re.compile(r"\b(HIGH|MEDIUM|LOW)\b", re.IGNORECASE)
    _FILE_PATH_RE = re.compile(r"`([^`]+\.[a-zA-Z]+)`")
    _FILE_MARKER_RE = re.compile(
        r"^(?://|#)\s*(?:file:|===)\s*(.+?)(?:\s*===)?\s*$",
        re.IGNORECASE | re.MULTILINE,
    )

    def __init__(
        self,
        config: AgentConfig,
        llm: BaseLLMClient,
        project_config: ProjectContext | None = None,
        session: SessionState | None = None,
        prompts_dir: Path | str | None = None,
        workspace_dir: str = ".",
        allowed_dirs: frozenset[str] | set[str] | None = None,
    ) -> None:
        super().__init__(config)
        self.llm = llm
        self.project_config = project_config
        self.session = session
        self.prompts_dir = Path(prompts_dir) if prompts_dir else None
        self.workspace_dir = Path(workspace_dir)
        # Freeze for safety; None means "no restriction" (test / custom use)
        self.allowed_dirs: frozenset[str] | None = (
            frozenset(allowed_dirs) if allowed_dirs is not None else None
        )

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str | CodeTask) -> str:
        """Run the code generation loop.

        Args:
            task_input: A :class:`CodeTask` or a plain string description.
                When a plain string is given, ``task_id`` defaults to
                ``"codegen_task"``.

        Returns:
            A formatted :class:`CodeReport` string.
        """
        if not task_input or not isinstance(task_input, str | CodeTask):
            raise ValueError("task_input must be a non-empty string or CodeTask")

        task = self._parse_task(task_input)
        system_prompt = self._load_system_prompt()

        if not system_prompt:
            raise RuntimeError("System prompt must not be empty")
        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        history: list[dict[str, str]] = [{"role": "user", "content": task.description}]
        files_written: list[str] = []
        last_parsed: ParsedResponse | None = None

        while await self.step():
            logger.info("CodeGenerator iter=%d task_id=%s", self.iteration, task.task_id)
            response = await self.llm.complete(system_prompt, history, max_tokens=4000)
            history.append({"role": "assistant", "content": response})

            last_parsed = self._parse_response(response)
            logger.info("Confidence=%s iter=%d", last_parsed.confidence, self.iteration)

            if last_parsed.confidence in self.PASS_CONFIDENCE:
                written = self._write_files(last_parsed.file_specs, str(self.workspace_dir))
                files_written.extend(written)
                return CodeReport(
                    task_id=task.task_id,
                    final_status="pass",
                    iterations=self.iteration,
                    files_written=files_written,
                    confidence=last_parsed.confidence,
                    summary=last_parsed.summary,
                    open_questions=last_parsed.open_questions,
                ).to_str()

            # LOW / UNKNOWN: feed open questions back as a follow-up
            history.append({"role": "user", "content": self._build_follow_up(last_parsed)})

        # Budget exhausted — persist whatever the last iteration produced
        if last_parsed and last_parsed.file_specs:
            written = self._write_files(last_parsed.file_specs, str(self.workspace_dir))
            files_written.extend(written)

        return CodeReport(
            task_id=task.task_id,
            final_status="budget_exhausted",
            iterations=self.iteration,
            files_written=files_written,
            confidence=last_parsed.confidence if last_parsed else "UNKNOWN",
            summary=last_parsed.summary if last_parsed else "",
            open_questions=last_parsed.open_questions if last_parsed else "",
        ).to_str()

    # ------------------------------------------------------------------
    # Private — task parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_task(task_input: str | CodeTask) -> CodeTask:
        if isinstance(task_input, CodeTask):
            return task_input
        return CodeTask(task_id="codegen_task", description=task_input)

    # ------------------------------------------------------------------
    # Private — system prompt
    # ------------------------------------------------------------------

    def _load_system_prompt(self) -> str:
        """Load and enrich the code_generator prompt; fall back if unavailable."""
        try:
            loader = PromptLoader(
                prompts_dir=self.prompts_dir,
                project_config=self.project_config,
                session=self.session,
            )
            return loader.load("code_generator")
        except (FileNotFoundError, RuntimeError) as exc:
            logger.warning("PromptLoader unavailable (%s); using minimal fallback prompt.", exc)
            return (
                "You are a SystemVerilog / UVM testbench code generation specialist. "
                "Generate correct, simulation-ready SV/UVM testbench code. "
                "Never modify RTL source files. "
                "Always end your response with:\n"
                "### Compile Confidence\n"
                "HIGH | MEDIUM | LOW — brief justification."
            )

    # ------------------------------------------------------------------
    # Private — response parsing
    # ------------------------------------------------------------------

    def _parse_response(self, response: str) -> ParsedResponse:
        """Extract structured fields from one LLM response."""
        if not response or not isinstance(response, str):
            raise ValueError("LLM response must be a non-empty string")

        sections = self._split_sections(response)

        summary = sections.get("Summary", "").strip()
        changed_files_text = sections.get("Changed Files", "")
        code_text = sections.get("Code", "")
        open_questions = sections.get("Open Questions", "").strip()
        confidence_text = sections.get("Compile Confidence", "")

        m = self._CONFIDENCE_RE.search(confidence_text)
        confidence = m.group(1).upper() if m else "UNKNOWN"
        file_paths = self._FILE_PATH_RE.findall(changed_files_text)
        file_specs = self._extract_file_specs(code_text, file_paths)

        parsed = ParsedResponse(
            summary=summary,
            changed_file_paths=file_paths,
            file_specs=file_specs,
            open_questions=open_questions,
            confidence=confidence,
            confidence_reason=confidence_text.strip(),
            raw=response,
        )

        if parsed.confidence not in ("HIGH", "MEDIUM", "LOW", "UNKNOWN"):
            raise ValueError(f"Invalid confidence level extracted: {parsed.confidence}")
        if not isinstance(parsed.file_specs, list):
            raise TypeError("file_specs must be a list")
        return parsed

    def _split_sections(self, text: str) -> dict[str, str]:
        """Split a response string by ``### `` headers."""
        result: dict[str, str] = {}
        matches = list(self._SECTION_RE.finditer(text))
        for i, match in enumerate(matches):
            name = match.group(1).strip()
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            result[name] = text[start:end]
        return result

    def _extract_file_specs(self, code_text: str, file_paths: list[str]) -> list[FileSpec]:
        """Map code blocks to :class:`FileSpec` objects.

        Tries three strategies in order:

        1. **N blocks : N paths** — one code block per changed file path.
        2. **File markers** — single block with ``// file: path`` markers.
        3. **Fallback** — single block assigned to the first changed file path.
        """
        code_blocks = self._CODE_BLOCK_RE.findall(code_text)
        if not code_blocks:
            return []

        # Strategy 1: one block per path
        if len(code_blocks) == len(file_paths) and file_paths:
            return [
                FileSpec(path=p, content=c.strip())
                for p, c in zip(file_paths, code_blocks, strict=True)
            ]

        # Strategy 2: file markers inside a single block
        specs = self._split_by_markers(code_blocks[0])
        if specs:
            return specs

        # Strategy 3: fallback
        if file_paths:
            return [FileSpec(path=file_paths[0], content=code_blocks[0].strip())]

        return []

    def _split_by_markers(self, code: str) -> list[FileSpec]:
        """Split *code* on ``// file: path`` or ``// === path ===`` markers."""
        # re.split with a capturing group → [pre, path, content, path, content, ...]
        parts = self._FILE_MARKER_RE.split(code)
        if len(parts) < 3:  # no marker found
            return []

        specs: list[FileSpec] = []
        for i in range(1, len(parts), 2):
            if i + 1 < len(parts):
                specs.append(FileSpec(path=parts[i].strip(), content=parts[i + 1].strip()))
        return specs

    # ------------------------------------------------------------------
    # Private — path validation
    # ------------------------------------------------------------------

    def _validate_path(self, spec_path: str) -> None:
        """Validate that *spec_path* is safe to write.

        Two rules, applied unconditionally:

        1. **No traversal**: ``..`` anywhere in the path is rejected.
        2. **Whitelist** (only when ``self.allowed_dirs`` is set): the
           top-level directory component must be in the whitelist.  Flat
           paths (no directory component) are always allowed.

        Args:
            spec_path: Relative path as returned by the LLM.

        Raises:
            ValueError: If the path fails either check.
        """
        p = Path(spec_path)

        # Check for a Windows drive specifier (e.g. "C:") even on non-Windows hosts
        has_win_drive = len(spec_path) >= 2 and spec_path[1] == ":" and spec_path[0].isalpha()

        # Rule 1: block traversal and absolute paths regardless of whitelist
        if (
            p.is_absolute()
            or spec_path.startswith("/")
            or spec_path.startswith("\\")
            or has_win_drive
            or ".." in p.parts
        ):
            raise ValueError(
                f"Absolute paths and path traversal are not allowed: '{spec_path}'. "
                "The LLM must use relative paths within the workspace and "
                "not use '..' to escape it."
            )

        # Rule 2: whitelist check (only when configured)
        if self.allowed_dirs is not None and len(p.parts) > 1:
            top = p.parts[0]
            if top not in self.allowed_dirs:
                raise ValueError(
                    f"Path '{spec_path}' targets directory '{top}' which is outside "
                    f"the allowed testbench directories: {sorted(self.allowed_dirs)}. "
                    "RTL source files are read-only — this agent must not modify them."
                )

    # ------------------------------------------------------------------
    # Private — follow-up message + file writing
    # ------------------------------------------------------------------

    @staticmethod
    def _build_follow_up(parsed: ParsedResponse) -> str:
        lines = [
            "Your compile confidence was LOW or could not be determined. "
            "Please revise the code to address the following issues:",
            "",
            parsed.open_questions
            if parsed.open_questions
            else (
                "Review against the self-review checklist in the system prompt "
                "and fix any outstanding issues."
            ),
            "",
            "Provide the complete revised code with the same output format "
            "and a new ### Compile Confidence assessment.",
        ]
        return "\n".join(lines)

    def _write_files(self, file_specs: list[FileSpec], workspace_dir: str) -> list[str]:
        """Write *file_specs* under *workspace_dir*; return written paths.

        Every path is validated via :meth:`_validate_path` before any file
        system operation takes place.
        """
        if not workspace_dir:
            raise ValueError("workspace_dir must not be empty")
        if not isinstance(file_specs, list):
            raise TypeError("file_specs must be a list")

        base = Path(workspace_dir)
        if not base.exists():
            raise FileNotFoundError(f"Workspace directory {base} must exist")

        written: list[str] = []
        for spec in file_specs:
            if not spec.path:
                raise ValueError("File spec must have a path")
            if spec.content is None:
                raise ValueError(f"File spec {spec.path} must have content")

            self._validate_path(spec.path)

            target = base / spec.path
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(spec.content, encoding="utf-8")
            logger.info("Wrote %s (%d chars)", target, len(spec.content))
            written.append(str(target))

        if len(written) != len(file_specs):
            msg = f"File write mismatch: {len(written)} written, {len(file_specs)} expected"
            raise RuntimeError(msg)
        return written
run(task_input) async

Run the code generation loop.

Parameters:

Name Type Description Default
task_input str | CodeTask

A :class:CodeTask or a plain string description. When a plain string is given, task_id defaults to "codegen_task".

required

Returns:

Type Description
str

A formatted :class:CodeReport string.

Source code in src/dv_agentic/agents/code_generator.py
async def run(self, task_input: str | CodeTask) -> str:
    """Run the code generation loop.

    Args:
        task_input: A :class:`CodeTask` or a plain string description.
            When a plain string is given, ``task_id`` defaults to
            ``"codegen_task"``.

    Returns:
        A formatted :class:`CodeReport` string.
    """
    if not task_input or not isinstance(task_input, str | CodeTask):
        raise ValueError("task_input must be a non-empty string or CodeTask")

    task = self._parse_task(task_input)
    system_prompt = self._load_system_prompt()

    if not system_prompt:
        raise RuntimeError("System prompt must not be empty")
    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    history: list[dict[str, str]] = [{"role": "user", "content": task.description}]
    files_written: list[str] = []
    last_parsed: ParsedResponse | None = None

    while await self.step():
        logger.info("CodeGenerator iter=%d task_id=%s", self.iteration, task.task_id)
        response = await self.llm.complete(system_prompt, history, max_tokens=4000)
        history.append({"role": "assistant", "content": response})

        last_parsed = self._parse_response(response)
        logger.info("Confidence=%s iter=%d", last_parsed.confidence, self.iteration)

        if last_parsed.confidence in self.PASS_CONFIDENCE:
            written = self._write_files(last_parsed.file_specs, str(self.workspace_dir))
            files_written.extend(written)
            return CodeReport(
                task_id=task.task_id,
                final_status="pass",
                iterations=self.iteration,
                files_written=files_written,
                confidence=last_parsed.confidence,
                summary=last_parsed.summary,
                open_questions=last_parsed.open_questions,
            ).to_str()

        # LOW / UNKNOWN: feed open questions back as a follow-up
        history.append({"role": "user", "content": self._build_follow_up(last_parsed)})

    # Budget exhausted — persist whatever the last iteration produced
    if last_parsed and last_parsed.file_specs:
        written = self._write_files(last_parsed.file_specs, str(self.workspace_dir))
        files_written.extend(written)

    return CodeReport(
        task_id=task.task_id,
        final_status="budget_exhausted",
        iterations=self.iteration,
        files_written=files_written,
        confidence=last_parsed.confidence if last_parsed else "UNKNOWN",
        summary=last_parsed.summary if last_parsed else "",
        open_questions=last_parsed.open_questions if last_parsed else "",
    ).to_str()
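The control flow above hinges on `BaseAgent.step()`. As a hedged sketch — assuming, per the loop comments in the source, that `step()` increments `iteration` and returns `True` while budget remains — the pass / budget-exhausted branching reduces to:

```python
import asyncio


class ToyAgent:
    """Minimal stand-in for the assumed BaseAgent budget-loop semantics."""

    def __init__(self, budget: int) -> None:
        self.budget = budget
        self.iteration = 0

    async def step(self) -> bool:
        # Assumed contract: increment the counter, allow another turn while under budget
        if self.iteration >= self.budget:
            return False
        self.iteration += 1
        return True

    async def run(self, passes_on: int) -> str:
        # Mirrors CodeGeneratorAgent.run: loop until a "pass" signal or the budget runs out
        while await self.step():
            if self.iteration >= passes_on:
                return f"pass after {self.iteration} iterations"
        return "budget_exhausted"


print(asyncio.run(ToyAgent(budget=3).run(passes_on=2)))   # pass after 2 iterations
print(asyncio.run(ToyAgent(budget=3).run(passes_on=99)))  # budget_exhausted
```

`ToyAgent` and `passes_on` are illustrative names only; the real termination signal is the LLM's compile confidence, not an iteration count.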

CodeReport dataclass

Structured output from a completed :class:CodeGeneratorAgent run.

Attributes:

Name Type Description
task_id str

Unique identifier for the code generation task.

final_status str

Termination state ("pass" or "budget_exhausted").

iterations int

Total number of LLM calls made.

files_written list[str]

List of absolute paths to the files written to disk.

confidence str

Final self-reported confidence from the LLM.

summary str

Final summary of the changes implemented.

open_questions str

Remaining questions or issues if status is not "pass".

Source code in src/dv_agentic/agents/code_generator.py
@dataclass
class CodeReport:
    """Structured output from a completed :class:`CodeGeneratorAgent` run.

    Attributes:
        task_id: Unique identifier for the code generation task.
        final_status: Termination state ("pass" or "budget_exhausted").
        iterations: Total number of LLM calls made.
        files_written: List of absolute paths to the files written to disk.
        confidence: Final self-reported confidence from the LLM.
        summary: Final summary of the changes implemented.
        open_questions: Remaining questions or issues if status is not "pass".
    """

    task_id: str
    final_status: str  # "pass" | "budget_exhausted"
    iterations: int
    files_written: list[str] = field(default_factory=list)
    confidence: str = "UNKNOWN"
    summary: str = ""
    open_questions: str = ""

    def to_str(self) -> str:
        status_note = "✓" if self.final_status == "pass" else "⚠ budget exhausted"
        lines = [
            "### Code Generation Report",
            f"task_id      : {self.task_id}",
            f"final_status : {self.final_status}  {status_note}",
            f"iterations   : {self.iterations}",
            f"confidence   : {self.confidence}",
        ]
        if self.files_written:
            lines.append("files_written :")
            for f in self.files_written:
                lines.append(f"  - {f}")
        if self.summary:
            lines += ["", "### Summary", self.summary]
        if self.open_questions:
            lines += ["", "### Open Questions", self.open_questions]
        return "\n".join(lines)

CodeTask dataclass

Input specification for a single CodeGeneratorAgent run.

Attributes:

Name Type Description
task_id str

Unique identifier used in log messages and reports.

description str

Natural-language task for the LLM, e.g. "Generate a sequence targeting the back-pressure bin in axi_write_cov.hit_max_outstanding".

Source code in src/dv_agentic/agents/code_generator.py
@dataclass
class CodeTask:
    """Input specification for a single CodeGeneratorAgent run.

    Attributes:
        task_id: Unique identifier used in log messages and reports.
        description: Natural-language task for the LLM, e.g.
            ``"Generate a sequence targeting the back-pressure bin in
            axi_write_cov.hit_max_outstanding"``.
    """

    task_id: str
    description: str

FileSpec dataclass

A file path and its full content, ready to write to disk.

Attributes:

Name Type Description
path str

Relative destination path for the file; absolute paths are rejected by the agent's path validation before writing.

content str

Full text content of the file.

Source code in src/dv_agentic/agents/code_generator.py
@dataclass
class FileSpec:
    """A file path and its full content, ready to write to disk.

    Attributes:
        path: Relative destination path for the file; absolute paths are
            rejected by the agent's path validation before writing.
        content: Full text content of the file.
    """

    path: str
    content: str
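`FileSpec` objects are produced by splitting a single code block on file markers (extraction strategy 2). The standalone sketch below uses the same regex as `_FILE_MARKER_RE`; the file names are hypothetical:

```python
import re

# Same pattern as CodeGeneratorAgent._FILE_MARKER_RE
FILE_MARKER_RE = re.compile(
    r"^(?://|#)\s*(?:file:|===)\s*(.+?)(?:\s*===)?\s*$",
    re.IGNORECASE | re.MULTILINE,
)

code = """// file: tb/env/axi_env.sv
class axi_env extends uvm_env; endclass
// file: tb/seq/axi_seq.sv
class axi_seq extends uvm_sequence; endclass
"""

# re.split with one capturing group yields [pre, path, content, path, content, ...]
parts = FILE_MARKER_RE.split(code)
specs = [
    (parts[i].strip(), parts[i + 1].strip())
    for i in range(1, len(parts), 2)
    if i + 1 < len(parts)
]
print([path for path, _ in specs])  # ['tb/env/axi_env.sv', 'tb/seq/axi_seq.sv']
```

Both `// file: path` and `// === path ===` marker styles match, in either `//` or `#` comment syntax.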

ParsedResponse dataclass

Structured fields extracted from one LLM response.

Attributes:

Name Type Description
summary str

Executive summary of the changes made by the LLM.

changed_file_paths list[str]

List of paths identified in the 'Changed Files' section.

file_specs list[FileSpec]

List of :class:FileSpec objects ready to be written to disk.

open_questions str

Feedback or questions from the LLM if confidence is low.

confidence str

Self-reported compile confidence ("HIGH", "MEDIUM", "LOW", or "UNKNOWN" when none was found).

confidence_reason str

Detailed justification for the confidence rating.

raw str

The original raw string response from the LLM.

Source code in src/dv_agentic/agents/code_generator.py
@dataclass
class ParsedResponse:
    """Structured fields extracted from one LLM response.

    Attributes:
        summary: Executive summary of the changes made by the LLM.
        changed_file_paths: List of paths identified in the 'Changed Files' section.
        file_specs: List of :class:`FileSpec` objects ready to be written to disk.
        open_questions: Feedback or questions from the LLM if confidence is low.
        confidence: Self-reported compile confidence ("HIGH", "MEDIUM", "LOW",
            or "UNKNOWN" when none was found).
        confidence_reason: Detailed justification for the confidence rating.
        raw: The original raw string response from the LLM.
    """

    summary: str
    changed_file_paths: list[str]  # paths from ### Changed Files
    file_specs: list[FileSpec]  # paths + content ready to write
    open_questions: str
    confidence: str  # "HIGH" | "MEDIUM" | "LOW" | "UNKNOWN"
    confidence_reason: str
    raw: str
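A `ParsedResponse` is built by splitting the reply on `### ` headers and scanning the confidence section. The sketch below reproduces that extraction standalone, with the same regexes as `_SECTION_RE` and `_CONFIDENCE_RE` and a made-up reply:

```python
import re

SECTION_RE = re.compile(r"^###\s+(.+)$", re.MULTILINE)
CONFIDENCE_RE = re.compile(r"\b(HIGH|MEDIUM|LOW)\b", re.IGNORECASE)


def split_sections(text: str) -> dict[str, str]:
    """Map each '### Name' header to the text up to the next header."""
    matches = list(SECTION_RE.finditer(text))
    out: dict[str, str] = {}
    for i, m in enumerate(matches):
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        out[m.group(1).strip()] = text[m.end():end]
    return out


reply = """### Summary
Added a back-pressure sequence.
### Compile Confidence
HIGH - clean syntax, standard UVM idioms.
"""

sections = split_sections(reply)
m = CONFIDENCE_RE.search(sections.get("Compile Confidence", ""))
print(m.group(1).upper() if m else "UNKNOWN")  # HIGH
```

A reply without a `### Compile Confidence` section yields "UNKNOWN", which the run loop treats the same as LOW: it sends a follow-up message and iterates.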

Simulation Controller Agent

The SimControllerAgent drives simulator execution, configures test variables, and triggers builds.

sim_controller

Simulation execution agent.

Manages the full lifecycle of a simulation task:
  1. Create ai-task/{task_id} git branch.
  2. Compile (fail-fast — never submit a broken build).
  3. Run the simulation in a loop, respecting the budget in AgentConfig.
  4. Commit the final state and report results.

SimControllerAgent

Bases: BaseAgent

Runs compile → simulate → commit cycles within a git branch.

Does not require LLM access. All decisions are deterministic: compile fail → abort; sim pass → done; budget exhausted → escalate.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration (name, budget, environment).

required
simulator SimulatorTool

A SimulatorTool adapter (Xcelium, GHDL, Icarus, …).

required
coverage CoverageTool | None

Optional CoverageTool adapter used to record the coverage DB path in the report.

None
base_branch str

Git branch to fork from. Defaults to "main".

'main'
Source code in src/dv_agentic/agents/sim_controller.py
class SimControllerAgent(BaseAgent):
    """Runs compile → simulate → commit cycles within a git branch.

    Does not require LLM access.  All decisions are deterministic:
    compile fail → abort; sim pass → done; budget exhausted → escalate.

    Args:
        config: Agent configuration (name, budget, environment).
        simulator: A ``SimulatorTool`` adapter (Xcelium, GHDL, Icarus, …).
        coverage: Optional ``CoverageTool`` adapter used to record the
            coverage DB path in the report.
        base_branch: Git branch to fork from.  Defaults to ``"main"``.
    """

    def __init__(
        self,
        config: AgentConfig,
        simulator: SimulatorTool,
        coverage: CoverageTool | None = None,
        base_branch: str = "main",
    ) -> None:
        super().__init__(config)
        self.sim = simulator
        self.cov = coverage
        self.base_branch = base_branch

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str | SimTask) -> str:
        """Execute the simulation task lifecycle.

        Args:
            task_input: Either a :class:`SimTask` instance or a JSON string
                that deserialises into one.

        Returns:
            A human-readable report string (see :class:`SimReport`).
        """
        if not task_input:
            raise ValueError("task_input must not be empty")

        task = self._parse_task(task_input)

        if not task.task_id:
            raise ValueError("SimTask must have a task_id")
        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        safe_id = re.sub(r"[^a-zA-Z0-9_-]", "_", task.task_id)
        branch = f"ai-task/{safe_id}"

        await asyncio.to_thread(self._git_checkout_new_branch, branch)

        # Fail-fast compile
        compile_result = await asyncio.to_thread(self.sim.compile, task.file_list, task.top)
        if compile_result.status == "fail":
            logger.error("Compile failed for task '%s'", task.task_id)
            return (
                SimReport(
                    task_id=task.task_id,
                    final_status="compile_fail",
                    runs_total=0,
                    branch=branch,
                    ready_for_pr=False,
                ).to_str()
                + f"\n\n### Compile Output\n{compile_result.output}"
            )

        # Sim loop — self.step() checks budget AND increments self.iteration
        results: list[SimResult] = []
        while await self.step():
            sim_result = await asyncio.to_thread(self.sim.run, task.test, task.seed, task.debug)
            results.append(sim_result)
            logger.info(
                "Sim iter=%d status=%s job_id=%s",
                self.iteration,
                sim_result.status,
                sim_result.job_id,
            )
            await asyncio.to_thread(
                self._git_commit,
                f"[agent] sim iter={self.iteration} · task:{task.task_id} · iter:{self.iteration}",
            )
            if sim_result.status == "pass":
                return SimReport(
                    task_id=task.task_id,
                    final_status="pass",
                    runs_total=self.iteration,
                    branch=branch,
                    ready_for_pr=True,
                    last_result=sim_result,
                ).to_str()

        # Budget exhausted
        await asyncio.to_thread(
            self._git_commit, f"[agent] budget exhausted · task:{task.task_id} · INCOMPLETE"
        )
        last = results[-1] if results else None
        return SimReport(
            task_id=task.task_id,
            final_status="escalated",
            runs_total=self.iteration,
            branch=branch,
            ready_for_pr=False,
            last_result=last,
        ).to_str()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_task(task_input: str | SimTask) -> SimTask:
        if isinstance(task_input, SimTask):
            return task_input
        return SimTask(**json.loads(task_input))

    def _git(self, *args: str) -> None:
        """Run a git command, raising CalledProcessError on failure."""
        subprocess.run(  # noqa: S603
            ["git", *args],  # noqa: S607
            check=True,
            capture_output=True,
        )

    def _git_checkout_new_branch(self, branch: str) -> None:
        try:
            self._git("checkout", self.base_branch)
            self._git("pull", "--ff-only")
            self._git("checkout", "-B", branch)
        except subprocess.CalledProcessError as exc:
            logger.warning("git branch setup failed (may be expected in CI): %s", exc)

    def _git_commit(self, message: str) -> None:
        try:
            # Add only tracked files and newly created files that are NOT in .gitignore
            self._git("add", ".")
            self._git("commit", "-m", message)
        except subprocess.CalledProcessError:
            logger.debug("Nothing to commit for message: %s", message)
run(task_input) async

Execute the simulation task lifecycle.

Parameters:

Name Type Description Default
task_input str | SimTask

Either a :class:SimTask instance or a JSON string that deserialises into one.

required

Returns:

Type Description
str

A human-readable report string (see :class:SimReport).

Source code in src/dv_agentic/agents/sim_controller.py
async def run(self, task_input: str | SimTask) -> str:
    """Execute the simulation task lifecycle.

    Args:
        task_input: Either a :class:`SimTask` instance or a JSON string
            that deserialises into one.

    Returns:
        A human-readable report string (see :class:`SimReport`).
    """
    if not task_input:
        raise ValueError("task_input must not be empty")

    task = self._parse_task(task_input)

    if not task.task_id:
        raise ValueError("SimTask must have a task_id")
    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    safe_id = re.sub(r"[^a-zA-Z0-9_-]", "_", task.task_id)
    branch = f"ai-task/{safe_id}"

    await asyncio.to_thread(self._git_checkout_new_branch, branch)

    # Fail-fast compile
    compile_result = await asyncio.to_thread(self.sim.compile, task.file_list, task.top)
    if compile_result.status == "fail":
        logger.error("Compile failed for task '%s'", task.task_id)
        return (
            SimReport(
                task_id=task.task_id,
                final_status="compile_fail",
                runs_total=0,
                branch=branch,
                ready_for_pr=False,
            ).to_str()
            + f"\n\n### Compile Output\n{compile_result.output}"
        )

    # Sim loop — self.step() checks budget AND increments self.iteration
    results: list[SimResult] = []
    while await self.step():
        sim_result = await asyncio.to_thread(self.sim.run, task.test, task.seed, task.debug)
        results.append(sim_result)
        logger.info(
            "Sim iter=%d status=%s job_id=%s",
            self.iteration,
            sim_result.status,
            sim_result.job_id,
        )
        await asyncio.to_thread(
            self._git_commit,
            f"[agent] sim iter={self.iteration} · task:{task.task_id} · iter:{self.iteration}",
        )
        if sim_result.status == "pass":
            return SimReport(
                task_id=task.task_id,
                final_status="pass",
                runs_total=self.iteration,
                branch=branch,
                ready_for_pr=True,
                last_result=sim_result,
            ).to_str()

    # Budget exhausted
    await asyncio.to_thread(
        self._git_commit, f"[agent] budget exhausted · task:{task.task_id} · INCOMPLETE"
    )
    last = results[-1] if results else None
    return SimReport(
        task_id=task.task_id,
        final_status="escalated",
        runs_total=self.iteration,
        branch=branch,
        ready_for_pr=False,
        last_result=last,
    ).to_str()

SimReport dataclass

Structured output from a completed SimControllerAgent run.

Source code in src/dv_agentic/agents/sim_controller.py
@dataclass
class SimReport:
    """Structured output from a completed SimControllerAgent run."""

    task_id: str
    final_status: str  # "pass" | "fail" | "timeout" | "compile_fail" | "escalated"
    runs_total: int
    branch: str
    ready_for_pr: bool
    last_result: SimResult | None = None

    def to_str(self) -> str:
        pr_note = "yes" if self.ready_for_pr else f"no (status={self.final_status})"
        lines = [
            "### Task Complete",
            f"task_id      : {self.task_id}",
            f"final_status : {self.final_status}",
            f"runs_total   : {self.runs_total}",
            f"branch       : {self.branch}",
            f"ready_for_pr : {pr_note}",
        ]
        if self.last_result and self.last_result.error_summary:
            lines.append(f"last_error   : {self.last_result.error_summary}")
        return "\n".join(lines)
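Because `to_str()` emits a fixed `key : value` layout, a downstream consumer can recover the fields with a few lines of parsing. A consumer-side sketch (the sample report text is illustrative, not real tool output):

```python
# Parse a SimReport.to_str() report back into a dict; field names are
# taken from the format above, the values are made up for illustration.
sample = """### Task Complete
task_id      : axi_wr_001
final_status : pass
runs_total   : 4
branch       : agent/axi_wr_001
ready_for_pr : yes"""


def parse_report(text: str) -> dict[str, str]:
    fields = {}
    for line in text.splitlines():
        if ":" in line and not line.startswith("#"):
            key, _, value = line.partition(":")
            fields[key.strip()] = value.strip()
    return fields


report = parse_report(sample)
print(report["final_status"])  # pass
```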

Log Analyzer Agent

The LogAnalyzerAgent parses simulation logs, identifies failures, and returns structured failure classifications.

log_analyzer

Simulation log analysis agent.

Classifies simulation failures by matching known error patterns with regular expressions. Falls back to unknown when no pattern matches and sets debug_required = True so the Orchestrator can request a debug-mode re-run.

Unknown-class failures can be handed off to an LLM-powered agent for deeper analysis; this agent itself never speculates on root cause.

FailureSummary dataclass

Structured result produced by :class:LogAnalyzerAgent.

The failure_subtype field gives the granular sub-category within error_class.

Used by the Orchestrator's dynamic escalation logic: when the failure_subtype shifts between consecutive log-analyzer calls the Orchestrator escalates immediately instead of continuing to iterate, saving token budget on a shifting error space.

Compile-error subtypes (CVDP cluster-informed): missing_timescale, unmatched_block, mixed_assignment, multiple_drivers, width_mismatch, interface_mismatch, syntax_general

Sim-error subtypes: scoreboard_fail, coverage_miss, timing_offset, interface_mismatch, protocol_violation, sim_general
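The sub-type lookup described above can be sketched as a pattern-table scan with per-class fallbacks. The two patterns per table below are illustrative stand-ins; the real `_COMPILE_SUBTYPE_PATTERNS` and `_SIM_SUBTYPE_PATTERNS` tables in `log_analyzer.py` are larger, and this sketch collapses all runtime classes into a single branch:

```python
import re

# Illustrative stand-ins for the CVDP-informed pattern tables.
_COMPILE_SUBTYPE_PATTERNS = [
    (re.compile(r"timescale", re.IGNORECASE), "missing_timescale"),
    (re.compile(r"width mismatch", re.IGNORECASE), "width_mismatch"),
]
_SIM_SUBTYPE_PATTERNS = [
    (re.compile(r"scoreboard", re.IGNORECASE), "scoreboard_fail"),
    (re.compile(r"protocol", re.IGNORECASE), "protocol_violation"),
]


def classify_subtype(error_class: str, text: str) -> str:
    if error_class == "compile_error":
        for pattern, subtype in _COMPILE_SUBTYPE_PATTERNS:
            if pattern.search(text):
                return subtype
        return "syntax_general"  # compile-error fallback
    for pattern, subtype in _SIM_SUBTYPE_PATTERNS:
        if pattern.search(text):
            return subtype
    return "sim_general"  # runtime fallback


print(classify_subtype("compile_error", "Error: `timescale directive missing"))
print(classify_subtype("uvm_error", "segfault in driver"))
```

The first call hits a sub-type pattern and returns `missing_timescale`; the second matches nothing and falls back to `sim_general`, which is exactly the signal the Orchestrator's shift-tracking consumes.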

Source code in src/dv_agentic/agents/log_analyzer.py
@dataclass
class FailureSummary:
    """Structured result produced by :class:`LogAnalyzerAgent`.

    Granular sub-category within *error_class*.

    Used by the Orchestrator's dynamic escalation logic: when the
    *failure_subtype* shifts between consecutive log-analyzer calls the
    Orchestrator escalates immediately instead of continuing to iterate,
    saving token budget on a shifting error space.

    Compile-error subtypes (CVDP cluster-informed):
      ``missing_timescale``, ``unmatched_block``, ``mixed_assignment``,
      ``multiple_drivers``, ``width_mismatch``, ``interface_mismatch``,
      ``syntax_general``

    Sim-error subtypes:
      ``scoreboard_fail``, ``coverage_miss``, ``timing_offset``,
      ``interface_mismatch``, ``protocol_violation``, ``sim_general``
    """

    error_class: str
    first_occurrence: str  # "line N" or "N/A"
    message: str  # first matching line, trimmed to 120 chars
    failure_subtype: str = "unknown"
    context_lines: list[str] = field(default_factory=list)
    debug_required: bool = False
    next_step: str = ""

    def to_str(self) -> str:
        ctx = "\n".join(self.context_lines) or "(none)"
        debug = f"YES — {self.next_step}" if self.debug_required else "NO  — log is sufficient"
        return (
            f"### Failure Summary\n"
            f"error_class      : {self.error_class}\n"
            f"failure_subtype  : {self.failure_subtype}\n"
            f"first_occurrence : {self.first_occurrence}\n"
            f"message          : {self.message}\n\n"
            f"### Context Window\n{ctx}\n\n"
            f"### Debug Mode Required\n{debug}\n\n"
            f"### Recommended Next Step\n{self.next_step}"
        )

LogAnalyzerAgent

Bases: BaseAgent

Parses simulation logs and returns a structured :class:FailureSummary.

Does not require LLM access. Analysis is purely regex-based. An LLM agent can provide reasoning for unknown class failures.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration (budget is not consumed by this agent, but is required by the ABC).

required
Source code in src/dv_agentic/agents/log_analyzer.py
class LogAnalyzerAgent(BaseAgent):
    """Parses simulation logs and returns a structured :class:`FailureSummary`.

    Does not require LLM access.  Analysis is purely regex-based.
    An LLM agent can provide reasoning for ``unknown`` class failures.

    Args:
        config: Agent configuration (budget is not consumed by this agent,
            but is required by the ABC).
    """

    def __init__(self, config: AgentConfig) -> None:
        super().__init__(config)

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str) -> str:
        """Analyse a log file or log content string.

        Args:
            task_input: Path to a log file *or* raw log content.
                If the string resolves to an existing file, the file is read;
                otherwise the string itself is treated as log content.

        Returns:
            A formatted :class:`FailureSummary` string.
        """
        if not task_input or not isinstance(task_input, str):
            raise ValueError("task_input must be a non-empty string")

        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        await self.step()  # Deterministic agent, one iteration per run
        summary = await asyncio.to_thread(self.analyze, task_input)
        return summary.to_str()

    # ------------------------------------------------------------------
    # Public helpers (useful for unit tests and downstream agents)
    # ------------------------------------------------------------------

    def analyze(self, content_or_path: str) -> FailureSummary:
        """Return a :class:`FailureSummary` for the given log content or file path."""
        return self._analyze(self._get_lines(content_or_path))

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    @staticmethod
    def _get_lines(task_input: str) -> Iterator[str]:
        p = Path(task_input)
        if p.exists() and p.is_file():
            logger.info("LogAnalyzerAgent reading log from %s", p)
            with p.open(encoding="utf-8", errors="replace") as f:
                for line in f:
                    yield line.rstrip("\r\n")
        else:
            yield from task_input.splitlines()

    def _analyze(self, lines_iter: Iterator[str]) -> FailureSummary:
        prev_line: str | None = None

        for i, line in enumerate(lines_iter):
            for pattern, error_class in _PATTERNS:
                if pattern.search(line):
                    # Found match. Capture 1 line before, current, and 2 lines after.
                    context = []
                    if prev_line is not None:
                        context.append(prev_line)
                    context.append(line)

                    # Try to grab up to 2 more lines
                    for _ in range(2):
                        try:
                            context.append(next(lines_iter).rstrip("\r\n"))
                        except StopIteration:
                            break

                    debug_required = self._needs_debug(error_class, context, lines_iter)
                    failure_subtype = self._classify_subtype(error_class, "\n".join(context))
                    return FailureSummary(
                        error_class=error_class,
                        first_occurrence=f"line {i + 1}",
                        message=line[:120].strip(),
                        failure_subtype=failure_subtype,
                        context_lines=context,
                        debug_required=debug_required,
                        next_step=self._recommend(error_class, debug_required),
                    )
            prev_line = line

        # No pattern matched
        return FailureSummary(
            error_class="unknown",
            first_occurrence="N/A",
            message="No recognisable error pattern found.",
            failure_subtype="unknown",
            context_lines=[],
            debug_required=True,
            next_step="Re-run in debug mode with +UVM_VERBOSITY=UVM_HIGH.",
        )

    @staticmethod
    def _classify_subtype(error_class: str, text: str) -> str:
        """Return a granular failure sub-type for dynamic escalation tracking.

        Matches the combined text of the matched line plus its context window
        against the CVDP-informed sub-type pattern tables.

        Args:
            error_class: Top-level class already determined by :attr:`_PATTERNS`.
            text: Concatenated matched line + context lines to search.

        Returns:
            A sub-type string such as ``"missing_timescale"`` or
            ``"scoreboard_fail"``.  Falls back to ``"syntax_general"`` for
            compile errors and ``"sim_general"`` for runtime errors when no
            sub-type pattern matches.
        """
        if error_class == "compile_error":
            for pattern, subtype in _COMPILE_SUBTYPE_PATTERNS:
                if pattern.search(text):
                    return subtype
            return "syntax_general"

        if error_class in (
            "uvm_fatal",
            "uvm_error",
            "cocotb_error",
            "scoreboard_mismatch",
            "sim_assertion",
            "x_propagation",
            "timeout",
        ):
            for pattern, subtype in _SIM_SUBTYPE_PATTERNS:
                if pattern.search(text):
                    return subtype
            return "sim_general"

        # For "unknown" and any future classes, echo the class itself so the
        # Orchestrator can still track shifts (unknown→unknown is stable).
        return error_class

    @staticmethod
    def _needs_debug(error_class: str, context: list[str], remaining_lines: Iterator[str]) -> bool:
        """Return True when a debug-mode re-run would provide more information."""
        if error_class in _NO_DEBUG_CLASSES:
            return False

        # Multiple UVM_ERRORs but log may be truncated
        if error_class == "uvm_error":
            # Check context first (already consumed from iterator)
            if sum(1 for line in context if "UVM_ERROR" in line) > 1:
                return True
            # Then check remaining lines
            return any("UVM_ERROR" in line for line in remaining_lines)

        # Unknown class always needs more info
        return error_class == "unknown"

    @staticmethod
    def _recommend(error_class: str, debug_required: bool) -> str:
        if error_class == "compile_error":
            return "Compile error — pass to Code Generator for fix."
        if debug_required:
            return "Re-run in debug mode with +UVM_VERBOSITY=UVM_HIGH."
        return "Pass to Bug Classifier with the above summary."
analyze(content_or_path)

Return a :class:FailureSummary for the given log content or file path.

Source code in src/dv_agentic/agents/log_analyzer.py
def analyze(self, content_or_path: str) -> FailureSummary:
    """Return a :class:`FailureSummary` for the given log content or file path."""
    return self._analyze(self._get_lines(content_or_path))
run(task_input) async

Analyse a log file or log content string.

Parameters:

Name Type Description Default
task_input str

Path to a log file or raw log content. If the string resolves to an existing file, the file is read; otherwise the string itself is treated as log content.

required

Returns:

Type Description
str

A formatted :class:FailureSummary string.

Source code in src/dv_agentic/agents/log_analyzer.py
async def run(self, task_input: str) -> str:
    """Analyse a log file or log content string.

    Args:
        task_input: Path to a log file *or* raw log content.
            If the string resolves to an existing file, the file is read;
            otherwise the string itself is treated as log content.

    Returns:
        A formatted :class:`FailureSummary` string.
    """
    if not task_input or not isinstance(task_input, str):
        raise ValueError("task_input must be a non-empty string")

    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    await self.step()  # Deterministic agent, one iteration per run
    summary = await asyncio.to_thread(self.analyze, task_input)
    return summary.to_str()
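The path-or-content dispatch that `run()` documents can be sketched as follows (a simplified stand-in for `_get_lines`, which returns a lazy iterator rather than a list):

```python
import tempfile
from pathlib import Path


def get_lines(task_input: str) -> list[str]:
    """Read a log file if task_input names an existing file,
    else treat the string itself as log content."""
    p = Path(task_input)
    if p.exists() and p.is_file():
        return p.read_text(encoding="utf-8", errors="replace").splitlines()
    return task_input.splitlines()


# Raw content: the string is split into lines directly.
print(get_lines("UVM_ERROR line one\nline two"))

# File path: the same call reads the file instead.
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as f:
    f.write("UVM_FATAL boom\n")
print(get_lines(f.name))  # ['UVM_FATAL boom']
```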

Other Sub-Agents

Below are additional specialized components:

Spec Analyst Agent

spec_analyst

Spec analysis agent.

Parses a specification document (plain text or pre-extracted PDF content) and generates a structured verification plan (vplan) in YAML format.

Workflow
  1. Send the spec text to the LLM with the spec_analyst system prompt.
  2. Parse the YAML block from the response.
  3. If a complete YAML block is found → write to disk and return VplanResult.
  4. If incomplete or no YAML → ask the LLM to produce a complete plan and retry.
  5. Stop when a valid vplan is extracted or budget is exhausted.
SpecAnalystAgent

Bases: BaseAgent

Parses a spec document and produces a structured vplan.yaml.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration (budget caps LLM call count).

required
llm BaseLLMClient

LLM client.

required
output_path str | None

Where to write the generated vplan.yaml. Pass None to skip writing (useful in tests or preview mode).

'.agent/vplan.yaml'
project_config ProjectContext | None

Optional context for PromptLoader enrichment.

None
session SessionState | None

Optional session state.

None
prompts_dir str | Path | None

Directory containing spec_analyst.md.

None
Source code in src/dv_agentic/agents/spec_analyst.py
class SpecAnalystAgent(BaseAgent):
    """Parses a spec document and produces a structured vplan.yaml.

    Args:
        config: Agent configuration (``budget`` caps LLM call count).
        llm: LLM client.
        output_path: Where to write the generated vplan.yaml.  Pass ``None``
            to skip writing (useful in tests or preview mode).
        project_config: Optional context for PromptLoader enrichment.
        session: Optional session state.
        prompts_dir: Directory containing ``spec_analyst.md``.
    """

    _YAML_RE = _YAML_BLOCK_RE

    def __init__(
        self,
        config: AgentConfig,
        llm: BaseLLMClient,
        output_path: str | None = ".agent/vplan.yaml",
        project_config: ProjectContext | None = None,
        session: SessionState | None = None,
        prompts_dir: str | Path | None = None,
    ) -> None:
        super().__init__(config)
        self.llm = llm
        self.output_path = output_path
        self.project_config = project_config
        self.session = session
        self.prompts_dir = prompts_dir

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str) -> str:
        """Parse specifications and generate a verification plan.

        Args:
            task_input: Natural language description of the verification
                scope or paths to spec documents.

        Returns:
            A string containing the generated vplan (YAML format).
        """
        if not task_input or not isinstance(task_input, str):
            raise ValueError("task_input must be a non-empty string")

        system_prompt = self._load_system_prompt()

        if not system_prompt:
            raise RuntimeError("System prompt must not be empty")
        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        history: list[dict[str, str]] = [{"role": "user", "content": task_input}]
        last_yaml = ""

        while await self.step():
            response = await self.llm.complete(system_prompt, history, max_tokens=4000)
            history.append({"role": "assistant", "content": response})

            yaml_block = self._extract_yaml(response)
            # Only count as 'last_yaml' if it was specifically extracted as a block
            if self._YAML_RE.search(response):
                last_yaml = yaml_block
                logger.info("SpecAnalyst iter=%d: vplan YAML extracted", self.iteration)
                break

            # No valid YAML yet — ask the LLM to produce a complete one
            logger.info("SpecAnalyst iter=%d: no YAML found, retrying", self.iteration)
            history.append({"role": "user", "content": self._follow_up()})

        if not last_yaml:
            return VplanResult(
                vplan_yaml="",
                feature_count=0,
                output_path="",
                summary="Budget exhausted before a valid vplan was extracted.",
                iterations=self.iteration,
            ).to_str()

        feature_count = len(_FEATURE_RE.findall(last_yaml))
        written_path = self._write_vplan(last_yaml)
        summary = self._extract_summary(history)

        return VplanResult(
            vplan_yaml=last_yaml,
            feature_count=feature_count,
            output_path=written_path,
            summary=summary,
            iterations=self.iteration,
        ).to_str()

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    def _load_system_prompt(self) -> str:
        try:
            loader = PromptLoader(
                prompts_dir=self.prompts_dir,
                project_config=self.project_config,
                session=self.session,
            )
            return loader.load("spec_analyst")
        except (FileNotFoundError, RuntimeError) as exc:
            logger.warning("PromptLoader unavailable (%s); using fallback.", exc)
            return (
                "You are a hardware verification specification analyst. "
                "Given a specification document, extract all features and generate "
                "a structured verification plan in YAML format. "
                "Respond with a ```yaml block containing the vplan. "
                "Each feature must have: name, description, priority (mandatory/optional), "
                "and a list of coverage bins."
            )

    def _extract_yaml(self, response: str) -> str:
        """Extract the YAML vplan from the LLM response."""
        if not response or not isinstance(response, str):
            raise ValueError("LLM response must be a non-empty string")

        m = self._YAML_RE.search(response)
        if m:
            vplan = m.group(1).strip()
            if not vplan:
                raise ValueError("Extracted vplan must not be empty")
            return vplan

        return ""

    def _write_vplan(self, yaml_content: str) -> str:
        if not self.output_path:
            return ""
        target = Path(self.output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(yaml_content, encoding="utf-8")
        logger.info("SpecAnalyst wrote vplan to %s", target)
        return str(target)

    @staticmethod
    def _follow_up() -> str:
        return (
            "Please provide the complete verification plan as a YAML code block. "
            "Use the format:\n"
            "```yaml\n"
            "features:\n"
            "  - name: feature_name\n"
            "    description: what it verifies\n"
            "    priority: mandatory\n"
            "    bins:\n"
            "      - bin_name\n"
            "```"
        )

    @staticmethod
    def _extract_summary(history: list[dict[str, str]]) -> str:
        """Extract a one-sentence summary from the last assistant message."""
        for msg in reversed(history):
            if msg["role"] == "assistant":
                content = msg["content"].strip()
                first_line = content.splitlines()[0] if content else ""
                return first_line[:200]
        return ""
run(task_input) async

Parse specifications and generate a verification plan.

Parameters:

Name Type Description Default
task_input str

Natural language description of the verification scope or paths to spec documents.

required

Returns:

Type Description
str

A string containing the generated vplan (YAML format).

Source code in src/dv_agentic/agents/spec_analyst.py
async def run(self, task_input: str) -> str:
    """Parse specifications and generate a verification plan.

    Args:
        task_input: Natural language description of the verification
            scope or paths to spec documents.

    Returns:
        A string containing the generated vplan (YAML format).
    """
    if not task_input or not isinstance(task_input, str):
        raise ValueError("task_input must be a non-empty string")

    system_prompt = self._load_system_prompt()

    if not system_prompt:
        raise RuntimeError("System prompt must not be empty")
    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    history: list[dict[str, str]] = [{"role": "user", "content": task_input}]
    last_yaml = ""

    while await self.step():
        response = await self.llm.complete(system_prompt, history, max_tokens=4000)
        history.append({"role": "assistant", "content": response})

        yaml_block = self._extract_yaml(response)
        # Only count as 'last_yaml' if it was specifically extracted as a block
        if self._YAML_RE.search(response):
            last_yaml = yaml_block
            logger.info("SpecAnalyst iter=%d: vplan YAML extracted", self.iteration)
            break

        # No valid YAML yet — ask the LLM to produce a complete one
        logger.info("SpecAnalyst iter=%d: no YAML found, retrying", self.iteration)
        history.append({"role": "user", "content": self._follow_up()})

    if not last_yaml:
        return VplanResult(
            vplan_yaml="",
            feature_count=0,
            output_path="",
            summary="Budget exhausted before a valid vplan was extracted.",
            iterations=self.iteration,
        ).to_str()

    feature_count = len(_FEATURE_RE.findall(last_yaml))
    written_path = self._write_vplan(last_yaml)
    summary = self._extract_summary(history)

    return VplanResult(
        vplan_yaml=last_yaml,
        feature_count=feature_count,
        output_path=written_path,
        summary=summary,
        iterations=self.iteration,
    ).to_str()
VplanResult dataclass

Structured output from :class:SpecAnalystAgent.

Source code in src/dv_agentic/agents/spec_analyst.py
@dataclass
class VplanResult:
    """Structured output from :class:`SpecAnalystAgent`."""

    vplan_yaml: str
    feature_count: int
    output_path: str  # path where vplan.yaml was written ("" if not written)
    summary: str
    iterations: int

    def to_str(self) -> str:
        lines = [
            "### Vplan Result",
            f"feature_count : {self.feature_count}",
            f"iterations    : {self.iterations}",
            f"output_path   : {self.output_path or '(not written)'}",
            "",
            "### Summary",
            self.summary,
        ]
        return "\n".join(lines)

Bug Classifier Agent

bug_classifier

Bug classification agent.

Classifies a simulation failure as a testbench bug (TB_BUG) or an RTL bug (RTL_BUG), and assigns a confidence score. When confidence is below the project threshold the agent requests human review rather than guessing.

Workflow
  1. Build a prompt from the failure summary (and optional spec/code context).
  2. Call the LLM; parse BUG_TYPE, CONFIDENCE, and EVIDENCE from the response.
  3. If confidence >= threshold → done.
  4. If confidence < threshold → feed the open questions back and retry.
  5. If budget exhausted → mark requires_human_review = True.
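Step 2 of the workflow, in miniature: parsing BUG_TYPE and CONFIDENCE with the same regexes defined on BugClassifierAgent below (the sample response is illustrative):

```python
import re

# The same class-level regexes shown in the BugClassifierAgent source.
_BUG_TYPE_RE = re.compile(r"BUG_TYPE\s*:\s*(TB_BUG|RTL_BUG|UNKNOWN)", re.IGNORECASE)
_CONFIDENCE_RE = re.compile(r"CONFIDENCE\s*:\s*([0-9]*\.?[0-9]+)", re.IGNORECASE)

response = "BUG_TYPE: TB_BUG\nCONFIDENCE: 82\nEVIDENCE:\n- monitor samples on the wrong edge"

bug_type = "UNKNOWN"
m = _BUG_TYPE_RE.search(response)
if m:
    bug_type = m.group(1).upper()

confidence = 0.0
m = _CONFIDENCE_RE.search(response)
if m:
    raw = float(m.group(1))
    confidence = raw / 100.0 if raw > 1.0 else raw  # accept 0-1 or 0-100

print(bug_type, confidence)  # TB_BUG 0.82
```

Note the scale normalization: an LLM that answers `CONFIDENCE: 82` is treated the same as one that answers `CONFIDENCE: 0.82`.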
BugClassifierAgent

Bases: BaseAgent

Classifies a simulation failure as a TB bug or RTL bug.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration (budget caps LLM call count).

required
llm BaseLLMClient

LLM client to use for classification.

required
confidence_threshold float

Minimum confidence to accept a classification without flagging for human review. Defaults to 0.75.

0.75
project_config ProjectContext | None

Optional context for PromptLoader enrichment.

None
session SessionState | None

Optional session state injected into the system prompt.

None
prompts_dir str | Path | None

Directory containing bug_classifier.md.

None
Source code in src/dv_agentic/agents/bug_classifier.py
class BugClassifierAgent(BaseAgent):
    """Classifies a simulation failure as a TB bug or RTL bug.

    Args:
        config: Agent configuration (``budget`` caps LLM call count).
        llm: LLM client to use for classification.
        confidence_threshold: Minimum confidence to accept a classification
            without flagging for human review.  Defaults to 0.75.
        project_config: Optional context for PromptLoader enrichment.
        session: Optional session state injected into the system prompt.
        prompts_dir: Directory containing ``bug_classifier.md``.
    """

    _BUG_TYPE_RE = re.compile(r"BUG_TYPE\s*:\s*(TB_BUG|RTL_BUG|UNKNOWN)", re.IGNORECASE)
    _CONFIDENCE_RE = re.compile(r"CONFIDENCE\s*:\s*([0-9]*\.?[0-9]+)", re.IGNORECASE)
    _EVIDENCE_RE = re.compile(
        r"EVIDENCE\s*:(.*?)(?=\n###|\n[A-Z_]+\s*:|\Z)", re.DOTALL | re.IGNORECASE
    )

    def __init__(
        self,
        config: AgentConfig,
        llm: BaseLLMClient,
        confidence_threshold: float = 0.75,
        project_config: ProjectContext | None = None,
        session: SessionState | None = None,
        prompts_dir: str | Path | None = None,
    ) -> None:
        super().__init__(config)
        self.llm = llm
        self.confidence_threshold = confidence_threshold
        self.project_config = project_config
        self.session = session
        self.prompts_dir = prompts_dir

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str) -> str:
        """Classify the failure described in *task_input*.

        Args:
            task_input: Failure summary text (e.g. ``FailureSummary.to_str()``),
                optionally followed by spec excerpts or relevant code snippets.

        Returns:
            A formatted :class:`ClassificationResult` string.
        """
        if not task_input or not isinstance(task_input, str):
            raise ValueError("task_input must be a non-empty string")

        system_prompt = self._load_system_prompt()

        if not system_prompt:
            raise RuntimeError("System prompt must not be empty")
        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        history: list[dict[str, str]] = [{"role": "user", "content": task_input}]
        last: ClassificationResult | None = None

        while await self.step():
            response = await self.llm.complete(system_prompt, history, max_tokens=2000)
            history.append({"role": "assistant", "content": response})

            last = self._parse_response(response, self.iteration)
            logger.info(
                "BugClassifier iter=%d type=%s confidence=%.2f",
                self.iteration,
                last.bug_type,
                last.confidence,
            )

            if last.confidence >= self.confidence_threshold and last.bug_type != "UNKNOWN":
                return last.to_str()

            # Low confidence → ask for more evidence
            history.append({"role": "user", "content": self._follow_up(last)})

        # Budget exhausted — return best guess with human review flag
        if last is None:
            last = ClassificationResult(
                bug_type="UNKNOWN",
                confidence=0.0,
                evidence=[],
                summary="Budget exhausted before any LLM response.",
                requires_human_review=True,
                human_review_reason="No LLM response received.",
                iterations=self.iteration,
            )
        else:
            last.requires_human_review = True
            last.human_review_reason = (
                f"Confidence {last.confidence:.2f} below threshold "
                f"{self.confidence_threshold:.2f} after {self.iteration} iterations."
            )
        return last.to_str()

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    def _load_system_prompt(self) -> str:
        try:
            loader = PromptLoader(
                prompts_dir=self.prompts_dir,
                project_config=self.project_config,
                session=self.session,
            )
            return loader.load("bug_classifier")
        except (FileNotFoundError, RuntimeError) as exc:
            logger.warning("PromptLoader unavailable (%s); using fallback.", exc)
            return (
                "You are a hardware verification bug classification specialist. "
                "Given a simulation failure summary, classify the root cause as "
                "TB_BUG (testbench / verification code issue) or RTL_BUG (design bug). "
                "Always respond with:\n"
                "BUG_TYPE: TB_BUG | RTL_BUG | UNKNOWN\n"
                "CONFIDENCE: 0.0-1.0\n"
                "EVIDENCE:\n- bullet point evidence\n"
                "### Summary\n"
                "One-paragraph explanation."
            )

    def _parse_response(self, response: str, iteration: int) -> ClassificationResult:
        if not response or not isinstance(response, str):
            raise ValueError("LLM response must be a non-empty string")

        bug_type = "UNKNOWN"
        m = self._BUG_TYPE_RE.search(response)
        if m:
            bug_type = m.group(1).upper()

        confidence = 0.0
        m = self._CONFIDENCE_RE.search(response)
        if m:
            raw = float(m.group(1))
            # Accept both 0-1 and 0-100 scales
            confidence = raw / 100.0 if raw > 1.0 else raw

        evidence: list[str] = []
        m = self._EVIDENCE_RE.search(response)
        if m:
            block = m.group(1)
            evidence = [
                line.lstrip("-• ").strip()
                for line in block.splitlines()
                if line.strip().lstrip("-• ")
            ]

        # Extract summary (text after last ### Summary or whole response as fallback)
        summary = ""
        if "### Summary" in response:
            summary = response.split("### Summary", 1)[1].strip()
        elif "### summary" in response.lower():
            summary = re.split(r"###\s+summary", response, flags=re.IGNORECASE)[1].strip()

        requires_review = confidence < self.confidence_threshold or bug_type == "UNKNOWN"
        return ClassificationResult(
            bug_type=bug_type,
            confidence=confidence,
            evidence=evidence,
            summary=summary,
            requires_human_review=requires_review,
            iterations=iteration,
        )

    @staticmethod
    def _follow_up(result: ClassificationResult) -> str:
        return (
            f"Your classification confidence was {result.confidence:.2f}, "
            f"which is below the required threshold. "
            "Please review the failure evidence more carefully and provide:\n"
            "1. Additional evidence from the log that supports or contradicts each bug type.\n"
            "2. A revised BUG_TYPE and CONFIDENCE.\n"
            "3. Specific RTL signals or testbench components that would confirm the root cause."
        )
run(task_input) async

Classify the failure described in task_input.

Parameters:

Name Type Description Default
task_input str

Failure summary text (e.g. FailureSummary.to_str()), optionally followed by spec excerpts or relevant code snippets.

required

Returns:

Type Description
str

A formatted ClassificationResult string.

Source code in src/dv_agentic/agents/bug_classifier.py
async def run(self, task_input: str) -> str:
    """Classify the failure described in *task_input*.

    Args:
        task_input: Failure summary text (e.g. ``FailureSummary.to_str()``),
            optionally followed by spec excerpts or relevant code snippets.

    Returns:
        A formatted :class:`ClassificationResult` string.
    """
    if not task_input or not isinstance(task_input, str):
        raise ValueError("task_input must be a non-empty string")

    system_prompt = self._load_system_prompt()

    if not system_prompt:
        raise RuntimeError("System prompt must not be empty")
    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    history: list[dict[str, str]] = [{"role": "user", "content": task_input}]
    last: ClassificationResult | None = None

    while await self.step():
        response = await self.llm.complete(system_prompt, history, max_tokens=2000)
        history.append({"role": "assistant", "content": response})

        last = self._parse_response(response, self.iteration)
        logger.info(
            "BugClassifier iter=%d type=%s confidence=%.2f",
            self.iteration,
            last.bug_type,
            last.confidence,
        )

        if last.confidence >= self.confidence_threshold and last.bug_type != "UNKNOWN":
            return last.to_str()

        # Low confidence → ask for more evidence
        history.append({"role": "user", "content": self._follow_up(last)})

    # Budget exhausted — return best guess with human review flag
    if last is None:
        last = ClassificationResult(
            bug_type="UNKNOWN",
            confidence=0.0,
            evidence=[],
            summary="Budget exhausted before any LLM response.",
            requires_human_review=True,
            human_review_reason="No LLM response received.",
            iterations=self.iteration,
        )
    else:
        last.requires_human_review = True
        last.human_review_reason = (
            f"Confidence {last.confidence:.2f} below threshold "
            f"{self.confidence_threshold:.2f} after {self.iteration} iterations."
        )
    return last.to_str()
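The control flow of `run()` reduces to a small loop: query, parse, stop early once the confidence clears the threshold, otherwise append a follow-up and try again until the budget is spent. The sketch below uses canned replies as a hypothetical stand-in for `llm.complete()` and `_parse_response`; it is illustrative only, not the agent's actual interface.

```python
import asyncio


async def classify_with_retries(
    responses: list[str], threshold: float = 0.8
) -> tuple[float, int]:
    """Sketch of the BugClassifierAgent loop: retry until confident or budget spent.

    ``responses`` stands in for successive LLM replies; the real agent calls
    ``self.llm.complete()`` and appends a follow-up prompt on low confidence.
    """
    confidence = 0.0
    for iteration, reply in enumerate(responses, start=1):
        confidence = float(reply)          # stand-in for _parse_response
        if confidence >= threshold:
            return confidence, iteration   # confident -> stop early
        # otherwise: append a follow-up asking for more evidence, loop again
    return confidence, len(responses)      # budget exhausted -> best guess


conf, iters = asyncio.run(classify_with_retries(["0.4", "0.9"]))
print(conf, iters)  # 0.9 2
```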
ClassificationResult dataclass

Structured output from BugClassifierAgent.

Source code in src/dv_agentic/agents/bug_classifier.py
@dataclass
class ClassificationResult:
    """Structured output from :class:`BugClassifierAgent`."""

    bug_type: str  # "TB_BUG" | "RTL_BUG" | "UNKNOWN"
    confidence: float  # 0.0 - 1.0
    evidence: list[str]  # bullet points extracted from LLM response
    summary: str
    requires_human_review: bool
    human_review_reason: str = ""
    iterations: int = 1

    def to_str(self) -> str:
        lines = [
            "### Bug Classification",
            f"bug_type   : {self.bug_type}",
            f"confidence : {self.confidence:.2f}",
            f"iterations : {self.iterations}",
            f"human_review: {'YES' if self.requires_human_review else 'NO'}",
        ]
        if self.human_review_reason:
            lines.append(f"review_reason: {self.human_review_reason}")
        if self.evidence:
            lines.append("EVIDENCE   :")
            for e in self.evidence:
                lines.append(f"  - {e.strip()}")
        if self.summary:
            lines += ["", "### Summary", self.summary]
        return "\n".join(lines)
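The regex constants (`_BUG_TYPE_RE`, `_CONFIDENCE_RE`, `_EVIDENCE_RE`) that `_parse_response` relies on are not shown in this excerpt; the patterns below are illustrative assumptions, but the scale handling mirrors the agent's logic of accepting both 0-1 and 0-100 confidence values.

```python
import re

# Illustrative patterns; the real _BUG_TYPE_RE / _CONFIDENCE_RE constants
# live in bug_classifier.py and may differ.
BUG_TYPE_RE = re.compile(r"BUG_TYPE\s*[:=]\s*(\w+)", re.IGNORECASE)
CONFIDENCE_RE = re.compile(r"CONFIDENCE\s*[:=]\s*([\d.]+)", re.IGNORECASE)


def parse_bug_type(response: str) -> str:
    m = BUG_TYPE_RE.search(response)
    return m.group(1).upper() if m else "UNKNOWN"


def parse_confidence(response: str) -> float:
    """Mirror the agent's scale handling: accept both 0-1 and 0-100."""
    m = CONFIDENCE_RE.search(response)
    if not m:
        return 0.0
    raw = float(m.group(1))
    return raw / 100.0 if raw > 1.0 else raw


reply = "BUG_TYPE: rtl_bug\nCONFIDENCE: 85"
print(parse_bug_type(reply), parse_confidence(reply))  # RTL_BUG 0.85
```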

Coverage Analyst Agent

coverage_analyst

Coverage analysis agent.

The agent retrieves a coverage DB for a given job ID, compares the overall percentage against a threshold, and returns a structured summary.

Hole classification (actionable / protocol_blocked / design_excluded) and priority ranking are LLM-powered features intended for future extension.

CoverageAnalystAgent

Bases: BaseAgent

Retrieves coverage for a job and reports whether it meets the threshold.

Does not require LLM access. This agent can be extended to support LLM-based analysis.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration.

required
coverage CoverageTool

A CoverageTool adapter (IMC or pyuvm).

required
threshold float

Minimum acceptable overall coverage percentage. Defaults to 90.0.

90.0
Source code in src/dv_agentic/agents/coverage_analyst.py
class CoverageAnalystAgent(BaseAgent):
    """Retrieves coverage for a job and reports whether it meets the threshold.

    Does not require LLM access. This agent can be extended
    to support LLM-based analysis.

    Args:
        config: Agent configuration.
        coverage: A ``CoverageTool`` adapter (IMC or pyuvm).
        threshold: Minimum acceptable overall coverage percentage.
            Defaults to 90.0.
    """

    def __init__(
        self,
        config: AgentConfig,
        coverage: CoverageTool,
        threshold: float = 90.0,
    ) -> None:
        super().__init__(config)
        self.cov = coverage
        self.threshold = threshold

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str) -> str:
        """Retrieve and summarise coverage for a simulation job.

        Args:
            task_input: The job ID whose coverage DB should be queried.

        Returns:
            A formatted :class:`CoverageSummary` string.
        """
        if not task_input or not isinstance(task_input, str):
            raise ValueError("task_input must be a non-empty string")

        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        await self.step()  # Deterministic agent
        summary = await asyncio.to_thread(self.get_summary, task_input)
        return summary.to_str()

    # ------------------------------------------------------------------
    # Public helper (useful for downstream agents without async overhead)
    # ------------------------------------------------------------------

    def get_summary(self, job_id: str) -> CoverageSummary:
        """Return a :class:`CoverageSummary` for *job_id*.

        Args:
            job_id: Simulation job identifier.
        """
        if not job_id or not isinstance(job_id, str):
            raise ValueError("job_id must be a non-empty string")

        db: CoverageDB = self.cov.get_coverage(job_id)

        if db.overall_percentage < 0:
            raise ValueError(f"Coverage percentage cannot be negative: {db.overall_percentage}")

        logger.info(
            "Coverage for job '%s': %.2f%% (threshold=%.2f%%)",
            job_id,
            db.overall_percentage,
            self.threshold,
        )
        return CoverageSummary(
            job_id=job_id,
            db_path=db.path,
            overall_pct=db.overall_percentage,
            threshold_pct=self.threshold,
            below_threshold=db.overall_percentage < self.threshold,
        )
get_summary(job_id)

Return a CoverageSummary for job_id.

Parameters:

Name Type Description Default
job_id str

Simulation job identifier.

required
Source code in src/dv_agentic/agents/coverage_analyst.py
def get_summary(self, job_id: str) -> CoverageSummary:
    """Return a :class:`CoverageSummary` for *job_id*.

    Args:
        job_id: Simulation job identifier.
    """
    if not job_id or not isinstance(job_id, str):
        raise ValueError("job_id must be a non-empty string")

    db: CoverageDB = self.cov.get_coverage(job_id)

    if db.overall_percentage < 0:
        raise ValueError(f"Coverage percentage cannot be negative: {db.overall_percentage}")

    logger.info(
        "Coverage for job '%s': %.2f%% (threshold=%.2f%%)",
        job_id,
        db.overall_percentage,
        self.threshold,
    )
    return CoverageSummary(
        job_id=job_id,
        db_path=db.path,
        overall_pct=db.overall_percentage,
        threshold_pct=self.threshold,
        below_threshold=db.overall_percentage < self.threshold,
    )
run(task_input) async

Retrieve and summarise coverage for a simulation job.

Parameters:

Name Type Description Default
task_input str

The job ID whose coverage DB should be queried.

required

Returns:

Type Description
str

A formatted CoverageSummary string.

Source code in src/dv_agentic/agents/coverage_analyst.py
async def run(self, task_input: str) -> str:
    """Retrieve and summarise coverage for a simulation job.

    Args:
        task_input: The job ID whose coverage DB should be queried.

    Returns:
        A formatted :class:`CoverageSummary` string.
    """
    if not task_input or not isinstance(task_input, str):
        raise ValueError("task_input must be a non-empty string")

    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    await self.step()  # Deterministic agent
    summary = await asyncio.to_thread(self.get_summary, task_input)
    return summary.to_str()
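The `await asyncio.to_thread(...)` call is the key design choice here: the coverage query is blocking I/O, so it is offloaded to a worker thread to keep the event loop responsive. A minimal sketch of the pattern, with `blocking_query` as a hypothetical stand-in for `CoverageTool.get_coverage()`:

```python
import asyncio
import time


def blocking_query(job_id: str) -> float:
    # Stand-in for CoverageTool.get_coverage(): a blocking I/O call.
    time.sleep(0.01)
    return 93.7


async def run(job_id: str) -> float:
    # Mirrors CoverageAnalystAgent.run(): offload blocking work so the
    # event loop stays free to serve other agents.
    return await asyncio.to_thread(blocking_query, job_id)


print(asyncio.run(run("job-42")))  # 93.7
```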
CoverageSummary dataclass

Structured output from CoverageAnalystAgent.

Source code in src/dv_agentic/agents/coverage_analyst.py
@dataclass
class CoverageSummary:
    """Structured output from :class:`CoverageAnalystAgent`."""

    job_id: str
    db_path: str
    overall_pct: float
    threshold_pct: float
    below_threshold: bool

    def to_str(self) -> str:
        status = "BELOW THRESHOLD ⚠" if self.below_threshold else "OK ✓"
        lines = [
            "### Coverage Summary",
            f"job_id     : {self.job_id}",
            f"db_path    : {self.db_path}",
            f"overall    : {self.overall_pct:.2f}%",
            f"threshold  : {self.threshold_pct:.2f}%",
            f"status     : {status}",
        ]
        if self.below_threshold:
            gap = self.threshold_pct - self.overall_pct
            lines += [
                f"gap        : {gap:.2f}% needed to reach threshold",
                "action     : Coverage hole analysis required (Phase 3b LLM agent)",
            ]
        return "\n".join(lines)
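The threshold comparison and gap arithmetic can be exercised in isolation. `CoverageCheck` below is a minimal stand-in for `CoverageSummary`, assumed for illustration only:

```python
from dataclasses import dataclass


@dataclass
class CoverageCheck:
    """Minimal stand-in for CoverageSummary, for illustration only."""

    overall_pct: float
    threshold_pct: float = 90.0

    @property
    def below_threshold(self) -> bool:
        return self.overall_pct < self.threshold_pct

    @property
    def gap(self) -> float:
        # Percentage points still needed to reach the threshold (0 if met).
        return max(0.0, self.threshold_pct - self.overall_pct)


check = CoverageCheck(overall_pct=82.5)
print(check.below_threshold, f"{check.gap:.2f}")  # True 7.50
```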

Reporter Agent

reporter

Session reporter agent.

Aggregates results from a completed agentic session and generates a structured markdown report suitable for human review or ticket creation.

This agent is intentionally single-turn: the input is fully structured and the LLM has everything it needs in one shot. Budget > 1 is unused in normal operation but respected for safety.

ReporterAgent

Bases: BaseAgent

Generates a structured markdown report from session results.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration.

required
llm BaseLLMClient

LLM client.

required
output_path str | None

Where to write the generated report. Pass None to skip writing.

None
project_config ProjectContext | None

Optional context for PromptLoader enrichment.

None
session SessionState | None

Optional session state.

None
prompts_dir str | Path | None

Directory containing reporter.md.

None
Source code in src/dv_agentic/agents/reporter.py
class ReporterAgent(BaseAgent):
    """Generates a structured markdown report from session results.

    Args:
        config: Agent configuration.
        llm: LLM client.
        output_path: Where to write the generated report.  Pass ``None``
            to skip writing.
        project_config: Optional context for PromptLoader enrichment.
        session: Optional session state.
        prompts_dir: Directory containing ``reporter.md``.
    """

    def __init__(
        self,
        config: AgentConfig,
        llm: BaseLLMClient,
        output_path: str | None = None,
        project_config: ProjectContext | None = None,
        session: SessionState | None = None,
        prompts_dir: str | Path | None = None,
    ) -> None:
        super().__init__(config)
        self.llm = llm
        self.output_path = output_path
        self.project_config = project_config
        self.session = session
        self.prompts_dir = prompts_dir

    # ------------------------------------------------------------------
    # BaseAgent ABC
    # ------------------------------------------------------------------

    async def run(self, task_input: str) -> str:
        """Aggregate results and generate a final report.

        Args:
            task_input: The history of agent interactions to summarize.

        Returns:
            A formatted markdown report string.
        """
        if not task_input or not isinstance(task_input, str):
            raise ValueError("task_input must be a non-empty string")

        system_prompt = self._load_system_prompt()

        if not system_prompt:
            raise RuntimeError("System prompt must not be empty")
        if self.iteration != 0:
            raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

        await self.step()  # consume one budget unit

        task_id = self._extract_task_id(task_input)
        response = await self.llm.complete(
            system_prompt,
            [{"role": "user", "content": task_input}],
            max_tokens=3000,
        )

        written_path = self._write_report(response, task_id)
        report = SessionReport(
            task_id=task_id,
            markdown=response,
            output_path=written_path,
        )
        logger.info("Reporter: generated report for task '%s'", task_id)
        return report.to_str()

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    def _load_system_prompt(self) -> str:
        try:
            loader = PromptLoader(
                prompts_dir=self.prompts_dir,
                project_config=self.project_config,
                session=self.session,
            )
            return loader.load("reporter")
        except (FileNotFoundError, RuntimeError) as exc:
            logger.warning("PromptLoader unavailable (%s); using fallback.", exc)
            return (
                "You are a verification session reporter. "
                "Given the results from multiple agents in a session, produce a concise "
                "markdown report with these sections:\n"
                "## Summary\n## Simulation Results\n## Coverage\n## Issues Found\n"
                "## Recommended Next Steps\n"
                "Be factual and concise. Use tables where appropriate."
            )

    def _write_report(self, markdown: str, task_id: str) -> str:
        if not self.output_path:
            return ""
        path_str = self.output_path.replace("{task_id}", task_id)
        target = Path(path_str)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(markdown, encoding="utf-8")
        logger.info("Reporter wrote report to %s", target)
        return str(target)

    @staticmethod
    def _extract_task_id(text: str) -> str:
        """Try to parse a task_id from the input; fall back to 'session'."""
        import re

        m = re.search(r"task[_\s]id\s*[:\s]+([a-zA-Z0-9_\-]+)", text, re.IGNORECASE)
        return m.group(1) if m else "session"
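The task-ID extraction uses the exact pattern shown in `_extract_task_id`, which tolerates `task_id`/`task id` in any case, separated by a colon or whitespace, and falls back to `"session"` when nothing matches:

```python
import re

# Same pattern as ReporterAgent._extract_task_id.
TASK_ID_RE = re.compile(r"task[_\s]id\s*[:\s]+([a-zA-Z0-9_\-]+)", re.IGNORECASE)


def extract_task_id(text: str) -> str:
    m = TASK_ID_RE.search(text)
    return m.group(1) if m else "session"


print(extract_task_id("Task_ID: fifo_smoke-01"))  # fifo_smoke-01
print(extract_task_id("no identifier here"))      # session
```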
run(task_input) async

Aggregate results and generate a final report.

Parameters:

Name Type Description Default
task_input str

The history of agent interactions to summarize.

required

Returns:

Type Description
str

A formatted markdown report string.

Source code in src/dv_agentic/agents/reporter.py
async def run(self, task_input: str) -> str:
    """Aggregate results and generate a final report.

    Args:
        task_input: The history of agent interactions to summarize.

    Returns:
        A formatted markdown report string.
    """
    if not task_input or not isinstance(task_input, str):
        raise ValueError("task_input must be a non-empty string")

    system_prompt = self._load_system_prompt()

    if not system_prompt:
        raise RuntimeError("System prompt must not be empty")
    if self.iteration != 0:
        raise RuntimeError(f"Agent must start at iteration 0 (current: {self.iteration})")

    await self.step()  # consume one budget unit

    task_id = self._extract_task_id(task_input)
    response = await self.llm.complete(
        system_prompt,
        [{"role": "user", "content": task_input}],
        max_tokens=3000,
    )

    written_path = self._write_report(response, task_id)
    report = SessionReport(
        task_id=task_id,
        markdown=response,
        output_path=written_path,
    )
    logger.info("Reporter: generated report for task '%s'", task_id)
    return report.to_str()
SessionReport dataclass

Structured output from ReporterAgent.

Source code in src/dv_agentic/agents/reporter.py
@dataclass
class SessionReport:
    """Structured output from :class:`ReporterAgent`."""

    task_id: str
    markdown: str
    output_path: str  # path where report was written ("" if not written)

    def to_str(self) -> str:
        return self.markdown
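The `{task_id}` placeholder in `output_path` is substituted with a plain `str.replace` (not `str.format`), and parent directories are created on demand. A self-contained sketch of that templating, using a temporary directory so it is runnable anywhere:

```python
import tempfile
from pathlib import Path


def write_report(output_path: str, markdown: str, task_id: str) -> str:
    """Sketch of ReporterAgent._write_report's path templating."""
    # "{task_id}" is substituted with plain str.replace, not str.format,
    # so any other braces in the path are left untouched.
    target = Path(output_path.replace("{task_id}", task_id))
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(markdown, encoding="utf-8")
    return str(target)


with tempfile.TemporaryDirectory() as tmp:
    path = write_report(f"{tmp}/reports/{{task_id}}.md", "# Report\n", "fifo_smoke")
    print(Path(path).name)  # fifo_smoke.md
```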