Skip to content

Runner API Reference

Auto-generated API documentation for the runner module.

runner

Structured subprocess execution — zero dependencies, stdlib only, Python 3.10+.

Part of zerodep: https://github.com/Oaklight/zerodep Copyright (c) 2026 Peng Ding. MIT License.

Run external commands with controlled execution: timeouts with graceful kill escalation, streaming output, environment isolation, and cross-platform support. Designed as a building block (Layer 1) for higher-level execution frameworks.

Quick start::

from runner import run

result = run("echo hello world")
print(result.stdout)          # "hello world

" print(result.returncode) # 0 print(result.duration) # 0.003 (seconds)

Streaming output::

from runner import stream

with stream(["make", "build"]) as proc:
    for line in proc.iter_lines():
        print(f"[build] {line}", end="")

Async execution::

import asyncio
from runner import run_async

result = asyncio.run(run_async("ls -la"))

Requires Python 3.10+.

RunnerError

Bases: Exception

Base exception for all runner operations.

Source code in runner/runner.py
class RunnerError(Exception):
    """Base exception for all runner operations."""

CommandNotFoundError

Bases: RunnerError

Raised when the command binary cannot be located on PATH.

Attributes:

Name Type Description
name

The command name that was not found.

Source code in runner/runner.py
class CommandNotFoundError(RunnerError):
    """Raised when the command binary cannot be located on PATH.

    Attributes:
        name: The command name that was not found.
    """

    def __init__(self, name: str) -> None:
        self.name = name
        super().__init__(f"Command not found: {name}")

CommandFailedError

Bases: RunnerError

Raised when a command exits with a disallowed return code.

Attributes:

Name Type Description
result

The full RunResult including stdout, stderr, returncode, duration.

Source code in runner/runner.py
class CommandFailedError(RunnerError):
    """Raised when a command exits with a disallowed return code.

    Attributes:
        result: The full RunResult including stdout, stderr, returncode,
            duration.
    """

    def __init__(self, result: RunResult) -> None:
        self.result = result
        cmd_str = " ".join(result.command)
        msg = f"Command failed (rc={result.returncode}): {cmd_str}"
        if result.stderr.strip():
            msg += f"\n{result.stderr.rstrip()}"
        super().__init__(msg)

CommandTimeoutError

Bases: RunnerError

Raised when a command exceeds its timeout.

Attributes:

Name Type Description
command

The command that timed out.

timeout

The timeout value in seconds.

partial_stdout

Any stdout captured before the timeout.

partial_stderr

Any stderr captured before the timeout.

Source code in runner/runner.py
class CommandTimeoutError(RunnerError):
    """Raised when a command exceeds its timeout.

    Attributes:
        command: The command that timed out.
        timeout: The timeout value in seconds.
        partial_stdout: Any stdout captured before the timeout.
        partial_stderr: Any stderr captured before the timeout.
    """

    def __init__(
        self,
        command: tuple[str, ...],
        timeout: float,
        partial_stdout: str = "",
        partial_stderr: str = "",
    ) -> None:
        self.command = command
        self.timeout = timeout
        self.partial_stdout = partial_stdout
        self.partial_stderr = partial_stderr
        cmd_str = " ".join(command)
        super().__init__(f"Command timed out after {timeout}s: {cmd_str}")

CommandBlockedError

Bases: RunnerError

Raised when a command is rejected by allowlist/blocklist policy.

Attributes:

Name Type Description
command

The rejected command name.

reason

Human-readable explanation.

Source code in runner/runner.py
class CommandBlockedError(RunnerError):
    """Raised when a command is rejected by allowlist/blocklist policy.

    Attributes:
        command: The rejected command name.
        reason: Human-readable explanation.
    """

    def __init__(self, command: str, reason: str) -> None:
        self.command = command
        self.reason = reason
        super().__init__(f"Command blocked: {command} ({reason})")

RunResult dataclass

Result of a completed command execution.

Attributes:

Name Type Description
command tuple[str, ...]

The command and arguments as a tuple.

returncode int

Process exit code.

stdout str

Captured standard output (decoded text).

stderr str

Captured standard error (decoded text).

duration float

Wall-clock execution time in seconds.

pid int

Process ID of the executed command.

Source code in runner/runner.py
@dataclasses.dataclass(frozen=True, slots=True)
class RunResult:
    """Result of a completed command execution.

    Attributes:
        command: The command and arguments as a tuple.
        returncode: Process exit code.
        stdout: Captured standard output (decoded text).
        stderr: Captured standard error (decoded text).
        duration: Wall-clock execution time in seconds.
        pid: Process ID of the executed command.
    """

    command: tuple[str, ...]
    returncode: int
    stdout: str
    stderr: str
    duration: float
    pid: int

StreamHandle

Live handle to a running process for streaming output.

Returned by the :func:stream context manager. Provides iterators over stdout and/or stderr lines and process control methods.

Lifecycle
  1. Create — the :func:stream context manager starts the subprocess, optionally writes input to stdin, then yields this handle. At this point the handle owns the process.
  2. Yield lines — the caller iterates via :meth:iter_lines or :meth:iter_any. Lines are yielded as they arrive.
  3. Cleanup — when the with block exits (normally or via exception), :meth:_cleanup is called automatically.
Process ownership

The handle takes exclusive ownership of the underlying :class:subprocess.Popen object. Callers must not interact with the Popen directly. The handle is responsible for ensuring the process is terminated when the context exits.

Unconsumed output

