Tools & Adapters API Reference ⚙️

This section documents the foundational execution tools, LLM clients, and HDL simulator adapters in dv-agentic-system.


Tool Interface & Models

The base interfaces and validation schemas for tools and system data types.

interface

Abstract base classes defining the simulator and coverage tool contracts.

CoverageTool

Bases: ABC

Interface for coverage analysis tools.

Source code in src/dv_agentic/tools/interface.py
class CoverageTool(abc.ABC):
    """Interface for coverage analysis tools."""

    @abc.abstractmethod
    def get_coverage(self, job_id: str) -> CoverageDB:
        """Retrieve coverage results for a specific job.

        Args:
            job_id: The ID of the simulation job.

        """
get_coverage(job_id) abstractmethod

Retrieve coverage results for a specific job.

Parameters:

job_id (str, required): The ID of the simulation job.
Source code in src/dv_agentic/tools/interface.py
@abc.abstractmethod
def get_coverage(self, job_id: str) -> CoverageDB:
    """Retrieve coverage results for a specific job.

    Args:
        job_id: The ID of the simulation job.

    """

SimulatorTool

Bases: ABC

Interface for simulation tools (VCS, Questa, cocotb, etc.).

Source code in src/dv_agentic/tools/interface.py
class SimulatorTool(abc.ABC):
    """Interface for simulation tools (VCS, Questa, cocotb, etc.)."""

    @abc.abstractmethod
    def compile(self, file_list: list[str], top: str) -> CompileResult:
        """Compile the source files.

        Args:
            file_list: List of source file paths.
            top: Name of the top-level module.

        """

    @abc.abstractmethod
    def run(self, test: str, seed: int, debug: bool) -> SimResult:
        """Run a specific test.

        Args:
            test: Name of the test to run.
            seed: Random seed for simulation.
            debug: Whether to enable debug mode (e.g., waveform dumping).

        """
compile(file_list, top) abstractmethod

Compile the source files.

Parameters:

file_list (list[str], required): List of source file paths.
top (str, required): Name of the top-level module.
Source code in src/dv_agentic/tools/interface.py
@abc.abstractmethod
def compile(self, file_list: list[str], top: str) -> CompileResult:
    """Compile the source files.

    Args:
        file_list: List of source file paths.
        top: Name of the top-level module.

    """
run(test, seed, debug) abstractmethod

Run a specific test.

Parameters:

test (str, required): Name of the test to run.
seed (int, required): Random seed for simulation.
debug (bool, required): Whether to enable debug mode (e.g., waveform dumping).
Source code in src/dv_agentic/tools/interface.py
@abc.abstractmethod
def run(self, test: str, seed: int, debug: bool) -> SimResult:
    """Run a specific test.

    Args:
        test: Name of the test to run.
        seed: Random seed for simulation.
        debug: Whether to enable debug mode (e.g., waveform dumping).

    """
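A new adapter plugs into the system by subclassing SimulatorTool and implementing both abstract methods. The sketch below is a toy implementation for illustration only: the `EchoSimulator` class is hypothetical, and the trimmed dataclasses stand in for the real `CompileResult` and `SimResult` from `dv_agentic.tools.models` (documented below).

```python
import abc
from dataclasses import dataclass
from typing import Literal


# Trimmed stand-ins for dv_agentic.tools.models; import the real ones in practice.
@dataclass
class CompileResult:
    status: Literal["pass", "fail"]
    output: str


@dataclass
class SimResult:
    status: Literal["pass", "fail", "timeout"]
    job_id: str
    log_path: str


class SimulatorTool(abc.ABC):
    @abc.abstractmethod
    def compile(self, file_list: list[str], top: str) -> CompileResult: ...

    @abc.abstractmethod
    def run(self, test: str, seed: int, debug: bool) -> SimResult: ...


class EchoSimulator(SimulatorTool):
    """Hypothetical adapter that records calls instead of invoking a real simulator."""

    def compile(self, file_list: list[str], top: str) -> CompileResult:
        return CompileResult(status="pass", output=f"compiled {len(file_list)} files for {top}")

    def run(self, test: str, seed: int, debug: bool) -> SimResult:
        # job_id and log_path follow the "{test}_{seed}" convention used elsewhere.
        return SimResult(status="pass", job_id=f"{test}_{seed}", log_path=f"sim_{test}_{seed}.log")


sim = EchoSimulator()
result = sim.run("smoke_test", seed=42, debug=False)
```

A stub like this is also handy in unit tests for agent code, since it satisfies the contract without needing a simulator installed.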

models

Data models for simulation results and coverage tracking.

CompileResult dataclass

Result of a compilation step.

Source code in src/dv_agentic/tools/models.py
@dataclass
class CompileResult:
    """Result of a compilation step."""

    status: Literal["pass", "fail"]
    output: str

CoverageDB dataclass

Representation of a coverage database.

Source code in src/dv_agentic/tools/models.py
@dataclass
class CoverageDB:
    """Representation of a coverage database."""

    path: str
    overall_percentage: float

SimResult dataclass

Result of a simulation run.

Source code in src/dv_agentic/tools/models.py
@dataclass
class SimResult:
    """Result of a simulation run."""

    status: Literal["pass", "fail", "timeout"]
    job_id: str
    log_path: str
    error_summary: str | None = None
    cov_db_path: str | None = None
    """Path to the coverage DB written by the simulator (None if not collected)."""
    wall_time_sec: int | None = None
    """Wall-clock time in seconds that the simulation took to run."""
cov_db_path = None (instance attribute)

Path to the coverage DB written by the simulator (None if not collected).

wall_time_sec = None (instance attribute)

Wall-clock time in seconds that the simulation took to run.

SimTask dataclass

Input specification for a single SimControllerAgent run.

Attributes:

task_id (str): Unique identifier for this task (used for branch naming and commit messages).
test (str): UVM test name or cocotb test module to run.
seed (int): Random seed for the simulation.
file_list (list[str]): Source files to compile. May be empty if the project already has a compiled snapshot.
top (str): Top-level module name passed to the simulator.
debug (bool): Whether to enable debug mode (waveform dumping, full verbosity).

Source code in src/dv_agentic/tools/models.py
@dataclass
class SimTask:
    """Input specification for a single SimControllerAgent run.

    Attributes:
        task_id: Unique identifier for this task (used for branch naming and
            commit messages).
        test: UVM test name or cocotb test module to run.
        seed: Random seed for the simulation.
        file_list: Source files to compile.  May be empty if the project
            already has a compiled snapshot.
        top: Top-level module name passed to the simulator.
        debug: Whether to enable debug mode (waveform dumping, full verbosity).
    """

    task_id: str
    test: str
    seed: int
    file_list: list[str] = field(default_factory=list)
    top: str = "top"
    debug: bool = False
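Only the first three fields are required when constructing a task; the rest fall back to defaults. A quick construction example (using a trimmed copy of the dataclass so the snippet is self-contained; the `task_id` and `test` values are made up):

```python
from dataclasses import dataclass, field


@dataclass
class SimTask:  # trimmed copy for illustration; use dv_agentic.tools.models.SimTask in practice
    task_id: str
    test: str
    seed: int
    file_list: list[str] = field(default_factory=list)
    top: str = "top"
    debug: bool = False


# file_list, top, and debug are optional and default to an empty list, "top", and False.
task = SimTask(task_id="t-001", test="uart_smoke_test", seed=1234)
```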

LLM Clients

These clients interact with LLM backends (either via cloud APIs or local model pipelines) to complete system agent queries.

Base LLM Client Interface

interface

BaseLLMClient

Bases: ABC

Abstract base class for LLM clients.

Source code in src/dv_agentic/tools/llm/interface.py
class BaseLLMClient(abc.ABC):
    """Abstract base class for LLM clients."""

    @abc.abstractmethod
    async def complete(
        self,
        system: str,
        messages: list[dict[str, str]],
        max_tokens: int = 1000,
    ) -> str:
        """Complete the given conversation.

        Args:
            system: The system prompt.
            messages: A list of message dictionaries (e.g., {"role": "user", "content": "..."}).
            max_tokens: The maximum number of tokens to generate.

        Returns:
            The generated response string.
        """
complete(system, messages, max_tokens=1000) abstractmethod async

Complete the given conversation.

Parameters:

system (str, required): The system prompt.
messages (list[dict[str, str]], required): A list of message dictionaries (e.g., {"role": "user", "content": "..."}).
max_tokens (int, default 1000): The maximum number of tokens to generate.

Returns:

str: The generated response string.

Source code in src/dv_agentic/tools/llm/interface.py
@abc.abstractmethod
async def complete(
    self,
    system: str,
    messages: list[dict[str, str]],
    max_tokens: int = 1000,
) -> str:
    """Complete the given conversation.

    Args:
        system: The system prompt.
        messages: A list of message dictionaries (e.g., {"role": "user", "content": "..."}).
        max_tokens: The maximum number of tokens to generate.

    Returns:
        The generated response string.
    """
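Because `complete` is the only abstract method, agents can be exercised offline against a stub client. The `EchoLLMClient` below is hypothetical (real deployments use the `LLMAPIClient` or `LocalLLMClient` documented later on this page); it just echoes the last user message:

```python
import abc
import asyncio


class BaseLLMClient(abc.ABC):
    @abc.abstractmethod
    async def complete(
        self,
        system: str,
        messages: list[dict[str, str]],
        max_tokens: int = 1000,
    ) -> str: ...


class EchoLLMClient(BaseLLMClient):
    """Hypothetical stub: echoes the last user message back, no network calls."""

    async def complete(self, system, messages, max_tokens=1000):
        return f"[{system}] {messages[-1]['content']}"


async def main() -> str:
    client = EchoLLMClient()
    return await client.complete(
        "You are a DV assistant.",
        [{"role": "user", "content": "ping"}],
    )


reply = asyncio.run(main())
```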

Web API LLM Client

api

External LLM client for the LLM Messages API.

Uses only Python stdlib (urllib) — no third-party SDK required. Set LLM_API_KEY in the environment or pass it explicitly.

LLMAPIClient

Bases: BaseLLMClient

Calls the LLM /v1/messages endpoint over raw HTTP.

All network I/O runs in a thread-pool executor so the async caller is never blocked.

Source code in src/dv_agentic/tools/llm/api.py
class LLMAPIClient(BaseLLMClient):
    """Calls the LLM /v1/messages endpoint over raw HTTP.

    All network I/O runs in a thread-pool executor so the async caller
    is never blocked.
    """

    DEFAULT_URL = "https://api.anthropic.com/v1/messages"
    LLM_VERSION = "2023-06-01"

    def __init__(
        self,
        api_key: str | None = None,
        model: str = "claude-3-5-sonnet-latest",
        api_url: str = DEFAULT_URL,
        timeout: int = 120,
    ) -> None:
        """Initialise the client.

        Args:
            api_key: LLM API key.  Falls back to ``LLM_API_KEY``
                environment variable.
            model: Model identifier to send in every request.
            api_url: Full URL of the messages endpoint (override for testing).
            timeout: Socket timeout in seconds for each HTTP call.
        """
        self.api_key = api_key or os.environ.get("LLM_API_KEY", "")
        self.model = model
        self.api_url = api_url
        self.timeout = timeout

    async def complete(
        self,
        system: str,
        messages: list[dict[str, str]],
        max_tokens: int = 1000,
    ) -> str:
        """Send a request to the LLM API and return the assistant reply.

        Args:
            system: System prompt string.
            messages: Conversation turns in ``[{"role": ..., "content": ...}]`` form.
            max_tokens: Maximum tokens to generate.

        Returns:
            The text content of the first content block in the response.

        Raises:
            RuntimeError: On non-2xx HTTP response.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._post, system, messages, max_tokens)

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    def _post(
        self,
        system: str,
        messages: list[dict[str, str]],
        max_tokens: int,
    ) -> str:
        """Blocking HTTP POST — runs in a thread-pool executor."""
        payload = json.dumps(
            {
                "model": self.model,
                "max_tokens": max_tokens,
                "system": system,
                "messages": messages,
            }
        ).encode()

        req = urllib.request.Request(  # noqa: S310
            self.api_url,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "x-api-key": self.api_key,
                "llm-version": self.LLM_VERSION,
            },
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:  # noqa: S310
                body: dict[str, Any] = json.loads(resp.read())
                return str(body["content"][0]["text"])
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode(errors="replace")
            raise RuntimeError(f"LLM API error {exc.code}: {detail}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"LLM API connection error: {exc.reason}") from exc
__init__(api_key=None, model='claude-3-5-sonnet-latest', api_url=DEFAULT_URL, timeout=120)

Initialise the client.

Parameters:

api_key (str | None, default None): LLM API key. Falls back to LLM_API_KEY environment variable.
model (str, default 'claude-3-5-sonnet-latest'): Model identifier to send in every request.
api_url (str, default DEFAULT_URL): Full URL of the messages endpoint (override for testing).
timeout (int, default 120): Socket timeout in seconds for each HTTP call.
Source code in src/dv_agentic/tools/llm/api.py
def __init__(
    self,
    api_key: str | None = None,
    model: str = "claude-3-5-sonnet-latest",
    api_url: str = DEFAULT_URL,
    timeout: int = 120,
) -> None:
    """Initialise the client.

    Args:
        api_key: LLM API key.  Falls back to ``LLM_API_KEY``
            environment variable.
        model: Model identifier to send in every request.
        api_url: Full URL of the messages endpoint (override for testing).
        timeout: Socket timeout in seconds for each HTTP call.
    """
    self.api_key = api_key or os.environ.get("LLM_API_KEY", "")
    self.model = model
    self.api_url = api_url
    self.timeout = timeout
complete(system, messages, max_tokens=1000) async

Send a request to the LLM API and return the assistant reply.

Parameters:

system (str, required): System prompt string.
messages (list[dict[str, str]], required): Conversation turns in [{"role": ..., "content": ...}] form.
max_tokens (int, default 1000): Maximum tokens to generate.

Returns:

str: The text content of the first content block in the response.

Raises:

RuntimeError: On non-2xx HTTP response.

Source code in src/dv_agentic/tools/llm/api.py
async def complete(
    self,
    system: str,
    messages: list[dict[str, str]],
    max_tokens: int = 1000,
) -> str:
    """Send a request to the LLM API and return the assistant reply.

    Args:
        system: System prompt string.
        messages: Conversation turns in ``[{"role": ..., "content": ...}]`` form.
        max_tokens: Maximum tokens to generate.

    Returns:
        The text content of the first content block in the response.

    Raises:
        RuntimeError: On non-2xx HTTP response.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._post, system, messages, max_tokens)

Local LLM Client

local

Internal LLM client for the local/internal endpoint (OpenAI-compatible).

Reads LOCAL_LLM_BASE_URL and LOCAL_LLM_API_KEY from the environment, or accepts them explicitly. The endpoint is assumed to follow the OpenAI Chat Completions API shape (POST /v1/chat/completions).

LocalLLMClient

Bases: BaseLLMClient

Calls an internal LLM endpoint that speaks OpenAI Chat Completions.

All network I/O runs in a thread-pool executor so the async caller is never blocked.

Source code in src/dv_agentic/tools/llm/local.py
class LocalLLMClient(BaseLLMClient):
    """Calls an internal LLM endpoint that speaks OpenAI Chat Completions.

    All network I/O runs in a thread-pool executor so the async caller
    is never blocked.
    """

    def __init__(
        self,
        base_url: str | None = None,
        api_key: str | None = None,
        model: str = "default",
        timeout: int = 120,
    ) -> None:
        """Initialise the client.

        Args:
            base_url: Base URL of the local LLM service, e.g.
                ``"http://localhost:8080"``.
                Falls back to ``LOCAL_LLM_BASE_URL`` environment variable.
            api_key: Bearer token for the internal service.
                Falls back to ``LOCAL_LLM_API_KEY`` environment variable.
            model: Model name to send in the request body.
            timeout: Socket timeout in seconds.
        """
        raw_url = base_url or os.environ.get("LOCAL_LLM_BASE_URL", "")
        if not raw_url:
            msg = (
                "LocalLLMClient requires 'base_url' or 'LOCAL_LLM_BASE_URL' "
                "environment variable to be set."
            )
            raise ValueError(msg)

        self.api_url = raw_url.rstrip("/") + "/v1/chat/completions"
        self.api_key = api_key or os.environ.get("LOCAL_LLM_API_KEY", "")
        self.model = model
        self.timeout = timeout

    async def complete(
        self,
        system: str,
        messages: list[dict[str, str]],
        max_tokens: int = 1000,
    ) -> str:
        """Send a chat-completion request to the local LLM endpoint.

        Args:
            system: System prompt string (prepended as a ``"system"`` role message).
            messages: Conversation turns in ``[{"role": ..., "content": ...}]`` form.
            max_tokens: Maximum tokens to generate.

        Returns:
            The assistant's reply text.

        Raises:
            RuntimeError: On non-2xx HTTP response or connection failure.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._post, system, messages, max_tokens)

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    def _post(
        self,
        system: str,
        messages: list[dict[str, str]],
        max_tokens: int,
    ) -> str:
        """Blocking HTTP POST — runs in a thread-pool executor."""
        all_messages = [{"role": "system", "content": system}, *messages]
        payload = json.dumps(
            {
                "model": self.model,
                "max_tokens": max_tokens,
                "messages": all_messages,
            }
        ).encode()

        headers: dict[str, str] = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        req = urllib.request.Request(  # noqa: S310
            self.api_url,
            data=payload,
            headers=headers,
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:  # noqa: S310
                body: dict[str, Any] = json.loads(resp.read())
                return str(body["choices"][0]["message"]["content"])
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode(errors="replace")
            raise RuntimeError(f"Local LLM API error {exc.code}: {detail}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"Local LLM connection error: {exc.reason}") from exc
__init__(base_url=None, api_key=None, model='default', timeout=120)

Initialise the client.

Parameters:

base_url (str | None, default None): Base URL of the local LLM service, e.g. "http://localhost:8080". Falls back to LOCAL_LLM_BASE_URL environment variable.
api_key (str | None, default None): Bearer token for the internal service. Falls back to LOCAL_LLM_API_KEY environment variable.
model (str, default 'default'): Model name to send in the request body.
timeout (int, default 120): Socket timeout in seconds.
Source code in src/dv_agentic/tools/llm/local.py
def __init__(
    self,
    base_url: str | None = None,
    api_key: str | None = None,
    model: str = "default",
    timeout: int = 120,
) -> None:
    """Initialise the client.

    Args:
        base_url: Base URL of the local LLM service, e.g.
            ``"http://localhost:8080"``.
            Falls back to ``LOCAL_LLM_BASE_URL`` environment variable.
        api_key: Bearer token for the internal service.
            Falls back to ``LOCAL_LLM_API_KEY`` environment variable.
        model: Model name to send in the request body.
        timeout: Socket timeout in seconds.
    """
    raw_url = base_url or os.environ.get("LOCAL_LLM_BASE_URL", "")
    if not raw_url:
        msg = (
            "LocalLLMClient requires 'base_url' or 'LOCAL_LLM_BASE_URL' "
            "environment variable to be set."
        )
        raise ValueError(msg)

    self.api_url = raw_url.rstrip("/") + "/v1/chat/completions"
    self.api_key = api_key or os.environ.get("LOCAL_LLM_API_KEY", "")
    self.model = model
    self.timeout = timeout
complete(system, messages, max_tokens=1000) async

Send a chat-completion request to the local LLM endpoint.

Parameters:

system (str, required): System prompt string (prepended as a "system" role message).
messages (list[dict[str, str]], required): Conversation turns in [{"role": ..., "content": ...}] form.
max_tokens (int, default 1000): Maximum tokens to generate.

Returns:

str: The assistant's reply text.

Raises:

RuntimeError: On non-2xx HTTP response or connection failure.

Source code in src/dv_agentic/tools/llm/local.py
async def complete(
    self,
    system: str,
    messages: list[dict[str, str]],
    max_tokens: int = 1000,
) -> str:
    """Send a chat-completion request to the local LLM endpoint.

    Args:
        system: System prompt string (prepended as a ``"system"`` role message).
        messages: Conversation turns in ``[{"role": ..., "content": ...}]`` form.
        max_tokens: Maximum tokens to generate.

    Returns:
        The assistant's reply text.

    Raises:
        RuntimeError: On non-2xx HTTP response or connection failure.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._post, system, messages, max_tokens)
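Two details of `LocalLLMClient` are worth pulling out: the constructor strips any trailing slash from `base_url` before appending the endpoint path, and the system prompt travels as the first `"system"` message rather than a top-level field (contrast with `LLMAPIClient` above). A standalone sketch of both behaviours, extracted from the source shown above:

```python
def build_chat_url(base_url: str) -> str:
    # Strip a trailing slash so "http://host:8080/" does not become ".../​/v1/...".
    return base_url.rstrip("/") + "/v1/chat/completions"


def build_payload(system: str, messages: list[dict[str, str]], max_tokens: int) -> dict:
    # OpenAI-style endpoints expect the system prompt as the leading message.
    return {
        "model": "default",
        "max_tokens": max_tokens,
        "messages": [{"role": "system", "content": system}, *messages],
    }


url = build_chat_url("http://localhost:8080/")
payload = build_payload("Be terse.", [{"role": "user", "content": "hi"}], 100)
```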

Simulator Adapters

These classes adapt cocotb, pyuvm, and various simulation and coverage tools (GHDL, Icarus, Verilator, Xcelium, and IMC) so that our agents can drive them programmatically.

Cocotb Base Adapter

cocotb_base

Base class for cocotb.runner based simulator adapters.

CocotbBaseAdapter

Bases: SimulatorTool

Base class for simulators using cocotb.runner infrastructure.

Source code in src/dv_agentic/tools/adapters/cocotb_base.py
class CocotbBaseAdapter(SimulatorTool):
    """Base class for simulators using cocotb.runner infrastructure."""

    def __init__(
        self,
        simulator: str,
        hdl_toplevel: str = "top",
        hdl_toplevel_lang: str | None = None,
    ) -> None:
        """Initialize the cocotb base adapter.

        Args:
            simulator: Name of the simulator (e.g., "icarus", "verilator", "ghdl").
            hdl_toplevel: Name of the HDL top-level module.
            hdl_toplevel_lang: Language of the top-level module ("verilog" or "vhdl").
        """
        self.simulator = simulator
        self.hdl_toplevel = hdl_toplevel
        self.hdl_toplevel_lang = hdl_toplevel_lang

    def _get_runner(self) -> Any:
        if get_runner is None:
            raise ImportError("cocotb.runner is not available on this platform.")
        return get_runner(self.simulator)

    def compile(self, file_list: list[str], top: str) -> CompileResult:
        """Compile HDL source files using the cocotb runner.

        Args:
            file_list: List of paths to Verilog/SystemVerilog or VHDL source files.
            top: Name of the HDL top-level module to build.

        Returns:
            A :class:`CompileResult` indicating success or failure with logs.
        """
        self.hdl_toplevel = top
        runner = self._get_runner()
        try:
            # Runners are selected based on the 'simulator' attribute.
            # Sources are divided into verilog_sources and vhdl_sources by extension.
            # Subclasses can override hdl_toplevel_lang to force a specific mode.
            build_kwargs: dict[str, Any] = {
                "hdl_toplevel": top,
                "always": True,
            }

            v_files = [f for f in file_list if f.endswith((".v", ".sv"))]
            vhdl_files = [f for f in file_list if f.endswith((".vhd", ".vhdl"))]

            if v_files:
                build_kwargs["verilog_sources"] = v_files
            if vhdl_files:
                build_kwargs["vhdl_sources"] = vhdl_files

            if self.hdl_toplevel_lang:
                build_kwargs["hdl_toplevel_lang"] = self.hdl_toplevel_lang

            runner.build(**build_kwargs)
            return CompileResult(
                status="pass",
                output=f"{self.simulator.capitalize()} build successful.",
            )
        except Exception as e:
            logger.exception("%s build failed", self.simulator.capitalize())
            return CompileResult(status="fail", output=str(e))

    def run(self, test: str, seed: int, debug: bool) -> SimResult:
        """Run a cocotb simulation test case.

        Args:
            test: Test case identifier, either "module" or "module.testcase".
            seed: Random seed for the simulation.
            debug: If True, enable waveform dumping (if supported by the simulator).

        Returns:
            A :class:`SimResult` containing the status and path to the simulation log.
        """
        env = os.environ.copy()
        env.update(
            {
                "SIM": self.simulator,
                "RANDOM_SEED": str(seed),
            }
        )
        log_path = f"sim_{test}_{seed}.log"

        # Robust testcase parsing: handle "module" or "module.testcase"
        parts = test.split(".", 1)
        test_module = parts[0]
        testcase = parts[1] if len(parts) > 1 else None

        try:
            runner = self._get_runner()
            runner.test(
                hdl_toplevel=self.hdl_toplevel,
                hdl_toplevel_lang=self.hdl_toplevel_lang,
                test_module=test_module,
                testcase=testcase,
                waves=debug,
                extra_env=env,
            )

            return SimResult(
                status="pass",
                job_id=f"{test}_{seed}",
                log_path=log_path,
            )
        except Exception as e:
            logger.exception(
                "%s simulation failed for test '%s'",
                self.simulator.capitalize(),
                test,
            )
            return SimResult(
                status="fail",
                job_id=f"{test}_{seed}",
                log_path=log_path,
                error_summary=str(e),
            )
__init__(simulator, hdl_toplevel='top', hdl_toplevel_lang=None)

Initialize the cocotb base adapter.

Parameters:

simulator (str, required): Name of the simulator (e.g., "icarus", "verilator", "ghdl").
hdl_toplevel (str, default 'top'): Name of the HDL top-level module.
hdl_toplevel_lang (str | None, default None): Language of the top-level module ("verilog" or "vhdl").
Source code in src/dv_agentic/tools/adapters/cocotb_base.py
def __init__(
    self,
    simulator: str,
    hdl_toplevel: str = "top",
    hdl_toplevel_lang: str | None = None,
) -> None:
    """Initialize the cocotb base adapter.

    Args:
        simulator: Name of the simulator (e.g., "icarus", "verilator", "ghdl").
        hdl_toplevel: Name of the HDL top-level module.
        hdl_toplevel_lang: Language of the top-level module ("verilog" or "vhdl").
    """
    self.simulator = simulator
    self.hdl_toplevel = hdl_toplevel
    self.hdl_toplevel_lang = hdl_toplevel_lang
compile(file_list, top)

Compile HDL source files using the cocotb runner.

Parameters:

file_list (list[str], required): List of paths to Verilog/SystemVerilog or VHDL source files.
top (str, required): Name of the HDL top-level module to build.

Returns:

CompileResult: A CompileResult indicating success or failure with logs.
Source code in src/dv_agentic/tools/adapters/cocotb_base.py
def compile(self, file_list: list[str], top: str) -> CompileResult:
    """Compile HDL source files using the cocotb runner.

    Args:
        file_list: List of paths to Verilog/SystemVerilog or VHDL source files.
        top: Name of the HDL top-level module to build.

    Returns:
        A :class:`CompileResult` indicating success or failure with logs.
    """
    self.hdl_toplevel = top
    runner = self._get_runner()
    try:
        # Runners are selected based on the 'simulator' attribute.
        # Sources are divided into verilog_sources and vhdl_sources by extension.
        # Subclasses can override hdl_toplevel_lang to force a specific mode.
        build_kwargs: dict[str, Any] = {
            "hdl_toplevel": top,
            "always": True,
        }

        v_files = [f for f in file_list if f.endswith((".v", ".sv"))]
        vhdl_files = [f for f in file_list if f.endswith((".vhd", ".vhdl"))]

        if v_files:
            build_kwargs["verilog_sources"] = v_files
        if vhdl_files:
            build_kwargs["vhdl_sources"] = vhdl_files

        if self.hdl_toplevel_lang:
            build_kwargs["hdl_toplevel_lang"] = self.hdl_toplevel_lang

        runner.build(**build_kwargs)
        return CompileResult(
            status="pass",
            output=f"{self.simulator.capitalize()} build successful.",
        )
    except Exception as e:
        logger.exception("%s build failed", self.simulator.capitalize())
        return CompileResult(status="fail", output=str(e))
run(test, seed, debug)

Run a cocotb simulation test case.

Parameters:

test (str, required): Test case identifier, either "module" or "module.testcase".
seed (int, required): Random seed for the simulation.
debug (bool, required): If True, enable waveform dumping (if supported by the simulator).

Returns:

SimResult: A SimResult containing the status and path to the simulation log.

Source code in src/dv_agentic/tools/adapters/cocotb_base.py
def run(self, test: str, seed: int, debug: bool) -> SimResult:
    """Run a cocotb simulation test case.

    Args:
        test: Test case identifier, either "module" or "module.testcase".
        seed: Random seed for the simulation.
        debug: If True, enable waveform dumping (if supported by the simulator).

    Returns:
        A :class:`SimResult` containing the status and path to the simulation log.
    """
    env = os.environ.copy()
    env.update(
        {
            "SIM": self.simulator,
            "RANDOM_SEED": str(seed),
        }
    )
    log_path = f"sim_{test}_{seed}.log"

    # Robust testcase parsing: handle "module" or "module.testcase"
    parts = test.split(".", 1)
    test_module = parts[0]
    testcase = parts[1] if len(parts) > 1 else None

    try:
        runner = self._get_runner()
        runner.test(
            hdl_toplevel=self.hdl_toplevel,
            hdl_toplevel_lang=self.hdl_toplevel_lang,
            test_module=test_module,
            testcase=testcase,
            waves=debug,
            extra_env=env,
        )

        return SimResult(
            status="pass",
            job_id=f"{test}_{seed}",
            log_path=log_path,
        )
    except Exception as e:
        logger.exception(
            "%s simulation failed for test '%s'",
            self.simulator.capitalize(),
            test,
        )
        return SimResult(
            status="fail",
            job_id=f"{test}_{seed}",
            log_path=log_path,
            error_summary=str(e),
        )

pyuvm Adapter

pyuvm

Adapter for pyuvm functional coverage (External/Open-source environment).

PyuvmCoverageAdapter

Bases: CoverageTool

Coverage adapter for pyuvm (External environment).

Parses text-based coverage reports or logs generated by pyuvm testbenches using tools like cocotb-coverage or custom UVMCoverage subscribers.

Source code in src/dv_agentic/tools/adapters/pyuvm.py
class PyuvmCoverageAdapter(CoverageTool):
    """Coverage adapter for pyuvm (External environment).

    Parses text-based coverage reports or logs generated by pyuvm testbenches
    using tools like cocotb-coverage or custom UVMCoverage subscribers.
    """

    def __init__(
        self,
        default_report_path: str = "coverage.txt",
    ) -> None:
        """Initialize the pyuvm coverage adapter.

        Args:
            default_report_path: Default file to parse if no job-specific
                report is found.

        """
        self.default_report_path = default_report_path

    def get_coverage(self, job_id: str) -> CoverageDB:
        """Retrieve coverage results for a specific pyuvm job.

        Args:
            job_id: The ID of the simulation job.  Looks for a log or report
                file named ``sim_{job_id}.log`` or the default report path.

        """
        # Try job-specific log first, then fallback to default report
        possible_paths = [
            Path(f"sim_{job_id}.log"),
            Path(self.default_report_path),
        ]

        for path in possible_paths:
            if path.exists():
                logger.info("Parsing pyuvm coverage from %s", path)
                try:
                    content = path.read_text()
                    pct = self._parse_total(content)
                    if pct is not None:
                        return CoverageDB(path=str(path), overall_percentage=pct)
                except Exception:
                    logger.exception("Failed to read coverage file: %s", path)

        logger.warning("No coverage report found for job_id: %s", job_id)
        return CoverageDB(path="", overall_percentage=0.0)

    def _parse_total(self, output: str) -> float | None:
        """Extract the total coverage percentage from the output string."""
        match = _PYUVM_COV_RE.search(output)
        if match:
            return float(match.group(1))
        return None
__init__(default_report_path='coverage.txt')

Initialize the pyuvm coverage adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `default_report_path` | `str` | Default file to parse if no job-specific report is found. | `'coverage.txt'` |
Source code in src/dv_agentic/tools/adapters/pyuvm.py
def __init__(
    self,
    default_report_path: str = "coverage.txt",
) -> None:
    """Initialize the pyuvm coverage adapter.

    Args:
        default_report_path: Default file to parse if no job-specific
            report is found.

    """
    self.default_report_path = default_report_path
get_coverage(job_id)

Retrieve coverage results for a specific pyuvm job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_id` | `str` | The ID of the simulation job. Looks for a log or report file named `sim_{job_id}.log` or the default report path. | *required* |
Source code in src/dv_agentic/tools/adapters/pyuvm.py
def get_coverage(self, job_id: str) -> CoverageDB:
    """Retrieve coverage results for a specific pyuvm job.

    Args:
        job_id: The ID of the simulation job.  Looks for a log or report
            file named ``sim_{job_id}.log`` or the default report path.

    """
    # Try job-specific log first, then fallback to default report
    possible_paths = [
        Path(f"sim_{job_id}.log"),
        Path(self.default_report_path),
    ]

    for path in possible_paths:
        if path.exists():
            logger.info("Parsing pyuvm coverage from %s", path)
            try:
                content = path.read_text()
                pct = self._parse_total(content)
                if pct is not None:
                    return CoverageDB(path=str(path), overall_percentage=pct)
            except Exception:
                logger.exception("Failed to read coverage file: %s", path)

    logger.warning("No coverage report found for job_id: %s", job_id)
    return CoverageDB(path="", overall_percentage=0.0)

Simulator Specifics

ghdl_cocotb

Adapter for GHDL + cocotb + pyuvm (External/Open-source environment).

GHDLCocotbAdapter

Bases: CocotbBaseAdapter

Adapter for GHDL + cocotb + pyuvm (External Environment).

Source code in src/dv_agentic/tools/adapters/ghdl_cocotb.py
class GHDLCocotbAdapter(CocotbBaseAdapter):
    """Adapter for GHDL + cocotb + pyuvm (External Environment)."""

    def __init__(
        self,
        hdl_toplevel: str = "top",
        hdl_toplevel_lang: str = "vhdl",
    ) -> None:
        """Initialize GHDL cocotb adapter.

        Args:
            hdl_toplevel: Name of the HDL top-level module.
            hdl_toplevel_lang: Language of the top-level module (usually ``"vhdl"``).

        """
        super().__init__(
            simulator="ghdl",
            hdl_toplevel=hdl_toplevel,
            hdl_toplevel_lang=hdl_toplevel_lang,
        )
__init__(hdl_toplevel='top', hdl_toplevel_lang='vhdl')

Initialize GHDL cocotb adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hdl_toplevel` | `str` | Name of the HDL top-level module. | `'top'` |
| `hdl_toplevel_lang` | `str` | Language of the top-level module (usually `"vhdl"`). | `'vhdl'` |
Source code in src/dv_agentic/tools/adapters/ghdl_cocotb.py
def __init__(
    self,
    hdl_toplevel: str = "top",
    hdl_toplevel_lang: str = "vhdl",
) -> None:
    """Initialize GHDL cocotb adapter.

    Args:
        hdl_toplevel: Name of the HDL top-level module.
        hdl_toplevel_lang: Language of the top-level module (usually ``"vhdl"``).

    """
    super().__init__(
        simulator="ghdl",
        hdl_toplevel=hdl_toplevel,
        hdl_toplevel_lang=hdl_toplevel_lang,
    )
icarus

Adapter for Icarus Verilog simulator (External/Open-source environment).

IcarusAdapter

Bases: CocotbBaseAdapter

Adapter for Icarus Verilog simulator (External Environment).

Source code in src/dv_agentic/tools/adapters/icarus.py
class IcarusAdapter(CocotbBaseAdapter):
    """Adapter for Icarus Verilog simulator (External Environment)."""

    def __init__(
        self,
        hdl_toplevel: str = "top",
    ) -> None:
        """Initialize Icarus adapter.

        Args:
            hdl_toplevel: Name of the HDL top-level module.

        """
        super().__init__(simulator="icarus", hdl_toplevel=hdl_toplevel)
__init__(hdl_toplevel='top')

Initialize Icarus adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hdl_toplevel` | `str` | Name of the HDL top-level module. | `'top'` |
Source code in src/dv_agentic/tools/adapters/icarus.py
def __init__(
    self,
    hdl_toplevel: str = "top",
) -> None:
    """Initialize Icarus adapter.

    Args:
        hdl_toplevel: Name of the HDL top-level module.

    """
    super().__init__(simulator="icarus", hdl_toplevel=hdl_toplevel)
verilator

Adapter for Verilator simulator (External/Open-source environment).

VerilatorAdapter

Bases: CocotbBaseAdapter

Adapter for Verilator simulator (External Environment).

Source code in src/dv_agentic/tools/adapters/verilator.py
class VerilatorAdapter(CocotbBaseAdapter):
    """Adapter for Verilator simulator (External Environment)."""

    def __init__(
        self,
        hdl_toplevel: str = "top",
    ) -> None:
        """Initialize Verilator adapter.

        Args:
            hdl_toplevel: Name of the HDL top-level module.

        """
        super().__init__(simulator="verilator", hdl_toplevel=hdl_toplevel)
__init__(hdl_toplevel='top')

Initialize Verilator adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hdl_toplevel` | `str` | Name of the HDL top-level module. | `'top'` |
Source code in src/dv_agentic/tools/adapters/verilator.py
def __init__(
    self,
    hdl_toplevel: str = "top",
) -> None:
    """Initialize Verilator adapter.

    Args:
        hdl_toplevel: Name of the HDL top-level module.

    """
    super().__init__(simulator="verilator", hdl_toplevel=hdl_toplevel)
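The three open-source adapters differ only in the simulator name and default HDL language they forward to `CocotbBaseAdapter`. A sketch of selecting one by name — using a lightweight stand-in class so the snippet runs on its own (the real classes live in `dv_agentic.tools.adapters`):

```python
# Stand-in for CocotbBaseAdapter, for illustration only.
class _Adapter:
    def __init__(self, simulator: str, hdl_toplevel: str = "top",
                 hdl_toplevel_lang: str = "verilog") -> None:
        self.simulator = simulator
        self.hdl_toplevel = hdl_toplevel
        self.hdl_toplevel_lang = hdl_toplevel_lang

# Each entry mirrors the super().__init__ call of the corresponding adapter.
ADAPTERS = {
    "ghdl": lambda top: _Adapter("ghdl", top, "vhdl"),
    "icarus": lambda top: _Adapter("icarus", top),
    "verilator": lambda top: _Adapter("verilator", top),
}

adapter = ADAPTERS["ghdl"]("fifo_top")
print(adapter.simulator, adapter.hdl_toplevel_lang)  # ghdl vhdl
```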
xcelium

Adapter for Cadence Xcelium simulator (Internal environment).

XceliumAdapter

Bases: SimulatorTool

Adapter for Cadence Xcelium simulator (Internal Environment).

Source code in src/dv_agentic/tools/adapters/xcelium.py
class XceliumAdapter(SimulatorTool):
    """Adapter for Cadence Xcelium simulator (Internal Environment)."""

    def __init__(
        self,
        xrun_path: str = "xrun",
        collect_coverage: bool = True,
        cov_work_dir: str = "cov_work",
    ) -> None:
        """Initialize Xcelium adapter.

        Args:
            xrun_path: Path to the xrun binary.
            collect_coverage: Whether to instrument the simulation for coverage.
                Set to ``False`` for quick smoke runs that skip IMC collection.
            cov_work_dir: Root directory for per-job coverage DBs.
                Each run writes to ``{cov_work_dir}/{job_id}/``.

        """
        self.xrun_path = xrun_path
        self.collect_coverage = collect_coverage
        self.cov_work_dir = cov_work_dir

    def compile(self, file_list: list[str], top: str) -> CompileResult:
        """Compile the source files using xrun -compile."""
        cmd = [self.xrun_path, "-compile", "-elaborate", "-64bit", "-uvm", "-top", top, *file_list]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True)  # noqa: S603
            status: Literal["pass", "fail"] = "pass" if result.returncode == 0 else "fail"

            return CompileResult(
                status=status,
                output=result.stdout + result.stderr,
            )
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            logger.exception("Xcelium compile failed")
            return CompileResult(status="fail", output=str(e))

    def run(self, test: str, seed: int, debug: bool) -> SimResult:
        """Run a simulation using xrun -run."""
        cmd = [
            self.xrun_path,
            "-run",
            "-64bit",
            "-uvm",
            f"+UVM_TESTNAME={test}",
            f"+ntc_seed={seed}",
            "-l",
            f"sim_{test}_{seed}.log",
        ]

        if debug:
            cmd.extend(["-access", "+rwc", "-gui"])

        # Coverage instrumentation is delegated to IMCAdapter for analysis.
        # -covworkdir scopes each run to its own directory so merges are clean.
        cov_db_path: str | None = None
        if self.collect_coverage:
            cov_db_path = f"{self.cov_work_dir}/{test}_{seed}"
            cmd.extend(["-coverage", "all", "-covworkdir", cov_db_path, "-covoverwrite"])

        try:
            result = subprocess.run(  # noqa: S603
                cmd, capture_output=True, text=True, timeout=3600
            )
            status: Literal["pass", "fail"] = "pass" if result.returncode == 0 else "fail"
            log_path = f"sim_{test}_{seed}.log"
            error_summary = self._parse_errors(result.stdout + result.stderr)

            return SimResult(
                status=status,
                job_id=f"{test}_{seed}",
                log_path=log_path,
                error_summary=error_summary,
                cov_db_path=cov_db_path,
            )
        except subprocess.TimeoutExpired:
            logger.warning("Xcelium simulation timed out for test '%s' seed=%d", test, seed)
            return SimResult(
                status="timeout", job_id=f"{test}_{seed}", log_path="", cov_db_path=None
            )

    def _parse_errors(self, output: str) -> str | None:
        """Parse Xcelium-specific error patterns."""
        # Xcelium errors typically start with *E or *F
        error_pattern = r"\*[EF],(\w+): (.*)"
        matches = re.findall(error_pattern, output)
        if matches:
            return "\n".join(f"{code}: {msg}" for code, msg in matches)

        # Also check for UVM_ERROR
        uvm_error_pattern = r"UVM_ERROR @ (.*)"
        uvm_matches = re.findall(uvm_error_pattern, output)
        if uvm_matches:
            return "\n".join(uvm_matches)

        return None
__init__(xrun_path='xrun', collect_coverage=True, cov_work_dir='cov_work')

Initialize Xcelium adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `xrun_path` | `str` | Path to the xrun binary. | `'xrun'` |
| `collect_coverage` | `bool` | Whether to instrument the simulation for coverage. Set to `False` for quick smoke runs that skip IMC collection. | `True` |
| `cov_work_dir` | `str` | Root directory for per-job coverage DBs. Each run writes to `{cov_work_dir}/{job_id}/`. | `'cov_work'` |
Source code in src/dv_agentic/tools/adapters/xcelium.py
def __init__(
    self,
    xrun_path: str = "xrun",
    collect_coverage: bool = True,
    cov_work_dir: str = "cov_work",
) -> None:
    """Initialize Xcelium adapter.

    Args:
        xrun_path: Path to the xrun binary.
        collect_coverage: Whether to instrument the simulation for coverage.
            Set to ``False`` for quick smoke runs that skip IMC collection.
        cov_work_dir: Root directory for per-job coverage DBs.
            Each run writes to ``{cov_work_dir}/{job_id}/``.

    """
    self.xrun_path = xrun_path
    self.collect_coverage = collect_coverage
    self.cov_work_dir = cov_work_dir
compile(file_list, top)

Compile the source files using xrun -compile.

Source code in src/dv_agentic/tools/adapters/xcelium.py
def compile(self, file_list: list[str], top: str) -> CompileResult:
    """Compile the source files using xrun -compile."""
    cmd = [self.xrun_path, "-compile", "-elaborate", "-64bit", "-uvm", "-top", top, *file_list]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)  # noqa: S603
        status: Literal["pass", "fail"] = "pass" if result.returncode == 0 else "fail"

        return CompileResult(
            status=status,
            output=result.stdout + result.stderr,
        )
    except (subprocess.SubprocessError, FileNotFoundError) as e:
        logger.exception("Xcelium compile failed")
        return CompileResult(status="fail", output=str(e))
run(test, seed, debug)

Run a simulation using xrun -run.

Source code in src/dv_agentic/tools/adapters/xcelium.py
def run(self, test: str, seed: int, debug: bool) -> SimResult:
    """Run a simulation using xrun -run."""
    cmd = [
        self.xrun_path,
        "-run",
        "-64bit",
        "-uvm",
        f"+UVM_TESTNAME={test}",
        f"+ntc_seed={seed}",
        "-l",
        f"sim_{test}_{seed}.log",
    ]

    if debug:
        cmd.extend(["-access", "+rwc", "-gui"])

    # Coverage instrumentation is delegated to IMCAdapter for analysis.
    # -covworkdir scopes each run to its own directory so merges are clean.
    cov_db_path: str | None = None
    if self.collect_coverage:
        cov_db_path = f"{self.cov_work_dir}/{test}_{seed}"
        cmd.extend(["-coverage", "all", "-covworkdir", cov_db_path, "-covoverwrite"])

    try:
        result = subprocess.run(  # noqa: S603
            cmd, capture_output=True, text=True, timeout=3600
        )
        status: Literal["pass", "fail"] = "pass" if result.returncode == 0 else "fail"
        log_path = f"sim_{test}_{seed}.log"
        error_summary = self._parse_errors(result.stdout + result.stderr)

        return SimResult(
            status=status,
            job_id=f"{test}_{seed}",
            log_path=log_path,
            error_summary=error_summary,
            cov_db_path=cov_db_path,
        )
    except subprocess.TimeoutExpired:
        logger.warning("Xcelium simulation timed out for test '%s' seed=%d", test, seed)
        return SimResult(
            status="timeout", job_id=f"{test}_{seed}", log_path="", cov_db_path=None
        )
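For reference, the full `xrun` command line that `run()` assembles for a coverage-enabled, non-debug run can be reconstructed directly from the source above (the test name and seed here are illustrative):

```python
# Rebuild the command XceliumAdapter.run() issues with collect_coverage=True
# and debug=False; every flag below is copied from the adapter source.
test, seed = "smoke_test", 42
cov_work_dir = "cov_work"
cmd = [
    "xrun", "-run", "-64bit", "-uvm",
    f"+UVM_TESTNAME={test}", f"+ntc_seed={seed}",
    "-l", f"sim_{test}_{seed}.log",
    "-coverage", "all", "-covworkdir", f"{cov_work_dir}/{test}_{seed}", "-covoverwrite",
]
print(" ".join(cmd))
```

With `debug=True` the adapter additionally appends `-access +rwc -gui` before the coverage flags.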
imc

Adapter for Cadence IMC 24.06 + Verisium 25.12 coverage tools (Internal environment).

Workflow

1. XceliumAdapter.run() writes a per-job coverage DB to {cov_work_dir}/{job_id}/ (one directory per simulation run).
2. IMCAdapter.get_coverage(job_id) loads that directory and returns a CoverageDB summary.
3. IMCAdapter.merge(job_ids) merges multiple runs into a single aggregated DB, then optionally invokes Verisium vsif for enterprise-level aggregation reports.

Assumed tool versions

- IMC: 24.06.a001
- Verisium: 25.12.081
- OS: RHEL 8.4
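The key contract in this workflow is the shared path scheme: a job ID minted by XceliumAdapter.run() must resolve to the same coverage-DB directory when IMCAdapter loads it. A minimal illustration (the test name and seed are made up):

```python
from pathlib import Path

# Path contract shared by XceliumAdapter and IMCAdapter: job_id "{test}_{seed}"
# resolves to {cov_work_dir}/{job_id}/ on both the producer and consumer side.
cov_work_dir = "cov_work"
test, seed = "smoke_test", 42
job_id = f"{test}_{seed}"              # produced by XceliumAdapter.run()
db_path = Path(cov_work_dir) / job_id  # loaded by IMCAdapter.get_coverage()
print(db_path.as_posix())  # cov_work/smoke_test_42
```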
IMCAdapter

Bases: CoverageTool

Coverage adapter for IMC 24.06 + Verisium 25.12 (Internal environment).

Paired with XceliumAdapter — both must share the same cov_work_dir so that job IDs resolve to the same filesystem paths.

Source code in src/dv_agentic/tools/adapters/imc.py
class IMCAdapter(CoverageTool):
    """Coverage adapter for IMC 24.06 + Verisium 25.12 (Internal environment).

    Paired with ``XceliumAdapter`` — both must share the same ``cov_work_dir``
    so that job IDs resolve to the same filesystem paths.
    """

    def __init__(
        self,
        imc_path: str = "imc",
        vsif_path: str = "vsif",
        cov_work_dir: str = "cov_work",
    ) -> None:
        """Initialize the IMC coverage adapter.

        Args:
            imc_path: Path to the ``imc`` binary (Cadence IMC 24.06).
            vsif_path: Path to the ``vsif`` binary (Verisium 25.12).
            cov_work_dir: Root directory where ``XceliumAdapter`` writes
                per-job coverage DBs.  Each job writes to
                ``{cov_work_dir}/{job_id}/``.

        """
        self.imc_path = imc_path
        self.vsif_path = vsif_path
        self.cov_work_dir = cov_work_dir

    # ------------------------------------------------------------------
    # CoverageTool ABC
    # ------------------------------------------------------------------

    def get_coverage(self, job_id: str) -> CoverageDB:
        """Load a single run's coverage DB and return a summary.

        Args:
            job_id: Simulation job identifier produced by ``XceliumAdapter``,
                e.g. ``"my_test_42"``.  The coverage DB is expected at
                ``{cov_work_dir}/{job_id}/``.

        """
        db_path = Path(self.cov_work_dir) / job_id
        return self._report(db_path)

    # ------------------------------------------------------------------
    # IMC / Verisium-specific: multi-run merge
    # ------------------------------------------------------------------

    def merge(
        self,
        job_ids: list[str],
        merged_dir: str = "cov_merged",
        use_verisium: bool = False,
    ) -> CoverageDB:
        """Merge coverage from multiple simulation runs into one aggregated DB.

        Runs ``imc -load <dir1> <dir2> ... -merge <merged_dir> -exit`` to
        aggregate per-job coverage DBs, then optionally invokes Verisium
        ``vsif`` for cross-session enterprise-level reporting.

        Args:
            job_ids: Job IDs whose coverage DBs will be merged.
            merged_dir: Destination directory for the aggregated DB.
            use_verisium: If ``True``, run ``vsif run <merged_dir>/merge.vsif``
                after the IMC merge step.  The ``.vsif`` file must already
                exist in ``merged_dir`` (typically hand-authored or generated
                by a prior Verisium session).

        """
        load_dirs = [str(Path(self.cov_work_dir) / jid) for jid in job_ids]
        cmd = [self.imc_path, "-64", "-load", *load_dirs, "-merge", merged_dir, "-exit"]
        logger.info("IMC merge: %d run(s) → %s", len(job_ids), merged_dir)

        try:
            result = subprocess.run(  # noqa: S603
                cmd, capture_output=True, text=True, timeout=600
            )
            if result.returncode != 0:
                logger.error("IMC merge failed:\n%s", result.stderr)
        except (subprocess.SubprocessError, FileNotFoundError):
            logger.exception("IMC merge invocation failed")

        if use_verisium:
            self._verisium_merge(merged_dir)

        return self._report(Path(merged_dir))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _report(self, db_path: Path) -> CoverageDB:
        """Run ``imc -reportstats`` on *db_path* and parse total coverage %.

        Args:
            db_path: Path to an IMC coverage DB directory.

        """
        cmd = [
            self.imc_path,
            "-64",
            "-load",
            str(db_path),
            "-reportstats",
            "-exit",
        ]
        try:
            result = subprocess.run(  # noqa: S603
                cmd, capture_output=True, text=True, timeout=120
            )
            pct = self._parse_total(result.stdout + result.stderr)
            if pct is None:
                logger.warning(
                    "IMC report: could not parse total coverage percentage from output "
                    "(db_path=%s)",
                    db_path,
                )
                pct = 0.0
        except (subprocess.SubprocessError, FileNotFoundError):
            logger.exception("IMC report invocation failed for '%s'", db_path)
            pct = 0.0

        return CoverageDB(path=str(db_path), overall_percentage=pct)

    def _parse_total(self, output: str) -> float | None:
        """Extract the total coverage percentage from IMC reportstats output.

        Handles common IMC 24.06 output variants:

        - ``Cumulative coverage result: 87.65 %``
        - ``Total coverage: 82.35%``
        - ``Overall coverage: 79.10 %``

        Args:
            output: Raw stdout + stderr from the ``imc`` process.

        """
        match = _TOTAL_COV_RE.search(output)
        if match:
            return float(match.group(1))
        return None

    def _verisium_merge(self, merged_dir: str) -> None:
        """Invoke Verisium ``vsif run`` on the ``.vsif`` file in *merged_dir*.

        The ``.vsif`` file encodes which merged DB directories to aggregate and
        which Verisium report templates to apply.  It must exist at
        ``{merged_dir}/merge.vsif`` before calling this method.

        Args:
            merged_dir: Directory containing the ``merge.vsif`` file.

        """
        vsif_file = Path(merged_dir) / "merge.vsif"
        if not vsif_file.exists():
            logger.warning(
                "Verisium merge skipped: %s not found.  Create or copy a .vsif file there first.",
                vsif_file,
            )
            return

        cmd = [self.vsif_path, "run", str(vsif_file)]
        logger.info("Verisium vsif: running %s", vsif_file)
        try:
            result = subprocess.run(  # noqa: S603
                cmd, capture_output=True, text=True, timeout=1800
            )
            if result.returncode != 0:
                logger.error("Verisium vsif failed:\n%s", result.stderr)
        except (subprocess.SubprocessError, FileNotFoundError):
            logger.exception("Verisium vsif invocation failed")
__init__(imc_path='imc', vsif_path='vsif', cov_work_dir='cov_work')

Initialize the IMC coverage adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `imc_path` | `str` | Path to the `imc` binary (Cadence IMC 24.06). | `'imc'` |
| `vsif_path` | `str` | Path to the `vsif` binary (Verisium 25.12). | `'vsif'` |
| `cov_work_dir` | `str` | Root directory where `XceliumAdapter` writes per-job coverage DBs. Each job writes to `{cov_work_dir}/{job_id}/`. | `'cov_work'` |
Source code in src/dv_agentic/tools/adapters/imc.py
def __init__(
    self,
    imc_path: str = "imc",
    vsif_path: str = "vsif",
    cov_work_dir: str = "cov_work",
) -> None:
    """Initialize the IMC coverage adapter.

    Args:
        imc_path: Path to the ``imc`` binary (Cadence IMC 24.06).
        vsif_path: Path to the ``vsif`` binary (Verisium 25.12).
        cov_work_dir: Root directory where ``XceliumAdapter`` writes
            per-job coverage DBs.  Each job writes to
            ``{cov_work_dir}/{job_id}/``.

    """
    self.imc_path = imc_path
    self.vsif_path = vsif_path
    self.cov_work_dir = cov_work_dir
get_coverage(job_id)

Load a single run's coverage DB and return a summary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_id` | `str` | Simulation job identifier produced by `XceliumAdapter`, e.g. `"my_test_42"`. The coverage DB is expected at `{cov_work_dir}/{job_id}/`. | *required* |
Source code in src/dv_agentic/tools/adapters/imc.py
def get_coverage(self, job_id: str) -> CoverageDB:
    """Load a single run's coverage DB and return a summary.

    Args:
        job_id: Simulation job identifier produced by ``XceliumAdapter``,
            e.g. ``"my_test_42"``.  The coverage DB is expected at
            ``{cov_work_dir}/{job_id}/``.

    """
    db_path = Path(self.cov_work_dir) / job_id
    return self._report(db_path)
merge(job_ids, merged_dir='cov_merged', use_verisium=False)

Merge coverage from multiple simulation runs into one aggregated DB.

Runs imc -load <dir1> <dir2> ... -merge <merged_dir> -exit to aggregate per-job coverage DBs, then optionally invokes Verisium vsif for cross-session enterprise-level reporting.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_ids` | `list[str]` | Job IDs whose coverage DBs will be merged. | *required* |
| `merged_dir` | `str` | Destination directory for the aggregated DB. | `'cov_merged'` |
| `use_verisium` | `bool` | If `True`, run `vsif run <merged_dir>/merge.vsif` after the IMC merge step. The `.vsif` file must already exist in `merged_dir` (typically hand-authored or generated by a prior Verisium session). | `False` |
Source code in src/dv_agentic/tools/adapters/imc.py
def merge(
    self,
    job_ids: list[str],
    merged_dir: str = "cov_merged",
    use_verisium: bool = False,
) -> CoverageDB:
    """Merge coverage from multiple simulation runs into one aggregated DB.

    Runs ``imc -load <dir1> <dir2> ... -merge <merged_dir> -exit`` to
    aggregate per-job coverage DBs, then optionally invokes Verisium
    ``vsif`` for cross-session enterprise-level reporting.

    Args:
        job_ids: Job IDs whose coverage DBs will be merged.
        merged_dir: Destination directory for the aggregated DB.
        use_verisium: If ``True``, run ``vsif run <merged_dir>/merge.vsif``
            after the IMC merge step.  The ``.vsif`` file must already
            exist in ``merged_dir`` (typically hand-authored or generated
            by a prior Verisium session).

    """
    load_dirs = [str(Path(self.cov_work_dir) / jid) for jid in job_ids]
    cmd = [self.imc_path, "-64", "-load", *load_dirs, "-merge", merged_dir, "-exit"]
    logger.info("IMC merge: %d run(s) → %s", len(job_ids), merged_dir)

    try:
        result = subprocess.run(  # noqa: S603
            cmd, capture_output=True, text=True, timeout=600
        )
        if result.returncode != 0:
            logger.error("IMC merge failed:\n%s", result.stderr)
    except (subprocess.SubprocessError, FileNotFoundError):
        logger.exception("IMC merge invocation failed")

    if use_verisium:
        self._verisium_merge(merged_dir)

    return self._report(Path(merged_dir))
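Putting it together, the `imc` invocation that `merge()` issues for two finished jobs can be reconstructed from the source above (the job IDs are illustrative; `as_posix()` is used here only to keep the printed form stable across platforms):

```python
from pathlib import Path

# Rebuild the merge command IMCAdapter.merge() runs; flags copied from the
# adapter source: imc -64 -load <dir1> <dir2> ... -merge <merged_dir> -exit
cov_work_dir, merged_dir = "cov_work", "cov_merged"
job_ids = ["smoke_test_42", "smoke_test_43"]
load_dirs = [(Path(cov_work_dir) / jid).as_posix() for jid in job_ids]
cmd = ["imc", "-64", "-load", *load_dirs, "-merge", merged_dir, "-exit"]
print(" ".join(cmd))
# imc -64 -load cov_work/smoke_test_42 cov_work/smoke_test_43 -merge cov_merged -exit
```

The aggregated DB then lands in `cov_merged/`, which `merge()` summarizes via the same `imc -reportstats` path used by `get_coverage()`.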