If the caller does not fully consume stdout/stderr (e.g. breaks out of the iterator early), the remaining pipe data is discarded during cleanup. The process is still terminated cleanly via :meth:_cleanup.

Cleanup semantics

:meth:_cleanup checks whether the process is still running (via poll()). If so, it calls :func:_terminate_with_escalation (SIGTERM, then SIGKILL after kill_delay seconds). The returncode attribute is then set from the process exit code. Cleanup is invoked by the finally clause in :func:stream, so it always runs even if the caller raises an exception.

Attributes:

Name Type Description
pid int

Process ID.

Source code in runner/runner.py
class StreamHandle:
    """Live handle to a running process for streaming output.

    Returned by the :func:`stream` context manager.  Provides iterators
    over stdout and/or stderr lines and process control methods.

    Lifecycle:
        1. **Create** — the :func:`stream` context manager starts the
           subprocess, optionally writes *input* to stdin, then yields
           this handle.  At this point the handle **owns** the process.
        2. **Yield lines** — the caller iterates via :meth:`iter_lines`
           or :meth:`iter_any`.  Lines are yielded as they arrive.
        3. **Cleanup** — when the ``with`` block exits (normally or via
           exception), :meth:`_cleanup` is called automatically.

    Process ownership:
        The handle takes exclusive ownership of the underlying
        :class:`subprocess.Popen` object.  Callers must not interact
        with the ``Popen`` directly.  The handle is responsible for
        ensuring the process is terminated when the context exits.

    Unconsumed output:
        If the caller does not fully consume stdout/stderr (e.g. breaks
        out of the iterator early), the remaining pipe data is
        discarded during cleanup.  The process is still terminated
        cleanly via :meth:`_cleanup`.

    Cleanup semantics:
        :meth:`_cleanup` checks whether the process is still running
        (via ``poll()``).  If so, it calls
        :func:`_terminate_with_escalation` (SIGTERM, then SIGKILL after
        *kill_delay* seconds).  The ``returncode`` attribute is then
        set from the process exit code.  Cleanup is invoked by the
        ``finally`` clause in :func:`stream`, so it always runs even
        if the caller raises an exception.

    Attributes:
        pid: Process ID.
    """

    def __init__(
        self,
        proc: subprocess.Popen,  # type: ignore[type-arg]
        encoding: str,
        timeout: float | None,
        kill_delay: float,
    ) -> None:
        self._proc = proc
        self._encoding = encoding
        self._timeout = timeout
        self._kill_delay = kill_delay
        self.pid: int = proc.pid
        self._returncode: int | None = None

    @property
    def returncode(self) -> int | None:
        """Exit code, available after iteration completes or process exits."""
        if self._returncode is not None:
            return self._returncode
        rc = self._proc.poll()
        if rc is not None:
            self._returncode = rc
        return self._returncode

    def iter_lines(self, *, source: str = "stdout") -> Iterator[str]:
        """Iterate over lines from stdout or stderr.

        Args:
            source: ``"stdout"`` or ``"stderr"``.

        Yields:
            Lines of text (including trailing newline).
        """
        pipe = self._proc.stdout if source == "stdout" else self._proc.stderr
        if pipe is None:
            return
        try:
            for line in pipe:
                yield line
        except ValueError:
            pass
        self._returncode = self._proc.wait()

    def iter_any(self) -> Iterator[tuple[str, str]]:
        """Iterate over interleaved lines from both stdout and stderr.

        Yields:
            Tuples of ``(source, line)`` where source is ``"stdout"``
            or ``"stderr"``.
        """
        q: queue.Queue[tuple[str, str] | None] = queue.Queue()

        def _reader(pipe: IO[str], label: str) -> None:
            try:
                for line in pipe:
                    q.put((label, line))
            except ValueError:
                # Tier 2: best-effort observable — expected on pipe close
                pass
            finally:
                q.put(None)

        threads = []
        for pipe, label in [
            (self._proc.stdout, "stdout"),
            (self._proc.stderr, "stderr"),
        ]:
            if pipe is not None:
                t = threading.Thread(target=_reader, args=(pipe, label), daemon=True)
                t.start()
                threads.append(t)

        finished = 0
        total = len(threads)
        while finished < total:
            item = q.get()
            if item is None:
                finished += 1
            else:
                yield item

        for t in threads:
            t.join(timeout=2)
        self._returncode = self._proc.wait()

    def kill(self) -> None:
        """Forcibly kill the process."""
        self._proc.kill()
        self._proc.wait()

    def _cleanup(self) -> None:
        """Ensure the process is terminated and returncode is captured.

        Called automatically by the :func:`stream` context manager on
        exit.  If the process is still running (``poll()`` returns
        ``None``), escalates through SIGTERM -> SIGKILL.  Always sets
        ``self._returncode`` from the process exit code.
        """
        if self._proc.poll() is None:
            _terminate_with_escalation(self._proc, self._kill_delay)
        self._returncode = self._proc.returncode

returncode property

Exit code, available after iteration completes or process exits.

iter_lines(*, source='stdout')

Iterate over lines from stdout or stderr.

Parameters:

Name Type Description Default
source str

"stdout" or "stderr".

'stdout'

Yields:

Type Description
str

Lines of text (including trailing newline).

Source code in runner/runner.py
def iter_lines(self, *, source: str = "stdout") -> Iterator[str]:
    """Iterate over lines from stdout or stderr.

    Args:
        source: ``"stdout"`` or ``"stderr"``.

    Yields:
        Lines of text (including trailing newline).
    """
    pipe = self._proc.stdout if source == "stdout" else self._proc.stderr
    if pipe is None:
        return
    try:
        for line in pipe:
            yield line
    except ValueError:
        pass
    self._returncode = self._proc.wait()

iter_any()

Iterate over interleaved lines from both stdout and stderr.

Yields:

Type Description
str

Tuples of (source, line) where source is "stdout"

str

or "stderr".

Source code in runner/runner.py
def iter_any(self) -> Iterator[tuple[str, str]]:
    """Iterate over interleaved lines from both stdout and stderr.

    Yields:
        Tuples of ``(source, line)`` where source is ``"stdout"``
        or ``"stderr"``.
    """
    q: queue.Queue[tuple[str, str] | None] = queue.Queue()

    def _reader(pipe: IO[str], label: str) -> None:
        try:
            for line in pipe:
                q.put((label, line))
        except ValueError:
            # Tier 2: best-effort observable — expected on pipe close
            pass
        finally:
            q.put(None)

    threads = []
    for pipe, label in [
        (self._proc.stdout, "stdout"),
        (self._proc.stderr, "stderr"),
    ]:
        if pipe is not None:
            t = threading.Thread(target=_reader, args=(pipe, label), daemon=True)
            t.start()
            threads.append(t)

    finished = 0
    total = len(threads)
    while finished < total:
        item = q.get()
        if item is None:
            finished += 1
        else:
            yield item

    for t in threads:
        t.join(timeout=2)
    self._returncode = self._proc.wait()

kill()

Forcibly kill the process.

Source code in runner/runner.py
def kill(self) -> None:
    """Forcibly kill the process."""
    self._proc.kill()
    self._proc.wait()

AsyncStreamHandle

Async live handle to a running process for streaming output.

Returned by the :func:stream_async async context manager.

Lifecycle
  1. Create — :func:stream_async starts the subprocess via asyncio.create_subprocess_exec, optionally writes input to stdin, then yields this handle. The handle owns the process from this point.
  2. Yield lines — the caller iterates via :meth:aiter_lines or :meth:aiter_any. Lines are yielded as they arrive from the async stream readers.
  3. Cleanup — when the async with block exits (normally or via exception), :meth:_cleanup is awaited automatically.
Process ownership

The handle takes exclusive ownership of the underlying asyncio.subprocess.Process. Callers must not interact with the process object directly.

Unconsumed output

If the caller does not fully consume stdout/stderr (e.g. breaks out of the async iterator early), remaining pipe data is discarded during cleanup. The process is still terminated cleanly via :meth:_cleanup.

Cleanup semantics

:meth:_cleanup checks proc.returncode. If None (the process is still running), it awaits :func:_async_terminate_with_escalation (SIGTERM, then SIGKILL after kill_delay seconds). The returncode attribute is then set. Cleanup is invoked by the finally clause in :func:stream_async.

Attributes:

Name Type Description
pid int

Process ID.

Source code in runner/runner.py
class AsyncStreamHandle:
    """Async live handle to a running process for streaming output.

    Returned by the :func:`stream_async` async context manager.

    Lifecycle:
        1. **Create** — :func:`stream_async` starts the subprocess via
           ``asyncio.create_subprocess_exec``, optionally writes *input*
           to stdin, then yields this handle.  The handle **owns** the
           process from this point.
        2. **Yield lines** — the caller iterates via
           :meth:`aiter_lines` or :meth:`aiter_any`.  Lines are yielded
           as they arrive from the async stream readers.
        3. **Cleanup** — when the ``async with`` block exits (normally
           or via exception), :meth:`_cleanup` is awaited automatically.

    Process ownership:
        The handle takes exclusive ownership of the underlying
        ``asyncio.subprocess.Process``.  Callers must not interact with
        the process object directly.

    Unconsumed output:
        If the caller does not fully consume stdout/stderr (e.g. breaks
        out of the async iterator early), remaining pipe data is
        discarded during cleanup.  The process is still terminated
        cleanly via :meth:`_cleanup`.

    Cleanup semantics:
        :meth:`_cleanup` checks ``proc.returncode``.  If ``None`` (the
        process is still running), it awaits
        :func:`_async_terminate_with_escalation` (SIGTERM, then SIGKILL
        after *kill_delay* seconds).  The ``returncode`` attribute is
        then set.  Cleanup is invoked by the ``finally`` clause in
        :func:`stream_async`.

    Attributes:
        pid: Process ID.
    """

    def __init__(
        self,
        proc: asyncio.subprocess.Process,
        encoding: str,
        timeout: float | None,
        kill_delay: float,
    ) -> None:
        self._proc = proc
        self._encoding = encoding
        self._timeout = timeout
        self._kill_delay = kill_delay
        self.pid: int = proc.pid  # type: ignore[assignment]
        self._returncode: int | None = None

    @property
    def returncode(self) -> int | None:
        """Exit code, available after iteration completes or process exits."""
        if self._returncode is not None:
            return self._returncode
        rc = self._proc.returncode
        if rc is not None:
            self._returncode = rc
        return self._returncode

    async def aiter_lines(self, *, source: str = "stdout") -> AsyncIterator[str]:
        """Iterate over lines from stdout or stderr.

        Args:
            source: ``"stdout"`` or ``"stderr"``.

        Yields:
            Lines of text (including trailing newline).
        """
        stream_reader = self._proc.stdout if source == "stdout" else self._proc.stderr
        if stream_reader is None:
            return
        while True:
            line_bytes = await stream_reader.readline()
            if not line_bytes:
                break
            yield line_bytes.decode(self._encoding)
        await self._proc.wait()
        self._returncode = self._proc.returncode

    async def aiter_any(self) -> AsyncIterator[tuple[str, str]]:
        """Iterate over interleaved lines from both stdout and stderr.

        Yields:
            Tuples of ``(source, line)`` where source is ``"stdout"``
            or ``"stderr"``.
        """
        q: asyncio.Queue[tuple[str, str] | None] = asyncio.Queue()

        async def _reader(
            stream_reader: asyncio.StreamReader | None, label: str
        ) -> None:
            if stream_reader is None:
                await q.put(None)
                return
            while True:
                line_bytes = await stream_reader.readline()
                if not line_bytes:
                    break
                await q.put((label, line_bytes.decode(self._encoding)))
            await q.put(None)

        tasks = [
            asyncio.create_task(_reader(self._proc.stdout, "stdout")),
            asyncio.create_task(_reader(self._proc.stderr, "stderr")),
        ]

        finished = 0
        while finished < len(tasks):
            item = await q.get()
            if item is None:
                finished += 1
            else:
                yield item

        await asyncio.gather(*tasks)
        await self._proc.wait()
        self._returncode = self._proc.returncode

    async def kill(self) -> None:
        """Forcibly kill the process."""
        self._proc.kill()
        await self._proc.wait()

    async def _cleanup(self) -> None:
        """Ensure the process is terminated and returncode is captured.

        Called automatically by the :func:`stream_async` context manager
        on exit.  If the process is still running (``returncode is
        None``), escalates through SIGTERM -> SIGKILL.  Always sets
        ``self._returncode`` from the process exit code.
        """
        if self._proc.returncode is None:
            await _async_terminate_with_escalation(self._proc, self._kill_delay)
        self._returncode = self._proc.returncode

returncode property

Exit code, available after iteration completes or process exits.

aiter_lines(*, source='stdout') async

Iterate over lines from stdout or stderr.

Parameters:

Name Type Description Default
source str

"stdout" or "stderr".

'stdout'

Yields:

Type Description
AsyncIterator[str]

Lines of text (including trailing newline).

Source code in runner/runner.py
async def aiter_lines(self, *, source: str = "stdout") -> AsyncIterator[str]:
    """Iterate over lines from stdout or stderr.

    Args:
        source: ``"stdout"`` or ``"stderr"``.

    Yields:
        Lines of text (including trailing newline).
    """
    stream_reader = self._proc.stdout if source == "stdout" else self._proc.stderr
    if stream_reader is None:
        return
    while True:
        line_bytes = await stream_reader.readline()
        if not line_bytes:
            break
        yield line_bytes.decode(self._encoding)
    await self._proc.wait()
    self._returncode = self._proc.returncode

aiter_any() async

Iterate over interleaved lines from both stdout and stderr.

Yields:

Type Description
AsyncIterator[tuple[str, str]]

Tuples of (source, line) where source is "stdout"

AsyncIterator[tuple[str, str]]

or "stderr".

Source code in runner/runner.py
async def aiter_any(self) -> AsyncIterator[tuple[str, str]]:
    """Iterate over interleaved lines from both stdout and stderr.

    Yields:
        Tuples of ``(source, line)`` where source is ``"stdout"``
        or ``"stderr"``.
    """
    q: asyncio.Queue[tuple[str, str] | None] = asyncio.Queue()

    async def _reader(
        stream_reader: asyncio.StreamReader | None, label: str
    ) -> None:
        if stream_reader is None:
            await q.put(None)
            return
        while True:
            line_bytes = await stream_reader.readline()
            if not line_bytes:
                break
            await q.put((label, line_bytes.decode(self._encoding)))
        await q.put(None)

    tasks = [
        asyncio.create_task(_reader(self._proc.stdout, "stdout")),
        asyncio.create_task(_reader(self._proc.stderr, "stderr")),
    ]

    finished = 0
    while finished < len(tasks):
        item = await q.get()
        if item is None:
            finished += 1
        else:
            yield item

    await asyncio.gather(*tasks)
    await self._proc.wait()
    self._returncode = self._proc.returncode

kill() async

Forcibly kill the process.

Source code in runner/runner.py
async def kill(self) -> None:
    """Forcibly kill the process."""
    self._proc.kill()
    await self._proc.wait()

run(cmd, *, input=None, cwd=None, env=None, env_extra=None, env_remove=None, timeout=DEFAULT_TIMEOUT, kill_delay=DEFAULT_KILL_DELAY, check=True, encoding=DEFAULT_ENCODING, on_stdout=None, on_stderr=None, allowed_commands=None, blocked_commands=None)

Run a command synchronously and return the result.

Parameters:

Name Type Description Default
cmd str | Sequence[str]

Command as a string (auto-split) or sequence of arguments.

required
input str | None

Text to send on stdin.

None
cwd str | Path | None

Working directory for the subprocess.

None
env dict[str, str] | None

Complete replacement environment (no inheritance).

None
env_extra dict[str, str] | None

Extra variables to merge into the inherited environment.

None
env_remove Sequence[str] | None

Variables to strip from the inherited environment.

None
timeout float | None

Maximum seconds to wait. None means no timeout.

DEFAULT_TIMEOUT
kill_delay float

Seconds to wait between SIGTERM and SIGKILL.

DEFAULT_KILL_DELAY
check bool

If True, raise CommandFailedError on non-zero exit.

True
encoding str

Text encoding for stdout/stderr.

DEFAULT_ENCODING
on_stdout Callable[[str], None] | None

Per-line callback for stdout (output is still captured).

None
on_stderr Callable[[str], None] | None

Per-line callback for stderr (output is still captured).

None
allowed_commands Sequence[str] | None

If set, only these command names are permitted.

None
blocked_commands Sequence[str] | None

If set, these command names are rejected.

None

Returns:

Type Description
RunResult

A RunResult with captured output, exit code, and timing.

Raises:

Type Description
CommandNotFoundError

If the command binary is not found.

CommandFailedError

If check=True and the exit code is non-zero.

CommandTimeoutError

If execution exceeds timeout.

CommandBlockedError

If the command violates the policy.

ValueError

If the command is empty.

Source code in runner/runner.py
def run(
    cmd: str | Sequence[str],
    *,
    input: str | None = None,  # noqa: A002
    cwd: str | Path | None = None,
    env: dict[str, str] | None = None,
    env_extra: dict[str, str] | None = None,
    env_remove: Sequence[str] | None = None,
    timeout: float | None = DEFAULT_TIMEOUT,
    kill_delay: float = DEFAULT_KILL_DELAY,
    check: bool = True,
    encoding: str = DEFAULT_ENCODING,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    allowed_commands: Sequence[str] | None = None,
    blocked_commands: Sequence[str] | None = None,
) -> RunResult:
    """Run a command synchronously and return the result.

    Args:
        cmd: Command as a string (auto-split) or sequence of arguments.
        input: Text to send on stdin.
        cwd: Working directory for the subprocess.
        env: Complete replacement environment (no inheritance).
        env_extra: Extra variables to merge into the inherited environment.
        env_remove: Variables to strip from the inherited environment.
        timeout: Maximum seconds to wait. None means no timeout.
        kill_delay: Seconds to wait between SIGTERM and SIGKILL.
        check: If True, raise CommandFailedError on non-zero exit.
        encoding: Text encoding for stdout/stderr.
        on_stdout: Per-line callback for stdout (output is still captured).
        on_stderr: Per-line callback for stderr (output is still captured).
        allowed_commands: If set, only these command names are permitted.
        blocked_commands: If set, these command names are rejected.

    Returns:
        A RunResult with captured output, exit code, and timing.

    Raises:
        CommandNotFoundError: If the command binary is not found.
        CommandFailedError: If check=True and the exit code is non-zero.
        CommandTimeoutError: If execution exceeds *timeout*.
        CommandBlockedError: If the command violates the policy.
        ValueError: If the command is empty.
    """
    # Phase 1-3: command normalization, policy, environment
    cmd_tuple = _parse_cmd(cmd)
    _check_command_policy(cmd_tuple[0], allowed_commands, blocked_commands)
    computed_env = _build_env(env, env_extra, env_remove)
    popen_kwargs = _popen_platform_kwargs()

    if which(cmd_tuple[0]) is None and not Path(cmd_tuple[0]).is_absolute():
        raise CommandNotFoundError(cmd_tuple[0])

    t0 = time.monotonic()

    # Phase 4: process startup
    try:
        proc = subprocess.Popen(
            cmd_tuple,
            stdin=subprocess.PIPE if input is not None else subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=cwd,
            env=computed_env,
            encoding=encoding,
            **popen_kwargs,
        )
    except FileNotFoundError as exc:
        raise CommandNotFoundError(cmd_tuple[0]) from exc

    pid = proc.pid

    # Phase 5+6: stdout/stderr collection + timeout
    has_callbacks = on_stdout is not None or on_stderr is not None
    try:
        if has_callbacks:
            stdout_text, stderr_text = _sync_collect_with_callbacks(
                proc, on_stdout, on_stderr, input, timeout, kill_delay, cmd_tuple
            )
        else:
            stdout_text, stderr_text = _sync_collect_simple(
                proc, input, timeout, kill_delay, cmd_tuple, encoding
            )
    except CommandTimeoutError:
        raise
    except Exception:
        if proc.poll() is None:
            _terminate_with_escalation(proc, kill_delay)
        raise

    duration = time.monotonic() - t0

    # Phase 7+8: result construction and exit-code check
    result = RunResult(
        command=cmd_tuple,
        returncode=proc.returncode,
        stdout=stdout_text,
        stderr=stderr_text,
        duration=duration,
        pid=pid,
    )

    if check and proc.returncode != 0:
        raise CommandFailedError(result)

    return result

run_async(cmd, *, input=None, cwd=None, env=None, env_extra=None, env_remove=None, timeout=DEFAULT_TIMEOUT, kill_delay=DEFAULT_KILL_DELAY, check=True, encoding=DEFAULT_ENCODING, on_stdout=None, on_stderr=None, allowed_commands=None, blocked_commands=None) async

Run a command asynchronously and return the result.

Async counterpart of :func:run. Uses asyncio.create_subprocess_exec internally.

Parameters:

Name Type Description Default
cmd str | Sequence[str]

Command as a string (auto-split) or sequence of arguments.

required
input str | None

Text to send on stdin.

None
cwd str | Path | None

Working directory for the subprocess.

None
env dict[str, str] | None

Complete replacement environment (no inheritance).

None
env_extra dict[str, str] | None

Extra variables to merge into the inherited environment.

None
env_remove Sequence[str] | None

Variables to strip from the inherited environment.

None
timeout float | None

Maximum seconds to wait. None means no timeout.

DEFAULT_TIMEOUT
kill_delay float

Seconds to wait between SIGTERM and SIGKILL.

DEFAULT_KILL_DELAY
check bool

If True, raise CommandFailedError on non-zero exit.

True
encoding str

Text encoding for stdout/stderr.

DEFAULT_ENCODING
on_stdout Callable[[str], None] | None

Per-line callback for stdout (output is still captured).

None
on_stderr Callable[[str], None] | None

Per-line callback for stderr (output is still captured).

None
allowed_commands Sequence[str] | None

If set, only these command names are permitted.

None
blocked_commands Sequence[str] | None

If set, these command names are rejected.

None

Returns:

Type Description
RunResult

A RunResult with captured output, exit code, and timing.

Raises:

Type Description
CommandNotFoundError

If the command binary is not found.

CommandFailedError

If check=True and the exit code is non-zero.

CommandTimeoutError

If execution exceeds timeout.

CommandBlockedError

If the command violates the policy.

ValueError

If the command is empty.

Source code in runner/runner.py
async def run_async(
    cmd: str | Sequence[str],
    *,
    input: str | None = None,  # noqa: A002
    cwd: str | Path | None = None,
    env: dict[str, str] | None = None,
    env_extra: dict[str, str] | None = None,
    env_remove: Sequence[str] | None = None,
    timeout: float | None = DEFAULT_TIMEOUT,
    kill_delay: float = DEFAULT_KILL_DELAY,
    check: bool = True,
    encoding: str = DEFAULT_ENCODING,
    on_stdout: Callable[[str], None] | None = None,
    on_stderr: Callable[[str], None] | None = None,
    allowed_commands: Sequence[str] | None = None,
    blocked_commands: Sequence[str] | None = None,
) -> RunResult:
    """Run a command asynchronously and return the result.

    Async counterpart of :func:`run`. Uses
    ``asyncio.create_subprocess_exec`` internally.

    Args:
        cmd: Command as a string (auto-split) or sequence of arguments.
        input: Text to send on stdin.
        cwd: Working directory for the subprocess.
        env: Complete replacement environment (no inheritance).
        env_extra: Extra variables to merge into the inherited environment.
        env_remove: Variables to strip from the inherited environment.
        timeout: Maximum seconds to wait. None means no timeout.
        kill_delay: Seconds to wait between SIGTERM and SIGKILL.
        check: If True, raise CommandFailedError on non-zero exit.
        encoding: Text encoding for stdout/stderr.
        on_stdout: Per-line callback for stdout (output is still captured).
        on_stderr: Per-line callback for stderr (output is still captured).
        allowed_commands: If set, only these command names are permitted.
        blocked_commands: If set, these command names are rejected.

    Returns:
        A RunResult with captured output, exit code, and timing.

    Raises:
        CommandNotFoundError: If the command binary is not found.
        CommandFailedError: If check=True and the exit code is non-zero.
        CommandTimeoutError: If execution exceeds *timeout*.
        CommandBlockedError: If the command violates the policy.
        ValueError: If the command is empty.
    """
    # Phase 1-3: command normalization, policy, environment
    cmd_tuple = _parse_cmd(cmd)
    _check_command_policy(cmd_tuple[0], allowed_commands, blocked_commands)
    computed_env = _build_env(env, env_extra, env_remove)

    if which(cmd_tuple[0]) is None and not Path(cmd_tuple[0]).is_absolute():
        raise CommandNotFoundError(cmd_tuple[0])

    t0 = time.monotonic()

    # Phase 4: process startup
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd_tuple,
            stdin=(
                asyncio.subprocess.PIPE
                if input is not None
                else asyncio.subprocess.DEVNULL
            ),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=computed_env,
        )
    except FileNotFoundError as exc:
        raise CommandNotFoundError(cmd_tuple[0]) from exc

    pid = proc.pid
    input_bytes = input.encode(encoding) if input is not None else None

    # Phase 5+6: stdout/stderr collection + timeout
    has_callbacks = on_stdout is not None or on_stderr is not None
    try:
        if has_callbacks:
            stdout_text, stderr_text = await _async_collect_with_callbacks(
                proc,
                on_stdout,
                on_stderr,
                input_bytes,
                timeout,
                kill_delay,
                cmd_tuple,
                encoding,
            )
        else:
            stdout_text, stderr_text = await _async_collect_simple(
                proc,
                input_bytes,
                timeout,
                kill_delay,
                cmd_tuple,
                encoding,
            )
    except CommandTimeoutError:
        raise
    except Exception:
        if proc.returncode is None:
            await _async_terminate_with_escalation(proc, kill_delay)
        raise

    duration = time.monotonic() - t0

    # Phase 7+8: result construction and exit-code check
    result = RunResult(
        command=cmd_tuple,
        returncode=proc.returncode if proc.returncode is not None else -1,
        stdout=stdout_text,
        stderr=stderr_text,
        duration=duration,
        pid=pid,
    )

    if check and result.returncode != 0:
        raise CommandFailedError(result)

    return result

stream(cmd, *, input=None, cwd=None, env=None, env_extra=None, env_remove=None, timeout=None, kill_delay=DEFAULT_KILL_DELAY, encoding=DEFAULT_ENCODING, allowed_commands=None, blocked_commands=None)

Context manager for streaming subprocess output.

Yields a :class:StreamHandle that provides line iterators over stdout and stderr. The process is automatically cleaned up on context exit.

Parameters:

Name Type Description Default
cmd str | Sequence[str]

Command as a string (auto-split) or sequence of arguments.

required
input str | None

Text to send on stdin.

None
cwd str | Path | None

Working directory for the subprocess.

None
env dict[str, str] | None

Complete replacement environment (no inheritance).

None
env_extra dict[str, str] | None

Extra variables to merge into the inherited environment.

None
env_remove Sequence[str] | None

Variables to strip from the inherited environment.

None
timeout float | None

Maximum seconds for the process. None means no timeout.

None
kill_delay float

Seconds to wait between SIGTERM and SIGKILL.

DEFAULT_KILL_DELAY
encoding str

Text encoding for stdout/stderr.

DEFAULT_ENCODING
allowed_commands Sequence[str] | None

If set, only these command names are permitted.

None
blocked_commands Sequence[str] | None

If set, these command names are rejected.

None

Yields:

Type Description
StreamHandle

A StreamHandle for reading process output.

Raises:

Type Description
CommandNotFoundError

If the command binary is not found.

CommandBlockedError

If the command violates the policy.

ValueError

If the command is empty.

Source code in runner/runner.py
@contextmanager
def stream(
    cmd: str | Sequence[str],
    *,
    input: str | None = None,  # noqa: A002
    cwd: str | Path | None = None,
    env: dict[str, str] | None = None,
    env_extra: dict[str, str] | None = None,
    env_remove: Sequence[str] | None = None,
    timeout: float | None = None,
    kill_delay: float = DEFAULT_KILL_DELAY,
    encoding: str = DEFAULT_ENCODING,
    allowed_commands: Sequence[str] | None = None,
    blocked_commands: Sequence[str] | None = None,
) -> Iterator[StreamHandle]:
    """Context manager for streaming subprocess output.

    Yields a :class:`StreamHandle` that provides line iterators over
    stdout and stderr.  The process is automatically cleaned up on
    context exit.

    Args:
        cmd: Command as a string (auto-split) or sequence of arguments.
        input: Text to send on stdin.
        cwd: Working directory for the subprocess.
        env: Complete replacement environment (no inheritance).
        env_extra: Extra variables to merge into the inherited environment.
        env_remove: Variables to strip from the inherited environment.
        timeout: Maximum seconds for the process. None means no timeout.
        kill_delay: Seconds to wait between SIGTERM and SIGKILL.
        encoding: Text encoding for stdout/stderr.
        allowed_commands: If set, only these command names are permitted.
        blocked_commands: If set, these command names are rejected.

    Yields:
        A StreamHandle for reading process output.

    Raises:
        CommandNotFoundError: If the command binary is not found.
        CommandBlockedError: If the command violates the policy.
        ValueError: If the command is empty.
    """
    cmd_tuple = _parse_cmd(cmd)
    _check_command_policy(cmd_tuple[0], allowed_commands, blocked_commands)

    computed_env = _build_env(env, env_extra, env_remove)
    popen_kwargs = _popen_platform_kwargs()

    if which(cmd_tuple[0]) is None and not Path(cmd_tuple[0]).is_absolute():
        raise CommandNotFoundError(cmd_tuple[0])

    try:
        proc = subprocess.Popen(
            cmd_tuple,
            stdin=subprocess.PIPE if input is not None else subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=cwd,
            env=computed_env,
            encoding=encoding,
            **popen_kwargs,
        )
    except FileNotFoundError as exc:
        raise CommandNotFoundError(cmd_tuple[0]) from exc

    if input is not None and proc.stdin is not None:
        proc.stdin.write(input)
        proc.stdin.close()

    handle = StreamHandle(proc, encoding, timeout, kill_delay)
    try:
        yield handle
    finally:
        # Tier 1: must-succeed — process cleanup
        handle._cleanup()

stream_async(cmd, *, input=None, cwd=None, env=None, env_extra=None, env_remove=None, timeout=None, kill_delay=DEFAULT_KILL_DELAY, encoding=DEFAULT_ENCODING, allowed_commands=None, blocked_commands=None) async

Async context manager for streaming subprocess output.

Yields an :class:AsyncStreamHandle that provides async line iterators over stdout and stderr. The process is automatically cleaned up on context exit.

Parameters:

Name Type Description Default
cmd str | Sequence[str]

Command as a string (auto-split) or sequence of arguments.

required
input str | None

Text to send on stdin.

None
cwd str | Path | None

Working directory for the subprocess.

None
env dict[str, str] | None

Complete replacement environment (no inheritance).

None
env_extra dict[str, str] | None

Extra variables to merge into the inherited environment.

None
env_remove Sequence[str] | None

Variables to strip from the inherited environment.

None
timeout float | None

Maximum seconds for the process. None means no timeout.

None
kill_delay float

Seconds to wait between SIGTERM and SIGKILL.

DEFAULT_KILL_DELAY
encoding str

Text encoding for stdout/stderr.

DEFAULT_ENCODING
allowed_commands Sequence[str] | None

If set, only these command names are permitted.

None
blocked_commands Sequence[str] | None

If set, these command names are rejected.

None

Yields:

Type Description
AsyncIterator[AsyncStreamHandle]

An AsyncStreamHandle for reading process output.

Raises:

Type Description
CommandNotFoundError

If the command binary is not found.

CommandBlockedError

If the command violates the policy.

ValueError

If the command is empty.

Source code in runner/runner.py
@asynccontextmanager
async def stream_async(
    cmd: str | Sequence[str],
    *,
    input: str | None = None,  # noqa: A002
    cwd: str | Path | None = None,
    env: dict[str, str] | None = None,
    env_extra: dict[str, str] | None = None,
    env_remove: Sequence[str] | None = None,
    timeout: float | None = None,
    kill_delay: float = DEFAULT_KILL_DELAY,
    encoding: str = DEFAULT_ENCODING,
    allowed_commands: Sequence[str] | None = None,
    blocked_commands: Sequence[str] | None = None,
) -> AsyncIterator[AsyncStreamHandle]:
    """Async context manager for streaming subprocess output.

    Yields an :class:`AsyncStreamHandle` that provides async line
    iterators over stdout and stderr.  The process is automatically
    cleaned up on context exit.

    Args:
        cmd: Command as a string (auto-split) or sequence of arguments.
        input: Text to send on stdin.
        cwd: Working directory for the subprocess.
        env: Complete replacement environment (no inheritance).
        env_extra: Extra variables to merge into the inherited environment.
        env_remove: Variables to strip from the inherited environment.
        timeout: Maximum seconds for the process. None means no timeout.
        kill_delay: Seconds to wait between SIGTERM and SIGKILL.
        encoding: Text encoding for stdout/stderr.
        allowed_commands: If set, only these command names are permitted.
        blocked_commands: If set, these command names are rejected.

    Yields:
        An AsyncStreamHandle for reading process output.

    Raises:
        CommandNotFoundError: If the command binary is not found.
        CommandBlockedError: If the command violates the policy.
        ValueError: If the command is empty.
    """
    cmd_tuple = _parse_cmd(cmd)
    _check_command_policy(cmd_tuple[0], allowed_commands, blocked_commands)

    computed_env = _build_env(env, env_extra, env_remove)

    if which(cmd_tuple[0]) is None and not Path(cmd_tuple[0]).is_absolute():
        raise CommandNotFoundError(cmd_tuple[0])

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd_tuple,
            stdin=(
                asyncio.subprocess.PIPE
                if input is not None
                else asyncio.subprocess.DEVNULL
            ),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=computed_env,
        )
    except FileNotFoundError as exc:
        raise CommandNotFoundError(cmd_tuple[0]) from exc

    if input is not None and proc.stdin is not None:
        proc.stdin.write(input.encode(encoding))
        await proc.stdin.drain()
        proc.stdin.close()
        await proc.stdin.wait_closed()

    handle = AsyncStreamHandle(proc, encoding, timeout, kill_delay)
    try:
        yield handle
    finally:
        # Tier 1: must-succeed — async process cleanup
        await handle._cleanup()

shell_split(s)

Split a shell command string into a list of arguments.

Uses :func:shlex.split with POSIX mode on Unix and non-POSIX on Windows.

Parameters:

Name Type Description Default
s str

Shell command string, e.g. 'git commit -m "hello world"'.

required

Returns:

Type Description
list[str]

List of arguments.

Raises:

Type Description
ValueError

On unterminated quotes or other parse errors.

Source code in runner/runner.py
def shell_split(s: str) -> list[str]:
    """Split a shell command string into a list of arguments.

    Uses :func:`shlex.split` with POSIX mode on Unix and non-POSIX on
    Windows.

    Args:
        s: Shell command string, e.g. ``'git commit -m "hello world"'``.

    Returns:
        List of arguments.

    Raises:
        ValueError: On unterminated quotes or other parse errors.
    """
    return shlex.split(s, posix=not _IS_WINDOWS)

shell_quote(*args)

Quote arguments for safe shell interpolation.

On Unix, uses :func:shlex.quote. On Windows, uses cmd.exe-safe quoting with double-quote escaping.

Parameters:

Name Type Description Default
*args str

Individual arguments to quote.

()

Returns:

Type Description
str

Space-joined quoted string.

Source code in runner/runner.py
def shell_quote(*args: str) -> str:
    """Quote arguments for safe shell interpolation.

    On Unix, uses :func:`shlex.quote`.  On Windows, uses ``cmd.exe``-safe
    quoting with double-quote escaping.

    Args:
        *args: Individual arguments to quote.

    Returns:
        Space-joined quoted string.
    """
    if _IS_WINDOWS:
        return " ".join(_win_quote(a) for a in args)
    return " ".join(shlex.quote(a) for a in args)

which(name)

Locate a command on the system PATH.

Cross-platform wrapper around :func:shutil.which.

Binary lookup order (Pattern 2 convention): 1. Exact path — if name is absolute, return it directly if it exists. 2. PATH search — delegates to :func:shutil.which, which walks os.environ["PATH"] entries in order, respecting PATHEXT on Windows.

Parameters:

Name Type Description Default
name str

Command name (e.g. "git").

required

Returns:

Type Description
str | None

Absolute path to the binary, or None if not found.

Source code in runner/runner.py
def which(name: str) -> str | None:
    """Locate a command on the system PATH.

    Cross-platform wrapper around :func:`shutil.which`.

    Binary lookup order (Pattern 2 convention):
      1. Exact path — if *name* is absolute, return it directly if it exists.
      2. PATH search — delegates to :func:`shutil.which`, which walks
         ``os.environ["PATH"]`` entries in order, respecting PATHEXT on
         Windows.

    Args:
        name: Command name (e.g. ``"git"``).

    Returns:
        Absolute path to the binary, or ``None`` if not found.
    """
    return shutil.which(name)