Skip to content

ACP API Reference

Auto-generated API documentation for the ACP module.

acp

ACP (Agent Client Protocol) -- Zero-dependency Python implementation.

Part of zerodep: https://github.com/Oaklight/zerodep Copyright (c) 2026 Peng Ding. MIT License.

The Agent Client Protocol standardizes communication between code editors (Clients) and AI coding agents (Agents). It uses JSON-RPC 2.0 over stdio (newline-delimited JSON), similar to how the Language Server Protocol (LSP) standardized language-server integration.

This single-file module provides:

  • JSONRPCTransport -- async read/write of newline-delimited JSON-RPC 2.0 messages over arbitrary asyncio.StreamReader / asyncio.StreamWriter pairs (typically stdin/stdout of a subprocess).

  • Protocol data types -- pure-dataclass representations of every message and structure defined by the ACP specification (protocol version 1).

  • ACPClient -- high-level async helper that spawns an agent subprocess, performs the initialize handshake, creates sessions, sends prompts, and yields session/update notifications as an async iterator.

  • ACPAgent -- abstract base class for implementing an ACP-compatible agent. Subclass it and override the on_* handler methods; call agent.run() to start the stdio event loop.

Requires Python >= 3.10. No third-party packages are needed -- only the standard library (asyncio, json, dataclasses, enum, typing, sys, abc, uuid, logging).

Quickstart -- Client side::

async def main():
    client = ACPClient(["python", "-m", "my_agent"])
    await client.start()
    init = await client.initialize()
    session = await client.new_session("/home/user/project")
    async for update in client.prompt(session.session_id, "Hello!"):
        print(update)
    await client.stop()

Quickstart -- Agent side::

class EchoAgent(ACPAgent):
    async def on_initialize(self, params):
        return InitializeResult(protocol_version=1)

    async def on_new_session(self, params):
        return NewSessionResult(session_id="sess_1")

    async def on_prompt(self, params):
        text = ""
        for block in params.prompt:
            if isinstance(block, TextContent):
                text = block.text
        await self.send_update(params.session_id,
            AgentMessageChunkUpdate(
                content=TextContent(text=f"Echo: {text}")))
        return PromptResult(stop_reason=StopReason.END_TURN)

if __name__ == "__main__":
    asyncio.run(EchoAgent().run())

JSONRPCError dataclass

A JSON-RPC 2.0 error object.

Attributes:

Name Type Description
code int

Numeric error code.

message str

Human-readable error message.

data Any

Optional additional error data.

Source code in jsonrpc/jsonrpc.py
@dataclass
class JSONRPCError:
    """A JSON-RPC 2.0 error object.

    Attributes:
        code: Numeric error code.
        message: Human-readable error message.
        data: Optional additional error data.
    """

    code: int = INTERNAL_ERROR
    message: str = "Internal error"
    data: Any = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a dictionary."""
        d: dict[str, Any] = {"code": self.code, "message": self.message}
        if self.data is not None:
            d["data"] = self.data
        return d

    @classmethod
    def from_dict(cls, raw: dict[str, Any]) -> JSONRPCError:
        """Deserialize from a dictionary."""
        return cls(
            code=raw["code"],
            message=raw["message"],
            data=raw.get("data"),
        )

to_dict()

Serialize to a dictionary.

Source code in jsonrpc/jsonrpc.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dictionary."""
    d: dict[str, Any] = {"code": self.code, "message": self.message}
    if self.data is not None:
        d["data"] = self.data
    return d

from_dict(raw) classmethod

Deserialize from a dictionary.

Source code in jsonrpc/jsonrpc.py
@classmethod
def from_dict(cls, raw: dict[str, Any]) -> JSONRPCError:
    """Deserialize from a dictionary."""
    return cls(
        code=raw["code"],
        message=raw["message"],
        data=raw.get("data"),
    )

JSONRPCException

Bases: Exception

Exception wrapper around a JSONRPCError data object.

Attributes:

Name Type Description
error

The underlying JSONRPCError instance.

Source code in jsonrpc/jsonrpc.py
class JSONRPCException(Exception):
    """Exception wrapper around a ``JSONRPCError`` data object.

    Attributes:
        error: The underlying ``JSONRPCError`` instance.
    """

    def __init__(self, error: JSONRPCError) -> None:
        super().__init__(error.message)
        self.error = error

JSONRPCTransport

Async JSON-RPC 2.0 transport over newline-delimited JSON streams.

Each message is a single JSON object terminated by \n. Messages must not contain embedded newlines.

Attributes:

Name Type Description
reader

Async stream to read incoming messages from.

writer

Async stream to write outgoing messages to.

Source code in jsonrpc/jsonrpc.py
class JSONRPCTransport:
    """Async JSON-RPC 2.0 transport over newline-delimited JSON streams.

    Each message is a single JSON object terminated by ``\\n``.
    Messages must not contain embedded newlines.

    Attributes:
        reader: Async stream to read incoming messages from.
        writer: Async stream to write outgoing messages to.
    """

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        self.reader = reader
        self.writer = writer
        self._closed = False

    async def read_message(self) -> dict[str, Any] | None:
        """Read the next JSON-RPC message.

        Returns:
            Parsed JSON object, or ``None`` on EOF.
        """
        while True:
            line = await self.reader.readline()
            if not line:
                return None
            text = line.decode("utf-8").strip()
            if not text:
                continue
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                logger.warning("Ignoring malformed JSON line: %s", text[:120])

    async def write_message(self, msg: dict[str, Any]) -> None:
        """Write a JSON-RPC message followed by a newline.

        Args:
            msg: JSON-serializable dictionary to send.
        """
        raw = json.dumps(msg, separators=(",", ":"), ensure_ascii=False)
        self.writer.write((raw + "\n").encode("utf-8"))
        await self.writer.drain()

    async def send_request(
        self,
        method: str,
        params: Any = None,
        req_id: Union[int, str, None] = None,
    ) -> dict[str, Any]:
        """Build and send a JSON-RPC request.

        Args:
            method: The RPC method name.
            params: Parameters for the method.
            req_id: Optional explicit request id.

        Returns:
            The message dictionary that was sent.
        """
        msg: dict[str, Any] = {
            "jsonrpc": JSONRPC_VERSION,
            "id": req_id if req_id is not None else next_id(),
            "method": method,
        }
        if params is not None:
            msg["params"] = params
        await self.write_message(msg)
        return msg

    async def send_notification(self, method: str, params: Any = None) -> None:
        """Build and send a JSON-RPC notification (no ``id``).

        Args:
            method: The RPC method name.
            params: Parameters for the notification.
        """
        msg: dict[str, Any] = {"jsonrpc": JSONRPC_VERSION, "method": method}
        if params is not None:
            msg["params"] = params
        await self.write_message(msg)

    async def send_result(self, req_id: Union[int, str], result: Any) -> None:
        """Send a JSON-RPC success response.

        Args:
            req_id: The id of the original request.
            result: The result payload.
        """
        await self.write_message(
            {"jsonrpc": JSONRPC_VERSION, "id": req_id, "result": result}
        )

    async def send_error(
        self, req_id: Union[int, str, None], error: JSONRPCError
    ) -> None:
        """Send a JSON-RPC error response.

        Args:
            req_id: The id of the original request (may be ``None``).
            error: The error object.
        """
        await self.write_message(
            {"jsonrpc": JSONRPC_VERSION, "id": req_id, "error": error.to_dict()}
        )

    async def close(self) -> None:
        """Close the writer stream."""
        if not self._closed:
            self._closed = True
            self.writer.close()

    @property
    def is_closed(self) -> bool:
        """Whether the writer has been closed."""
        return self._closed

is_closed property

Whether the writer has been closed.

read_message() async

Read the next JSON-RPC message.

Returns:

Type Description
dict[str, Any] | None

Parsed JSON object, or None on EOF.

Source code in jsonrpc/jsonrpc.py
async def read_message(self) -> dict[str, Any] | None:
    """Read the next JSON-RPC message.

    Returns:
        Parsed JSON object, or ``None`` on EOF.
    """
    while True:
        line = await self.reader.readline()
        if not line:
            return None
        text = line.decode("utf-8").strip()
        if not text:
            continue
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            logger.warning("Ignoring malformed JSON line: %s", text[:120])

write_message(msg) async

Write a JSON-RPC message followed by a newline.

Parameters:

Name Type Description Default
msg dict[str, Any]

JSON-serializable dictionary to send.

required
Source code in jsonrpc/jsonrpc.py
async def write_message(self, msg: dict[str, Any]) -> None:
    """Write a JSON-RPC message followed by a newline.

    Args:
        msg: JSON-serializable dictionary to send.
    """
    raw = json.dumps(msg, separators=(",", ":"), ensure_ascii=False)
    self.writer.write((raw + "\n").encode("utf-8"))
    await self.writer.drain()

send_request(method, params=None, req_id=None) async

Build and send a JSON-RPC request.

Parameters:

Name Type Description Default
method str

The RPC method name.

required
params Any

Parameters for the method.

None
req_id Union[int, str, None]

Optional explicit request id.

None

Returns:

Type Description
dict[str, Any]

The message dictionary that was sent.

Source code in jsonrpc/jsonrpc.py
async def send_request(
    self,
    method: str,
    params: Any = None,
    req_id: Union[int, str, None] = None,
) -> dict[str, Any]:
    """Build and send a JSON-RPC request.

    Args:
        method: The RPC method name.
        params: Parameters for the method.
        req_id: Optional explicit request id.

    Returns:
        The message dictionary that was sent.
    """
    msg: dict[str, Any] = {
        "jsonrpc": JSONRPC_VERSION,
        "id": req_id if req_id is not None else next_id(),
        "method": method,
    }
    if params is not None:
        msg["params"] = params
    await self.write_message(msg)
    return msg

send_notification(method, params=None) async

Build and send a JSON-RPC notification (no id).

Parameters:

Name Type Description Default
method str

The RPC method name.

required
params Any

Parameters for the notification.

None
Source code in jsonrpc/jsonrpc.py
async def send_notification(self, method: str, params: Any = None) -> None:
    """Build and send a JSON-RPC notification (no ``id``).

    Args:
        method: The RPC method name.
        params: Parameters for the notification.
    """
    msg: dict[str, Any] = {"jsonrpc": JSONRPC_VERSION, "method": method}
    if params is not None:
        msg["params"] = params
    await self.write_message(msg)

send_result(req_id, result) async

Send a JSON-RPC success response.

Parameters:

Name Type Description Default
req_id Union[int, str]

The id of the original request.

required
result Any

The result payload.

required
Source code in jsonrpc/jsonrpc.py
async def send_result(self, req_id: Union[int, str], result: Any) -> None:
    """Send a JSON-RPC success response.

    Args:
        req_id: The id of the original request.
        result: The result payload.
    """
    await self.write_message(
        {"jsonrpc": JSONRPC_VERSION, "id": req_id, "result": result}
    )

send_error(req_id, error) async

Send a JSON-RPC error response.

Parameters:

Name Type Description Default
req_id Union[int, str, None]

The id of the original request (may be None).

required
error JSONRPCError

The error object.

required
Source code in jsonrpc/jsonrpc.py
async def send_error(
    self, req_id: Union[int, str, None], error: JSONRPCError
) -> None:
    """Send a JSON-RPC error response.

    Args:
        req_id: The id of the original request (may be ``None``).
        error: The error object.
    """
    await self.write_message(
        {"jsonrpc": JSONRPC_VERSION, "id": req_id, "error": error.to_dict()}
    )

close() async

Close the writer stream.

Source code in jsonrpc/jsonrpc.py
async def close(self) -> None:
    """Close the writer stream."""
    if not self._closed:
        self._closed = True
        self.writer.close()

StopReason

Bases: str, Enum

Reason an agent stopped a prompt turn.

Source code in acp/acp.py
class StopReason(str, Enum):
    """Reason an agent stopped a prompt turn."""

    END_TURN = "end_turn"
    MAX_TOKENS = "max_tokens"
    MAX_TURN_REQUESTS = "max_turn_requests"
    REFUSAL = "refusal"
    CANCELLED = "cancelled"

ToolKind

Bases: str, Enum

Category of a tool being invoked.

Source code in acp/acp.py
class ToolKind(str, Enum):
    """Category of a tool being invoked."""

    READ = "read"
    EDIT = "edit"
    DELETE = "delete"
    MOVE = "move"
    SEARCH = "search"
    EXECUTE = "execute"
    THINK = "think"
    FETCH = "fetch"
    OTHER = "other"

ToolCallStatus

Bases: str, Enum

Execution status of a tool call.

Source code in acp/acp.py
class ToolCallStatus(str, Enum):
    """Execution status of a tool call."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

PermissionOptionKind

Bases: str, Enum

Kind of permission option presented to the user.

Source code in acp/acp.py
class PermissionOptionKind(str, Enum):
    """Kind of permission option presented to the user."""

    ALLOW_ONCE = "allow_once"
    ALLOW_ALWAYS = "allow_always"
    REJECT_ONCE = "reject_once"
    REJECT_ALWAYS = "reject_always"

PlanEntryPriority

Bases: str, Enum

Priority of a plan entry.

Source code in acp/acp.py
class PlanEntryPriority(str, Enum):
    """Priority of a plan entry."""

    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

PlanEntryStatus

Bases: str, Enum

Execution status of a plan entry.

Source code in acp/acp.py
class PlanEntryStatus(str, Enum):
    """Execution status of a plan entry."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"

TextContent dataclass

Plain text content block.

Attributes:

Name Type Description
text str

The text payload.

type str

Discriminator (always "text").

Source code in acp/acp.py
@dataclass
class TextContent:
    """Plain text content block.

    Attributes:
        text: The text payload.
        type: Discriminator (always ``"text"``).
    """

    text: str
    type: str = "text"

ImageContent dataclass

Base64-encoded image content block.

Attributes:

Name Type Description
data str

Base64-encoded image data.

mime_type str

MIME type such as "image/png".

type str

Discriminator (always "image").

Source code in acp/acp.py
@dataclass
class ImageContent:
    """Base64-encoded image content block.

    Attributes:
        data: Base64-encoded image data.
        mime_type: MIME type such as ``"image/png"``.
        type: Discriminator (always ``"image"``).
    """

    data: str
    mime_type: str
    type: str = "image"

AudioContent dataclass

Base64-encoded audio content block.

Attributes:

Name Type Description
data str

Base64-encoded audio data.

mime_type str

MIME type such as "audio/wav".

type str

Discriminator (always "audio").

Source code in acp/acp.py
@dataclass
class AudioContent:
    """Base64-encoded audio content block.

    Attributes:
        data: Base64-encoded audio data.
        mime_type: MIME type such as ``"audio/wav"``.
        type: Discriminator (always ``"audio"``).
    """

    data: str
    mime_type: str
    type: str = "audio"

ResourceContent dataclass

Embedded resource content block.

Attributes:

Name Type Description
resource _TextResource

The embedded resource (text or blob).

type str

Discriminator (always "resource").

Source code in acp/acp.py
@dataclass
class ResourceContent:
    """Embedded resource content block.

    Attributes:
        resource: The embedded resource (text or blob).
        type: Discriminator (always ``"resource"``).
    """

    resource: _TextResource
    type: str = "resource"

ResourceLinkContent dataclass

Reference to an external resource.

Attributes:

Name Type Description
uri str

URI of the resource.

name str

Human-readable resource name.

mime_type str | None

Optional MIME type.

title str | None

Optional display title.

description str | None

Optional description.

size int | None

Optional size in bytes.

type str

Discriminator (always "resource_link").

Source code in acp/acp.py
@dataclass
class ResourceLinkContent:
    """Reference to an external resource.

    Attributes:
        uri: URI of the resource.
        name: Human-readable resource name.
        mime_type: Optional MIME type.
        title: Optional display title.
        description: Optional description.
        size: Optional size in bytes.
        type: Discriminator (always ``"resource_link"``).
    """

    uri: str
    name: str
    mime_type: str | None = None
    title: str | None = None
    description: str | None = None
    size: int | None = None
    type: str = "resource_link"

ImplementationInfo dataclass

Information about a client or agent implementation.

Attributes:

Name Type Description
name str

Programmatic identifier.

version str

Version string.

title str | None

Human-readable display name.

Source code in acp/acp.py
@dataclass
class ImplementationInfo:
    """Information about a client or agent implementation.

    Attributes:
        name: Programmatic identifier.
        version: Version string.
        title: Human-readable display name.
    """

    name: str
    version: str = ""
    title: str | None = None

FsCapabilities dataclass

Client file-system capabilities.

Attributes:

Name Type Description
read_text_file bool

Whether fs/read_text_file is available.

write_text_file bool

Whether fs/write_text_file is available.

Source code in acp/acp.py
@dataclass
class FsCapabilities:
    """Client file-system capabilities.

    Attributes:
        read_text_file: Whether ``fs/read_text_file`` is available.
        write_text_file: Whether ``fs/write_text_file`` is available.
    """

    read_text_file: bool = False
    write_text_file: bool = False

ClientCapabilities dataclass

Capabilities supported by the client.

Attributes:

Name Type Description
fs FsCapabilities | None

File-system method availability.

terminal bool

Whether all terminal/* methods are available.

Source code in acp/acp.py
@dataclass
class ClientCapabilities:
    """Capabilities supported by the client.

    Attributes:
        fs: File-system method availability.
        terminal: Whether all ``terminal/*`` methods are available.
    """

    fs: FsCapabilities | None = None
    terminal: bool = False

PromptCapabilities dataclass

Content types the agent supports in prompts.

Attributes:

Name Type Description
image bool

Whether image content is supported.

audio bool

Whether audio content is supported.

embedded_context bool

Whether embedded resource content is supported.

Source code in acp/acp.py
@dataclass
class PromptCapabilities:
    """Content types the agent supports in prompts.

    Attributes:
        image: Whether image content is supported.
        audio: Whether audio content is supported.
        embedded_context: Whether embedded resource content is supported.
    """

    image: bool = False
    audio: bool = False
    embedded_context: bool = False

McpCapabilities dataclass

MCP transport capabilities.

Attributes:

Name Type Description
http bool

Whether HTTP MCP transport is supported.

sse bool

Whether SSE MCP transport is supported.

Source code in acp/acp.py
@dataclass
class McpCapabilities:
    """MCP transport capabilities.

    Attributes:
        http: Whether HTTP MCP transport is supported.
        sse: Whether SSE MCP transport is supported.
    """

    http: bool = False
    sse: bool = False

SessionListCapability dataclass

Marker for session/list support.

Source code in acp/acp.py
@dataclass
class SessionListCapability:
    """Marker for session/list support."""

    pass

SessionCapabilities dataclass

Session-level capabilities.

Attributes:

Name Type Description
list SessionListCapability | None

If present, session/list is supported.

Source code in acp/acp.py
@dataclass
class SessionCapabilities:
    """Session-level capabilities.

    Attributes:
        list: If present, ``session/list`` is supported.
    """

    list: SessionListCapability | None = None

AgentCapabilities dataclass

Capabilities supported by the agent.

Attributes:

Name Type Description
load_session bool

Whether session/load is available.

prompt_capabilities PromptCapabilities | None

Supported content types in prompts.

mcp_capabilities McpCapabilities | None

Supported MCP transports.

session_capabilities SessionCapabilities | None

Session-level capabilities.

Source code in acp/acp.py
@dataclass
class AgentCapabilities:
    """Capabilities supported by the agent.

    Attributes:
        load_session: Whether ``session/load`` is available.
        prompt_capabilities: Supported content types in prompts.
        mcp_capabilities: Supported MCP transports.
        session_capabilities: Session-level capabilities.
    """

    load_session: bool = False
    prompt_capabilities: PromptCapabilities | None = None
    mcp_capabilities: McpCapabilities | None = None
    session_capabilities: SessionCapabilities | None = None

AuthMethod dataclass

An authentication method advertised by the agent.

Attributes:

Name Type Description
id str

Unique identifier for this auth method.

name str

Human-readable name.

description str | None

Optional description.

Source code in acp/acp.py
@dataclass
class AuthMethod:
    """An authentication method advertised by the agent.

    Attributes:
        id: Unique identifier for this auth method.
        name: Human-readable name.
        description: Optional description.
    """

    id: str
    name: str
    description: str | None = None

InitializeParams dataclass

Parameters for the initialize request.

Attributes:

Name Type Description
protocol_version int

Latest protocol version the client supports.

client_capabilities ClientCapabilities | None

Capabilities the client supports.

client_info ImplementationInfo | None

Information about the client implementation.

Source code in acp/acp.py
@dataclass
class InitializeParams:
    """Parameters for the ``initialize`` request.

    Attributes:
        protocol_version: Latest protocol version the client supports.
        client_capabilities: Capabilities the client supports.
        client_info: Information about the client implementation.
    """

    protocol_version: int = 1
    client_capabilities: ClientCapabilities | None = None
    client_info: ImplementationInfo | None = None

InitializeResult dataclass

Result of the initialize request.

Attributes:

Name Type Description
protocol_version int

Negotiated protocol version.

agent_capabilities AgentCapabilities | None

Capabilities the agent supports.

agent_info ImplementationInfo | None

Information about the agent implementation.

auth_methods list[AuthMethod] | None

Available authentication methods.

Source code in acp/acp.py
@dataclass
class InitializeResult:
    """Result of the ``initialize`` request.

    Attributes:
        protocol_version: Negotiated protocol version.
        agent_capabilities: Capabilities the agent supports.
        agent_info: Information about the agent implementation.
        auth_methods: Available authentication methods.
    """

    protocol_version: int = 1
    agent_capabilities: AgentCapabilities | None = None
    agent_info: ImplementationInfo | None = None
    auth_methods: list[AuthMethod] | None = None

EnvVariable dataclass

An environment variable.

Attributes:

Name Type Description
name str

Variable name.

value str

Variable value.

Source code in acp/acp.py
@dataclass
class EnvVariable:
    """An environment variable.

    Attributes:
        name: Variable name.
        value: Variable value.
    """

    name: str
    value: str

McpServerStdio dataclass

Stdio MCP server specification.

Attributes:

Name Type Description
name str

Human-readable server name.

command str

Path to the MCP server executable.

args list[str]

Command-line arguments.

env list[EnvVariable] | None

Environment variables.

Source code in acp/acp.py
@dataclass
class McpServerStdio:
    """Stdio MCP server specification.

    Attributes:
        name: Human-readable server name.
        command: Path to the MCP server executable.
        args: Command-line arguments.
        env: Environment variables.
    """

    name: str
    command: str
    args: list[str] = field(default_factory=list)
    env: list[EnvVariable] | None = None

HttpHeader dataclass

An HTTP header.

Attributes:

Name Type Description
name str

Header name.

value str

Header value.

Source code in acp/acp.py
@dataclass
class HttpHeader:
    """An HTTP header.

    Attributes:
        name: Header name.
        value: Header value.
    """

    name: str
    value: str

McpServerHttp dataclass

HTTP MCP server specification.

Attributes:

Name Type Description
name str

Human-readable server name.

url str

URL of the MCP server.

headers list[HttpHeader]

HTTP headers.

type str

Transport type ("http" or "sse").

Source code in acp/acp.py
@dataclass
class McpServerHttp:
    """HTTP MCP server specification.

    Attributes:
        name: Human-readable server name.
        url: URL of the MCP server.
        headers: HTTP headers.
        type: Transport type (``"http"`` or ``"sse"``).
    """

    name: str
    url: str
    headers: list[HttpHeader] = field(default_factory=list)
    type: str = "http"

NewSessionParams dataclass

Parameters for session/new.

Attributes:

Name Type Description
cwd str

Absolute path to the working directory.

mcp_servers list[dict[str, Any]] | None

MCP servers to connect to.

Source code in acp/acp.py
@dataclass
class NewSessionParams:
    """Parameters for ``session/new``.

    Attributes:
        cwd: Absolute path to the working directory.
        mcp_servers: MCP servers to connect to.
    """

    cwd: str
    mcp_servers: list[dict[str, Any]] | None = None

SessionMode dataclass

An operating mode for the agent.

Attributes:

Name Type Description
id str

Unique mode identifier.

name str

Human-readable mode name.

description str | None

Optional description.

Source code in acp/acp.py
@dataclass
class SessionMode:
    """An operating mode for the agent.

    Attributes:
        id: Unique mode identifier.
        name: Human-readable mode name.
        description: Optional description.
    """

    id: str
    name: str
    description: str | None = None

SessionModeState dataclass

Current mode state for a session.

Attributes:

Name Type Description
current_mode_id str

The currently active mode.

available_modes list[SessionMode]

All available modes.

Source code in acp/acp.py
@dataclass
class SessionModeState:
    """Current mode state for a session.

    Attributes:
        current_mode_id: The currently active mode.
        available_modes: All available modes.
    """

    current_mode_id: str
    available_modes: list[SessionMode] = field(default_factory=list)

ConfigOptionValue dataclass

A possible value for a configuration option.

Attributes:

Name Type Description
value str

Value identifier.

name str

Human-readable name.

description str | None

Optional description.

Source code in acp/acp.py
@dataclass
class ConfigOptionValue:
    """A possible value for a configuration option.

    Attributes:
        value: Value identifier.
        name: Human-readable name.
        description: Optional description.
    """

    value: str
    name: str
    description: str | None = None

ConfigOption dataclass

A session configuration option.

Attributes:

Name Type Description
id str

Unique option identifier.

name str

Human-readable label.

type str

Input control type (currently only "select").

current_value str

Currently selected value.

options list[ConfigOptionValue]

Available values.

description str | None

Optional description.

category str | None

Optional semantic category.

Source code in acp/acp.py
@dataclass
class ConfigOption:
    """A session configuration option.

    Attributes:
        id: Unique option identifier.
        name: Human-readable label.
        type: Input control type (currently only ``"select"``).
        current_value: Currently selected value.
        options: Available values.
        description: Optional description.
        category: Optional semantic category.
    """

    id: str
    name: str
    type: str
    current_value: str
    options: list[ConfigOptionValue] = field(default_factory=list)
    description: str | None = None
    category: str | None = None

NewSessionResult dataclass

Result of session/new.

Attributes:

Name Type Description
session_id str

Unique identifier for the created session.

modes SessionModeState | None

Optional mode state.

config_options list[ConfigOption] | None

Optional configuration options.

Source code in acp/acp.py
@dataclass
class NewSessionResult:
    """Result of ``session/new``.

    Attributes:
        session_id: Unique identifier for the created session.
        modes: Optional mode state.
        config_options: Optional configuration options.
    """

    session_id: str
    modes: SessionModeState | None = None
    config_options: list[ConfigOption] | None = None

LoadSessionParams dataclass

Parameters for session/load.

Attributes:

Name Type Description
session_id str

Session to resume.

cwd str

Working directory.

mcp_servers list[dict[str, Any]] | None

MCP servers to connect to.

Source code in acp/acp.py
@dataclass
class LoadSessionParams:
    """Parameters for ``session/load``.

    Attributes:
        session_id: Session to resume.
        cwd: Working directory.
        mcp_servers: MCP servers to connect to.
    """

    session_id: str
    cwd: str
    mcp_servers: list[dict[str, Any]] | None = None

PromptParams dataclass

Parameters for session/prompt.

Attributes:

Name Type Description
session_id str

Target session.

prompt list[ContentBlock]

Content blocks forming the user message.

Source code in acp/acp.py
@dataclass
class PromptParams:
    """Parameters for ``session/prompt``.

    Attributes:
        session_id: Target session.
        prompt: Content blocks forming the user message.
    """

    session_id: str
    prompt: list[ContentBlock] = field(default_factory=list)

PromptResult dataclass

Result of session/prompt.

Attributes:

Name Type Description
stop_reason StopReason

Why the agent stopped.

Source code in acp/acp.py
@dataclass
class PromptResult:
    """Result of ``session/prompt``.

    Attributes:
        stop_reason: Why the agent stopped.
    """

    stop_reason: StopReason

CancelParams dataclass

Parameters for session/cancel notification.

Attributes:

Name Type Description
session_id str

Session to cancel.

Source code in acp/acp.py
@dataclass
class CancelParams:
    """Parameters for ``session/cancel`` notification.

    Attributes:
        session_id: Session to cancel.
    """

    session_id: str

SetModeParams dataclass

Parameters for session/set_mode.

Attributes:

Name Type Description
session_id str

Target session.

mode_id str

Mode to switch to.

Source code in acp/acp.py
@dataclass
class SetModeParams:
    """Parameters for ``session/set_mode``.

    Attributes:
        session_id: Target session.
        mode_id: Mode to switch to.
    """

    session_id: str
    mode_id: str

SetConfigOptionParams dataclass

Parameters for session/set_config_option.

Attributes:

Name Type Description
session_id str

Target session.

config_id str

Configuration option id.

value str

New value.

Source code in acp/acp.py
@dataclass
class SetConfigOptionParams:
    """Parameters for ``session/set_config_option``.

    Attributes:
        session_id: Target session.
        config_id: Configuration option id.
        value: New value.
    """

    session_id: str
    config_id: str
    value: str

SetConfigOptionResult dataclass

Result of session/set_config_option.

Attributes:

Name Type Description
config_options list[ConfigOption]

Complete list of config options with current values.

Source code in acp/acp.py
@dataclass
class SetConfigOptionResult:
    """Result of ``session/set_config_option``.

    Attributes:
        config_options: Complete list of config options with current values.
    """

    config_options: list[ConfigOption] = field(default_factory=list)

ListSessionsParams dataclass

Parameters for session/list.

Attributes:

Name Type Description
cwd str | None

Optional directory filter.

cursor str | None

Optional pagination cursor.

Source code in acp/acp.py
@dataclass
class ListSessionsParams:
    """Parameters for ``session/list``.

    Attributes:
        cwd: Optional directory filter.
        cursor: Optional pagination cursor.
    """

    cwd: str | None = None
    cursor: str | None = None

SessionInfo dataclass

Metadata about an existing session.

Attributes:

Name Type Description
session_id str

Unique session identifier.

cwd str

Working directory.

title str | None

Optional human-readable title.

updated_at str | None

Optional ISO 8601 timestamp.

Source code in acp/acp.py
@dataclass
class SessionInfo:
    """Metadata about an existing session.

    Attributes:
        session_id: Unique session identifier.
        cwd: Working directory.
        title: Optional human-readable title.
        updated_at: Optional ISO 8601 timestamp.
    """

    session_id: str
    cwd: str
    title: str | None = None
    updated_at: str | None = None

ListSessionsResult dataclass

Result of session/list.

Attributes:

Name Type Description
sessions list[SessionInfo]

List of session metadata.

next_cursor str | None

Pagination cursor for the next page.

Source code in acp/acp.py
@dataclass
class ListSessionsResult:
    """Result of ``session/list``.

    Attributes:
        sessions: List of session metadata.
        next_cursor: Pagination cursor for the next page.
    """

    sessions: list[SessionInfo] = field(default_factory=list)
    next_cursor: str | None = None

ToolCallLocation dataclass

File location affected by a tool call.

Attributes:

Name Type Description
path str

Absolute file path.

line int | None

Optional line number (1-based).

Source code in acp/acp.py
@dataclass
class ToolCallLocation:
    """File location affected by a tool call.

    Attributes:
        path: Absolute file path.
        line: Optional line number (1-based).
    """

    path: str
    line: int | None = None

DiffContent dataclass

A file diff produced by a tool call.

Attributes:

Name Type Description
path str

Absolute path of the file being modified.

new_text str

New content after modification.

old_text str | None

Original content (None for new files).

type str

Discriminator (always "diff").

Source code in acp/acp.py
@dataclass
class DiffContent:
    """A file diff produced by a tool call.

    Attributes:
        path: Absolute path of the file being modified.
        new_text: New content after modification.
        old_text: Original content (``None`` for new files).
        type: Discriminator (always ``"diff"``).
    """

    path: str
    new_text: str
    old_text: str | None = None
    type: str = "diff"

TerminalContent dataclass

Reference to terminal output embedded in a tool call.

Attributes:

Name Type Description
terminal_id str

Id of the terminal created with terminal/create.

type str

Discriminator (always "terminal").

Source code in acp/acp.py
@dataclass
class TerminalContent:
    """Reference to terminal output embedded in a tool call.

    Attributes:
        terminal_id: Id of the terminal created with ``terminal/create``.
        type: Discriminator (always ``"terminal"``).
    """

    terminal_id: str
    type: str = "terminal"

ToolCallContentItem dataclass

A content item within a tool call (wraps a ContentBlock).

Attributes:

Name Type Description
content ContentBlock

The wrapped content block.

type str

Discriminator (always "content").

Source code in acp/acp.py
@dataclass
class ToolCallContentItem:
    """A content item within a tool call (wraps a ContentBlock).

    Attributes:
        content: The wrapped content block.
        type: Discriminator (always ``"content"``).
    """

    content: ContentBlock
    type: str = "content"

PermissionOption dataclass

A permission option presented to the user.

Attributes:

Name Type Description
option_id str

Unique option identifier.

name str

Human-readable label.

kind PermissionOptionKind

Permission kind hint.

Source code in acp/acp.py
@dataclass
class PermissionOption:
    """A permission option presented to the user.

    Attributes:
        option_id: Unique option identifier.
        name: Human-readable label.
        kind: Permission kind hint.
    """

    option_id: str
    name: str
    kind: PermissionOptionKind

PermissionOutcome dataclass

Outcome of a permission request.

Attributes:

Name Type Description
outcome str

"selected" or "cancelled".

option_id str | None

The selected option id (if outcome == "selected").

Source code in acp/acp.py
@dataclass
class PermissionOutcome:
    """Outcome of a permission request.

    Attributes:
        outcome: ``"selected"`` or ``"cancelled"``.
        option_id: The selected option id (if ``outcome == "selected"``).
    """

    outcome: str
    option_id: str | None = None

RequestPermissionParams dataclass

Parameters for session/request_permission (agent -> client).

Attributes:

Name Type Description
session_id str

Target session.

tool_call dict[str, Any]

Tool call update with details about the operation.

options list[PermissionOption]

Available permission options.

Source code in acp/acp.py
@dataclass
class RequestPermissionParams:
    """Parameters for ``session/request_permission`` (agent -> client).

    Attributes:
        session_id: Target session.
        tool_call: Tool call update with details about the operation.
        options: Available permission options.
    """

    session_id: str
    tool_call: dict[str, Any] = field(default_factory=dict)
    options: list[PermissionOption] = field(default_factory=list)

RequestPermissionResult dataclass

Result of session/request_permission.

Attributes:

Name Type Description
outcome PermissionOutcome

The user's decision.

Source code in acp/acp.py
@dataclass
class RequestPermissionResult:
    """Result of ``session/request_permission``.

    Attributes:
        outcome: The user's decision.
    """

    outcome: PermissionOutcome

PlanEntry dataclass

A single entry in an agent's execution plan.

Attributes:

Name Type Description
content str

Human-readable task description.

priority PlanEntryPriority

Relative importance.

status PlanEntryStatus

Current execution status.

Source code in acp/acp.py
@dataclass
class PlanEntry:
    """A single entry in an agent's execution plan.

    Attributes:
        content: Human-readable task description.
        priority: Relative importance.
        status: Current execution status.
    """

    content: str
    priority: PlanEntryPriority
    status: PlanEntryStatus

AvailableCommandInput dataclass

Input specification for a slash command.

Attributes:

Name Type Description
hint str

Placeholder text shown when no input has been provided.

Source code in acp/acp.py
@dataclass
class AvailableCommandInput:
    """Input specification for a slash command.

    Attributes:
        hint: Placeholder text shown when no input has been provided.
    """

    hint: str

AvailableCommand dataclass

A slash command advertised by the agent.

Attributes:

Name Type Description
name str

Command name (e.g. "web").

description str

Human-readable description.

input AvailableCommandInput | None

Optional input specification.

Source code in acp/acp.py
@dataclass
class AvailableCommand:
    """A slash command advertised by the agent.

    Attributes:
        name: Command name (e.g. ``"web"``).
        description: Human-readable description.
        input: Optional input specification.
    """

    name: str
    description: str
    input: AvailableCommandInput | None = None

ReadTextFileParams dataclass

Parameters for fs/read_text_file (agent -> client).

Attributes:

Name Type Description
session_id str

Target session.

path str

Absolute file path.

line int | None

Optional start line (1-based).

limit int | None

Optional max number of lines.

Source code in acp/acp.py
@dataclass
class ReadTextFileParams:
    """Parameters for ``fs/read_text_file`` (agent -> client).

    Attributes:
        session_id: Target session.
        path: Absolute file path.
        line: Optional start line (1-based).
        limit: Optional max number of lines.
    """

    session_id: str
    path: str
    line: int | None = None
    limit: int | None = None

ReadTextFileResult dataclass

Result of fs/read_text_file.

Attributes:

Name Type Description
content str

File text content.

Source code in acp/acp.py
@dataclass
class ReadTextFileResult:
    """Result of ``fs/read_text_file``.

    Attributes:
        content: File text content.
    """

    content: str

WriteTextFileParams dataclass

Parameters for fs/write_text_file (agent -> client).

Attributes:

Name Type Description
session_id str

Target session.

path str

Absolute file path.

content str

Text content to write.

Source code in acp/acp.py
@dataclass
class WriteTextFileParams:
    """Parameters for ``fs/write_text_file`` (agent -> client).

    Attributes:
        session_id: Target session.
        path: Absolute file path.
        content: Text content to write.
    """

    session_id: str
    path: str
    content: str

CreateTerminalParams dataclass

Parameters for terminal/create (agent -> client).

Attributes:

Name Type Description
session_id str

Target session.

command str

Command to execute.

args list[str] | None

Command arguments.

env list[EnvVariable] | None

Environment variables.

cwd str | None

Working directory (absolute path).

output_byte_limit int | None

Max bytes of output to retain.

Source code in acp/acp.py
@dataclass
class CreateTerminalParams:
    """Parameters for ``terminal/create`` (agent -> client).

    Attributes:
        session_id: Target session.
        command: Command to execute.
        args: Command arguments.
        env: Environment variables.
        cwd: Working directory (absolute path).
        output_byte_limit: Max bytes of output to retain.
    """

    session_id: str
    command: str
    args: list[str] | None = None
    env: list[EnvVariable] | None = None
    cwd: str | None = None
    output_byte_limit: int | None = None

CreateTerminalResult dataclass

Result of terminal/create.

Attributes:

Name Type Description
terminal_id str

Unique terminal identifier.

Source code in acp/acp.py
@dataclass
class CreateTerminalResult:
    """Result of ``terminal/create``.

    Attributes:
        terminal_id: Unique terminal identifier.
    """

    terminal_id: str

TerminalOutputParams dataclass

Parameters for terminal/output.

Attributes:

Name Type Description
session_id str

Target session.

terminal_id str

Terminal to query.

Source code in acp/acp.py
@dataclass
class TerminalOutputParams:
    """Parameters for ``terminal/output``.

    Attributes:
        session_id: Target session.
        terminal_id: Terminal to query.
    """

    session_id: str
    terminal_id: str

TerminalExitStatus dataclass

Terminal process exit status.

Attributes:

Name Type Description
exit_code int | None

Process exit code (may be None).

signal str | None

Termination signal (may be None).

Source code in acp/acp.py
@dataclass
class TerminalExitStatus:
    """Terminal process exit status.

    Attributes:
        exit_code: Process exit code (may be ``None``).
        signal: Termination signal (may be ``None``).
    """

    exit_code: int | None = None
    signal: str | None = None

TerminalOutputResult dataclass

Result of terminal/output.

Attributes:

Name Type Description
output str

Captured terminal output.

truncated bool

Whether output was truncated.

exit_status TerminalExitStatus | None

Present only if the command has exited.

Source code in acp/acp.py
@dataclass
class TerminalOutputResult:
    """Result of ``terminal/output``.

    Attributes:
        output: Captured terminal output.
        truncated: Whether output was truncated.
        exit_status: Present only if the command has exited.
    """

    output: str = ""
    truncated: bool = False
    exit_status: TerminalExitStatus | None = None

WaitForExitParams dataclass

Parameters for terminal/wait_for_exit.

Attributes:

Name Type Description
session_id str

Target session.

terminal_id str

Terminal to wait on.

Source code in acp/acp.py
@dataclass
class WaitForExitParams:
    """Parameters for ``terminal/wait_for_exit``.

    Attributes:
        session_id: Target session.
        terminal_id: Terminal to wait on.
    """

    session_id: str
    terminal_id: str

KillTerminalParams dataclass

Parameters for terminal/kill.

Attributes:

Name Type Description
session_id str

Target session.

terminal_id str

Terminal to kill.

Source code in acp/acp.py
@dataclass
class KillTerminalParams:
    """Parameters for ``terminal/kill``.

    Attributes:
        session_id: Target session.
        terminal_id: Terminal to kill.
    """

    session_id: str
    terminal_id: str

ReleaseTerminalParams dataclass

Parameters for terminal/release.

Attributes:

Name Type Description
session_id str

Target session.

terminal_id str

Terminal to release.

Source code in acp/acp.py
@dataclass
class ReleaseTerminalParams:
    """Parameters for ``terminal/release``.

    Attributes:
        session_id: Target session.
        terminal_id: Terminal to release.
    """

    session_id: str
    terminal_id: str

AgentMessageChunkUpdate dataclass

Agent text output chunk.

Attributes:

Name Type Description
content ContentBlock

The content block.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class AgentMessageChunkUpdate:
    """Agent text output chunk.

    Attributes:
        content: The content block.
        session_update: Discriminator.
    """

    content: ContentBlock
    session_update: str = "agent_message_chunk"

UserMessageChunkUpdate dataclass

Replayed user message chunk (used in session/load).

Attributes:

Name Type Description
content ContentBlock

The content block.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class UserMessageChunkUpdate:
    """Replayed user message chunk (used in ``session/load``).

    Attributes:
        content: The content block.
        session_update: Discriminator.
    """

    content: ContentBlock
    session_update: str = "user_message_chunk"

ThoughtMessageChunkUpdate dataclass

Agent internal reasoning chunk.

Attributes:

Name Type Description
content ContentBlock

The content block.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class ThoughtMessageChunkUpdate:
    """Agent internal reasoning chunk.

    Attributes:
        content: The content block.
        session_update: Discriminator.
    """

    content: ContentBlock
    session_update: str = "thought_message_chunk"

ToolCallUpdate dataclass

Initial tool call notification.

Attributes:

Name Type Description
tool_call_id str

Unique tool call identifier.

title str

Human-readable description.

kind ToolKind

Tool category.

status ToolCallStatus

Current execution status.

content list[ToolCallContent] | None

Tool call content items.

locations list[ToolCallLocation] | None

File locations affected.

raw_input dict[str, Any] | None

Raw input parameters.

raw_output dict[str, Any] | None

Raw output.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class ToolCallUpdate:
    """Initial tool call notification.

    Attributes:
        tool_call_id: Unique tool call identifier.
        title: Human-readable description.
        kind: Tool category.
        status: Current execution status.
        content: Tool call content items.
        locations: File locations affected.
        raw_input: Raw input parameters.
        raw_output: Raw output.
        session_update: Discriminator.
    """

    tool_call_id: str
    title: str
    kind: ToolKind = ToolKind.OTHER
    status: ToolCallStatus = ToolCallStatus.PENDING
    content: list[ToolCallContent] | None = None
    locations: list[ToolCallLocation] | None = None
    raw_input: dict[str, Any] | None = None
    raw_output: dict[str, Any] | None = None
    session_update: str = "tool_call"

ToolCallStatusUpdate dataclass

Tool call progress/result update.

Attributes:

Name Type Description
tool_call_id str

The tool call being updated.

status ToolCallStatus | None

New status.

content list[ToolCallContent] | None

Optional new content.

title str | None

Optional updated title.

locations list[ToolCallLocation] | None

Optional updated locations.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class ToolCallStatusUpdate:
    """Tool call progress/result update.

    Attributes:
        tool_call_id: The tool call being updated.
        status: New status.
        content: Optional new content.
        title: Optional updated title.
        locations: Optional updated locations.
        session_update: Discriminator.
    """

    tool_call_id: str
    status: ToolCallStatus | None = None
    content: list[ToolCallContent] | None = None
    title: str | None = None
    locations: list[ToolCallLocation] | None = None
    session_update: str = "tool_call_update"

PlanUpdate dataclass

Agent execution plan.

Attributes:

Name Type Description
entries list[PlanEntry]

Plan entries.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class PlanUpdate:
    """Agent execution plan.

    Attributes:
        entries: Plan entries.
        session_update: Discriminator.
    """

    entries: list[PlanEntry] = field(default_factory=list)
    session_update: str = "plan"

AvailableCommandsUpdate dataclass

Update to available slash commands.

Attributes:

Name Type Description
available_commands list[AvailableCommand]

Current list of commands.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class AvailableCommandsUpdate:
    """Update to available slash commands.

    Attributes:
        available_commands: Current list of commands.
        session_update: Discriminator.
    """

    available_commands: list[AvailableCommand] = field(default_factory=list)
    session_update: str = "available_commands_update"

CurrentModeUpdate dataclass

Notification that the agent changed its mode.

Attributes:

Name Type Description
mode_id str

New mode identifier.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class CurrentModeUpdate:
    """Notification that the agent changed its mode.

    Attributes:
        mode_id: New mode identifier.
        session_update: Discriminator.
    """

    mode_id: str
    session_update: str = "current_mode_update"

ConfigOptionUpdate dataclass

Notification that config options changed.

Attributes:

Name Type Description
config_options list[ConfigOption]

Complete configuration state.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class ConfigOptionUpdate:
    """Notification that config options changed.

    Attributes:
        config_options: Complete configuration state.
        session_update: Discriminator.
    """

    config_options: list[ConfigOption] = field(default_factory=list)
    session_update: str = "config_option_update"

SessionInfoUpdate dataclass

Update to session metadata.

Attributes:

Name Type Description
title str | None

Updated session title.

updated_at str | None

Updated timestamp.

session_update str

Discriminator.

Source code in acp/acp.py
@dataclass
class SessionInfoUpdate:
    """Update to session metadata.

    Attributes:
        title: Updated session title.
        updated_at: Updated timestamp.
        session_update: Discriminator.
    """

    title: str | None = None
    updated_at: str | None = None
    session_update: str = "session_info_update"

ACPClient

High-level async client for communicating with an ACP agent subprocess.

Usage::

client = ACPClient(["python", "-m", "my_agent"])
await client.start()
init = await client.initialize()
session = await client.new_session("/project")
async for update in client.prompt(session.session_id, "Hello"):
    print(update)
await client.stop()

Parameters:

Name Type Description Default
command list[str]

Command and arguments to spawn the agent process.

required
env dict[str, str] | None

Optional environment variables for the subprocess.

None
Source code in acp/acp.py
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
class ACPClient:
    """High-level async client for communicating with an ACP agent subprocess.

    Usage::

        client = ACPClient(["python", "-m", "my_agent"])
        await client.start()
        init = await client.initialize()
        session = await client.new_session("/project")
        async for update in client.prompt(session.session_id, "Hello"):
            print(update)
        await client.stop()

    Args:
        command: Command and arguments to spawn the agent process.
        env: Optional environment variables for the subprocess.
    """

    def __init__(
        self,
        command: list[str],
        env: dict[str, str] | None = None,
    ) -> None:
        self._command = command
        self._env = env
        self._process: asyncio.subprocess.Process | None = None
        self._transport: JSONRPCTransport | None = None
        self._pending: dict[Union[int, str], asyncio.Future[dict[str, Any]]] = {}
        self._notification_handlers: dict[str, Callable[..., Any]] = {}
        self._update_queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue()
        self._reader_task: asyncio.Task[None] | None = None
        self._request_handler: Callable[[str, dict[str, Any]], Any] | None = None

    async def start(self) -> None:
        """Launch the agent subprocess and begin reading messages."""
        self._process = await asyncio.create_subprocess_exec(
            *self._command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=self._env,
        )
        assert self._process.stdin is not None
        assert self._process.stdout is not None
        reader = asyncio.StreamReader()
        reader.set_transport(self._process.stdout._transport)  # type: ignore[union-attr]  # ty: ignore[unresolved-attribute]
        self._transport = JSONRPCTransport(
            reader=self._process.stdout,  # type: ignore[arg-type]
            writer=self._process.stdin,  # type: ignore[arg-type]
        )
        self._reader_task = asyncio.create_task(self._read_loop())

    async def stop(self) -> None:
        """Terminate the agent subprocess and clean up."""
        if self._reader_task:
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass
        if self._transport:
            await self._transport.close()
        if self._process:
            self._process.terminate()
            try:
                await asyncio.wait_for(self._process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._process.kill()

    def set_request_handler(
        self, handler: Callable[[str, dict[str, Any]], Any]
    ) -> None:
        """Register a handler for incoming requests from the agent.

        This is used for requests like ``session/request_permission``,
        ``fs/read_text_file``, etc.

        Args:
            handler: Async callable ``(method, params) -> result``.
        """
        self._request_handler = handler

    def _is_response(self, msg: dict[str, Any]) -> bool:
        """Check if *msg* is a JSON-RPC response (has ``id`` and result/error)."""
        return "id" in msg and ("result" in msg or "error" in msg)

    def _resolve_pending_future(self, msg: dict[str, Any]) -> None:
        """Resolve or reject the future associated with a response message."""
        req_id = msg["id"]
        fut = self._pending.pop(req_id, None)
        if not fut or fut.done():
            return
        if "error" in msg:
            fut.set_exception(JSONRPCException(JSONRPCError.from_dict(msg["error"])))
        else:
            fut.set_result(msg.get("result") or {})

    async def _handle_notification(self, msg: dict[str, Any]) -> None:
        """Process an incoming notification from the agent."""
        method = msg["method"]
        params = msg.get("params", {})
        if method == "session/update":
            await self._update_queue.put(params)
        handler = self._notification_handlers.get(method)
        if handler:
            asyncio.create_task(handler(params))

    async def _read_loop(self) -> None:
        """Background task that reads and dispatches incoming messages."""
        assert self._transport is not None
        while True:
            msg = await self._transport.read_message()
            if msg is None:
                # EOF -- signal any waiting prompt iterators
                await self._update_queue.put(None)
                break

            if self._is_response(msg):
                self._resolve_pending_future(msg)
            elif "method" in msg and "id" not in msg:
                await self._handle_notification(msg)
            elif "method" in msg and "id" in msg:
                asyncio.create_task(self._handle_agent_request(msg))

    async def _handle_agent_request(self, msg: dict[str, Any]) -> None:
        """Handle an incoming request from the agent side."""
        assert self._transport is not None
        req_id = msg["id"]
        method = msg["method"]
        params = msg.get("params", {})
        if self._request_handler:
            try:
                result = self._request_handler(method, params)
                if asyncio.iscoroutine(result):
                    result = await result
                await self._transport.send_result(req_id, result)
            except Exception as exc:
                await self._transport.send_error(
                    req_id,
                    JSONRPCError(code=INTERNAL_ERROR, message=str(exc)),
                )
        else:
            await self._transport.send_error(
                req_id,
                JSONRPCError(
                    code=METHOD_NOT_FOUND,
                    message=f"No handler for {method}",
                ),
            )

    async def _call(self, method: str, params: Any = None) -> Any:
        """Send a request and wait for its response.

        Args:
            method: JSON-RPC method name.
            params: Method parameters.

        Returns:
            The result payload from the agent.

        Raises:
            JSONRPCException: If the agent returns an error.
        """
        assert self._transport is not None
        sent = await self._transport.send_request(method, params)
        req_id = sent["id"]
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[dict[str, Any]] = loop.create_future()
        self._pending[req_id] = fut
        return await fut

    # -- Public API ---------------------------------------------------------

    async def initialize(
        self,
        params: InitializeParams | None = None,
    ) -> InitializeResult:
        """Perform the ``initialize`` handshake with the agent.

        Args:
            params: Initialization parameters (defaults provided if ``None``).

        Returns:
            The agent's initialization result.
        """
        if params is None:
            params = InitializeParams()
        raw = await self._call("initialize", to_dict(params))
        raw = raw or {}
        r = from_raw(raw)
        agent_info = None
        if "agent_info" in r:
            ai = r["agent_info"]
            agent_info = ImplementationInfo(
                name=ai.get("name", ""),
                version=ai.get("version", ""),
                title=ai.get("title"),
            )
        return InitializeResult(
            protocol_version=r.get("protocol_version", 1),
            agent_info=agent_info,
        )

    async def new_session(
        self,
        cwd: str,
        mcp_servers: list[dict[str, Any]] | None = None,
    ) -> NewSessionResult:
        """Create a new conversation session.

        Args:
            cwd: Absolute path to the working directory.
            mcp_servers: Optional MCP server configurations.

        Returns:
            The session creation result including the session id.
        """
        p: dict[str, Any] = {"cwd": cwd}
        if mcp_servers:
            p["mcpServers"] = mcp_servers
        raw = await self._call("session/new", p)
        raw = raw or {}
        r = from_raw(raw)
        return NewSessionResult(session_id=r.get("session_id", ""))

    async def load_session(
        self,
        session_id: str,
        cwd: str,
        mcp_servers: list[dict[str, Any]] | None = None,
    ) -> None:
        """Load (resume) an existing session.

        The agent will replay conversation history as ``session/update``
        notifications before responding.

        Args:
            session_id: Session to resume.
            cwd: Working directory.
            mcp_servers: Optional MCP server configurations.
        """
        p: dict[str, Any] = {"sessionId": session_id, "cwd": cwd}
        if mcp_servers:
            p["mcpServers"] = mcp_servers
        await self._call("session/load", p)

    @staticmethod
    def _build_prompt_blocks(
        text: str,
        extra_content: list[ContentBlock] | None,
    ) -> list[dict[str, Any]]:
        """Build the prompt content block list from text and extras."""
        blocks: list[dict[str, Any]] = [{"type": "text", "text": text}]
        if extra_content:
            for b in extra_content:
                blocks.append(to_dict(b))
        return blocks

    def _drain_stale_updates(self) -> None:
        """Discard any stale updates queued before a new prompt."""
        while not self._update_queue.empty():
            try:
                self._update_queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    async def prompt(
        self,
        session_id: str,
        text: str,
        extra_content: list[ContentBlock] | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Send a prompt and yield ``session/update`` notifications.

        This is an async generator.  It yields each raw update dictionary
        until the ``session/prompt`` response is received.

        Args:
            session_id: Target session.
            text: User message text.
            extra_content: Additional content blocks to include.

        Yields:
            Raw ``session/update`` parameter dictionaries.
        """
        blocks = self._build_prompt_blocks(text, extra_content)
        params = {"sessionId": session_id, "prompt": blocks}
        self._drain_stale_updates()

        assert self._transport is not None
        sent = await self._transport.send_request("session/prompt", params)
        req_id = sent["id"]
        loop = asyncio.get_running_loop()
        prompt_fut: asyncio.Future[dict[str, Any]] = loop.create_future()
        self._pending[req_id] = prompt_fut

        while True:
            # Wait for either an update notification or the prompt response
            update_task = asyncio.create_task(self._update_queue.get())
            done, _ = await asyncio.wait(
                [update_task, prompt_fut],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if update_task in done:
                update = update_task.result()
                if update is None:
                    return
                yield update
            else:
                update_task.cancel()

            if prompt_fut.done():
                for u in self._drain_remaining_updates():
                    yield u
                return

    def _drain_remaining_updates(self) -> list[dict[str, Any]]:
        """Collect any remaining non-None updates from the queue."""
        remaining: list[dict[str, Any]] = []
        while not self._update_queue.empty():
            update = self._update_queue.get_nowait()
            if update is not None:
                remaining.append(update)
        return remaining

    async def prompt_simple(
        self,
        session_id: str,
        text: str,
    ) -> tuple[list[dict[str, Any]], PromptResult]:
        """Send a prompt and collect all updates, returning them with the result.

        A simpler alternative to the async-generator ``prompt()`` method.

        Args:
            session_id: Target session.
            text: User message text.

        Returns:
            Tuple of (list of update dicts, PromptResult).
        """
        updates: list[dict[str, Any]] = []
        async for u in self.prompt(session_id, text):
            updates.append(u)
        # The prompt future should be resolved by now
        return updates, PromptResult(stop_reason=StopReason.END_TURN)

    async def cancel(self, session_id: str) -> None:
        """Cancel the current prompt turn.

        Args:
            session_id: Session to cancel.
        """
        assert self._transport is not None
        await self._transport.send_notification(
            "session/cancel", {"sessionId": session_id}
        )

    async def set_mode(self, session_id: str, mode_id: str) -> None:
        """Switch the agent operating mode.

        Args:
            session_id: Target session.
            mode_id: Mode to switch to.
        """
        await self._call(
            "session/set_mode",
            {"sessionId": session_id, "modeId": mode_id},
        )

    async def list_sessions(
        self,
        cwd: str | None = None,
        cursor: str | None = None,
    ) -> ListSessionsResult:
        """List sessions known to the agent.

        Args:
            cwd: Optional working-directory filter.
            cursor: Optional pagination cursor.

        Returns:
            List of session metadata.
        """
        p: dict[str, Any] = {}
        if cwd:
            p["cwd"] = cwd
        if cursor:
            p["cursor"] = cursor
        raw = await self._call("session/list", p)
        raw = raw or {}
        sessions: list[SessionInfo] = []
        for s in raw.get("sessions", []):
            sr = from_raw(s)
            sessions.append(
                SessionInfo(
                    session_id=sr.get("session_id", ""),
                    cwd=sr.get("cwd", ""),
                    title=sr.get("title"),
                    updated_at=sr.get("updated_at"),
                )
            )
        return ListSessionsResult(
            sessions=sessions,
            next_cursor=raw.get("nextCursor"),
        )

start() async

Launch the agent subprocess and begin reading messages.

Source code in acp/acp.py
async def start(self) -> None:
    """Launch the agent subprocess and begin reading messages."""
    self._process = await asyncio.create_subprocess_exec(
        *self._command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=self._env,
    )
    assert self._process.stdin is not None
    assert self._process.stdout is not None
    reader = asyncio.StreamReader()
    reader.set_transport(self._process.stdout._transport)  # type: ignore[union-attr]  # ty: ignore[unresolved-attribute]
    self._transport = JSONRPCTransport(
        reader=self._process.stdout,  # type: ignore[arg-type]
        writer=self._process.stdin,  # type: ignore[arg-type]
    )
    self._reader_task = asyncio.create_task(self._read_loop())

stop() async

Terminate the agent subprocess and clean up.

Source code in acp/acp.py
async def stop(self) -> None:
    """Terminate the agent subprocess and clean up."""
    if self._reader_task:
        self._reader_task.cancel()
        try:
            await self._reader_task
        except asyncio.CancelledError:
            pass
    if self._transport:
        await self._transport.close()
    if self._process:
        self._process.terminate()
        try:
            await asyncio.wait_for(self._process.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            self._process.kill()

set_request_handler(handler)

Register a handler for incoming requests from the agent.

This is used for requests like session/request_permission, fs/read_text_file, etc.

Parameters:

Name Type Description Default
handler Callable[[str, dict[str, Any]], Any]

Async callable (method, params) -> result.

required
Source code in acp/acp.py
def set_request_handler(
    self, handler: Callable[[str, dict[str, Any]], Any]
) -> None:
    """Register a handler for incoming requests from the agent.

    This is used for requests like ``session/request_permission``,
    ``fs/read_text_file``, etc.

    Args:
        handler: Async callable ``(method, params) -> result``.
    """
    self._request_handler = handler

initialize(params=None) async

Perform the initialize handshake with the agent.

Parameters:

Name Type Description Default
params InitializeParams | None

Initialization parameters (defaults provided if None).

None

Returns:

Type Description
InitializeResult

The agent's initialization result.

Source code in acp/acp.py
async def initialize(
    self,
    params: InitializeParams | None = None,
) -> InitializeResult:
    """Perform the ``initialize`` handshake with the agent.

    Args:
        params: Initialization parameters (defaults provided if ``None``).

    Returns:
        The agent's initialization result.
    """
    if params is None:
        params = InitializeParams()
    raw = await self._call("initialize", to_dict(params))
    raw = raw or {}
    r = from_raw(raw)
    agent_info = None
    if "agent_info" in r:
        ai = r["agent_info"]
        agent_info = ImplementationInfo(
            name=ai.get("name", ""),
            version=ai.get("version", ""),
            title=ai.get("title"),
        )
    return InitializeResult(
        protocol_version=r.get("protocol_version", 1),
        agent_info=agent_info,
    )

new_session(cwd, mcp_servers=None) async

Create a new conversation session.

Parameters:

Name Type Description Default
cwd str

Absolute path to the working directory.

required
mcp_servers list[dict[str, Any]] | None

Optional MCP server configurations.

None

Returns:

Type Description
NewSessionResult

The session creation result including the session id.

Source code in acp/acp.py
async def new_session(
    self,
    cwd: str,
    mcp_servers: list[dict[str, Any]] | None = None,
) -> NewSessionResult:
    """Create a new conversation session.

    Args:
        cwd: Absolute path to the working directory.
        mcp_servers: Optional MCP server configurations.

    Returns:
        The session creation result including the session id.
    """
    p: dict[str, Any] = {"cwd": cwd}
    if mcp_servers:
        p["mcpServers"] = mcp_servers
    raw = await self._call("session/new", p)
    raw = raw or {}
    r = from_raw(raw)
    return NewSessionResult(session_id=r.get("session_id", ""))

load_session(session_id, cwd, mcp_servers=None) async

Load (resume) an existing session.

The agent will replay conversation history as session/update notifications before responding.

Parameters:

Name Type Description Default
session_id str

Session to resume.

required
cwd str

Working directory.

required
mcp_servers list[dict[str, Any]] | None

Optional MCP server configurations.

None
Source code in acp/acp.py
async def load_session(
    self,
    session_id: str,
    cwd: str,
    mcp_servers: list[dict[str, Any]] | None = None,
) -> None:
    """Load (resume) an existing session.

    The agent will replay conversation history as ``session/update``
    notifications before responding.

    Args:
        session_id: Session to resume.
        cwd: Working directory.
        mcp_servers: Optional MCP server configurations.
    """
    p: dict[str, Any] = {"sessionId": session_id, "cwd": cwd}
    if mcp_servers:
        p["mcpServers"] = mcp_servers
    await self._call("session/load", p)

prompt(session_id, text, extra_content=None) async

Send a prompt and yield session/update notifications.

This is an async generator. It yields each raw update dictionary until the session/prompt response is received.

Parameters:

Name Type Description Default
session_id str

Target session.

required
text str

User message text.

required
extra_content list[ContentBlock] | None

Additional content blocks to include.

None

Yields:

Type Description
AsyncIterator[dict[str, Any]]

Raw session/update parameter dictionaries.

Source code in acp/acp.py
async def prompt(
    self,
    session_id: str,
    text: str,
    extra_content: list[ContentBlock] | None = None,
) -> AsyncIterator[dict[str, Any]]:
    """Send a prompt and yield ``session/update`` notifications.

    This is an async generator.  It yields each raw update dictionary
    until the ``session/prompt`` response is received.

    Args:
        session_id: Target session.
        text: User message text.
        extra_content: Additional content blocks to include.

    Yields:
        Raw ``session/update`` parameter dictionaries.
    """
    blocks = self._build_prompt_blocks(text, extra_content)
    params = {"sessionId": session_id, "prompt": blocks}
    self._drain_stale_updates()

    assert self._transport is not None
    sent = await self._transport.send_request("session/prompt", params)
    req_id = sent["id"]
    loop = asyncio.get_running_loop()
    prompt_fut: asyncio.Future[dict[str, Any]] = loop.create_future()
    self._pending[req_id] = prompt_fut

    while True:
        # Wait for either an update notification or the prompt response
        update_task = asyncio.create_task(self._update_queue.get())
        done, _ = await asyncio.wait(
            [update_task, prompt_fut],
            return_when=asyncio.FIRST_COMPLETED,
        )

        if update_task in done:
            update = update_task.result()
            if update is None:
                return
            yield update
        else:
            update_task.cancel()

        if prompt_fut.done():
            for u in self._drain_remaining_updates():
                yield u
            return

prompt_simple(session_id, text) async

Send a prompt and collect all updates, returning them with the result.

A simpler alternative to the async-generator prompt() method.

Parameters:

Name Type Description Default
session_id str

Target session.

required
text str

User message text.

required

Returns:

Type Description
tuple[list[dict[str, Any]], PromptResult]

Tuple of (list of update dicts, PromptResult).

Source code in acp/acp.py
async def prompt_simple(
    self,
    session_id: str,
    text: str,
) -> tuple[list[dict[str, Any]], PromptResult]:
    """Send a prompt and collect all updates, returning them with the result.

    A simpler alternative to the async-generator ``prompt()`` method.

    Args:
        session_id: Target session.
        text: User message text.

    Returns:
        Tuple of (list of update dicts, PromptResult).
    """
    updates: list[dict[str, Any]] = []
    async for u in self.prompt(session_id, text):
        updates.append(u)
    # The prompt future should be resolved by now
    return updates, PromptResult(stop_reason=StopReason.END_TURN)

cancel(session_id) async

Cancel the current prompt turn.

Parameters:

Name Type Description Default
session_id str

Session to cancel.

required
Source code in acp/acp.py
async def cancel(self, session_id: str) -> None:
    """Cancel the current prompt turn.

    Args:
        session_id: Session to cancel.
    """
    assert self._transport is not None
    await self._transport.send_notification(
        "session/cancel", {"sessionId": session_id}
    )

set_mode(session_id, mode_id) async

Switch the agent operating mode.

Parameters:

Name Type Description Default
session_id str

Target session.

required
mode_id str

Mode to switch to.

required
Source code in acp/acp.py
async def set_mode(self, session_id: str, mode_id: str) -> None:
    """Switch the agent operating mode.

    Args:
        session_id: Target session.
        mode_id: Mode to switch to.
    """
    await self._call(
        "session/set_mode",
        {"sessionId": session_id, "modeId": mode_id},
    )

list_sessions(cwd=None, cursor=None) async

List sessions known to the agent.

Parameters:

Name Type Description Default
cwd str | None

Optional working-directory filter.

None
cursor str | None

Optional pagination cursor.

None

Returns:

Type Description
ListSessionsResult

List of session metadata.

Source code in acp/acp.py
async def list_sessions(
    self,
    cwd: str | None = None,
    cursor: str | None = None,
) -> ListSessionsResult:
    """List sessions known to the agent.

    Args:
        cwd: Optional working-directory filter.
        cursor: Optional pagination cursor.

    Returns:
        List of session metadata.
    """
    p: dict[str, Any] = {}
    if cwd:
        p["cwd"] = cwd
    if cursor:
        p["cursor"] = cursor
    raw = await self._call("session/list", p)
    raw = raw or {}
    sessions: list[SessionInfo] = []
    for s in raw.get("sessions", []):
        sr = from_raw(s)
        sessions.append(
            SessionInfo(
                session_id=sr.get("session_id", ""),
                cwd=sr.get("cwd", ""),
                title=sr.get("title"),
                updated_at=sr.get("updated_at"),
            )
        )
    return ListSessionsResult(
        sessions=sessions,
        next_cursor=raw.get("nextCursor"),
    )

ACPAgent

Bases: ABC

Abstract base class for implementing an ACP-compatible agent.

Subclass this and override the on_* methods. Call await agent.run() to start the stdio event loop.

Example::

class MyAgent(ACPAgent):
    async def on_initialize(self, params):
        return InitializeResult(protocol_version=1)
    async def on_new_session(self, params):
        return NewSessionResult(session_id=str(uuid.uuid4()))
    async def on_prompt(self, params):
        await self.send_update(params.session_id,
            AgentMessageChunkUpdate(content=TextContent(text="Hi!")))
        return PromptResult(stop_reason=StopReason.END_TURN)
Source code in acp/acp.py
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
class ACPAgent(ABC):
    """Abstract base class for implementing an ACP-compatible agent.

    Subclass this and override the ``on_*`` methods.  Call ``await agent.run()``
    to start the stdio event loop.

    Example::

        class MyAgent(ACPAgent):
            async def on_initialize(self, params):
                return InitializeResult(protocol_version=1)
            async def on_new_session(self, params):
                return NewSessionResult(session_id=str(uuid.uuid4()))
            async def on_prompt(self, params):
                await self.send_update(params.session_id,
                    AgentMessageChunkUpdate(content=TextContent(text="Hi!")))
                return PromptResult(stop_reason=StopReason.END_TURN)
    """

    def __init__(self) -> None:
        self._transport: JSONRPCTransport | None = None
        self._running = False

    # -- Handler methods (override these) -----------------------------------

    @abstractmethod
    async def on_initialize(self, params: InitializeParams) -> InitializeResult:
        """Handle the ``initialize`` request.

        Args:
            params: Initialization parameters from the client.

        Returns:
            Initialization result with agent capabilities.
        """
        ...

    @abstractmethod
    async def on_new_session(self, params: NewSessionParams) -> NewSessionResult:
        """Handle ``session/new``.

        Args:
            params: Session creation parameters.

        Returns:
            Result containing the new session id.
        """
        ...

    @abstractmethod
    async def on_prompt(self, params: PromptParams) -> PromptResult:
        """Handle ``session/prompt``.

        Use ``self.send_update()`` to stream updates back to the client
        before returning the final result.

        Args:
            params: Prompt parameters including user message.

        Returns:
            Result with the stop reason.
        """
        ...

    async def on_load_session(self, params: LoadSessionParams) -> None:
        """Handle ``session/load``.  Override to support session resumption.

        Args:
            params: Session load parameters.
        """
        raise JSONRPCException(
            JSONRPCError(
                code=METHOD_NOT_FOUND,
                message="session/load not supported",
            )
        )

    async def on_cancel(self, params: CancelParams) -> None:
        """Handle ``session/cancel`` notification.

        Args:
            params: Cancel parameters.
        """

    async def on_set_mode(self, params: SetModeParams) -> None:
        """Handle ``session/set_mode``.

        Args:
            params: Mode change parameters.
        """

    async def on_set_config_option(
        self, params: SetConfigOptionParams
    ) -> SetConfigOptionResult:
        """Handle ``session/set_config_option``.

        Args:
            params: Config option change parameters.

        Returns:
            Complete configuration state.
        """
        return SetConfigOptionResult()

    async def on_list_sessions(self, params: ListSessionsParams) -> ListSessionsResult:
        """Handle ``session/list``.

        Args:
            params: List sessions parameters.

        Returns:
            List of session metadata.
        """
        return ListSessionsResult()

    # -- Outgoing helpers ---------------------------------------------------

    async def send_update(self, session_id: str, update: SessionUpdate) -> None:
        """Send a ``session/update`` notification to the client.

        Args:
            session_id: Target session.
            update: The update payload.
        """
        assert self._transport is not None
        await self._transport.send_notification(
            "session/update",
            {"sessionId": session_id, "update": to_dict(update)},
        )

    async def request_permission(
        self,
        session_id: str,
        tool_call: dict[str, Any],
        options: list[PermissionOption],
    ) -> PermissionOutcome:
        """Request permission from the client for a tool call.

        Args:
            session_id: Target session.
            tool_call: Tool call details.
            options: Available permission options.

        Returns:
            The user's decision.

        Raises:
            JSONRPCException: If the client returns an error.
        """
        assert self._transport is not None
        sent = await self._transport.send_request(
            "session/request_permission",
            {
                "sessionId": session_id,
                "toolCall": tool_call,
                "options": [to_dict(o) for o in options],
            },
        )
        # We need to wait for the response in-line.  Since run() owns the
        # read loop, we store a future that run() will resolve.
        req_id = sent["id"]
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[dict[str, Any]] = loop.create_future()
        self._pending_requests[req_id] = fut
        raw = await fut
        outcome = raw.get("outcome", {})
        return PermissionOutcome(
            outcome=outcome.get("outcome", "cancelled"),
            option_id=outcome.get("optionId"),
        )

    async def read_text_file(
        self,
        session_id: str,
        path: str,
        *,
        line: int | None = None,
        limit: int | None = None,
    ) -> str:
        """Read a text file via the client's ``fs/read_text_file`` method.

        Args:
            session_id: Target session.
            path: Absolute file path.
            line: Optional start line (1-based).
            limit: Optional max lines.

        Returns:
            File text content.
        """
        assert self._transport is not None
        p: dict[str, Any] = {"sessionId": session_id, "path": path}
        if line is not None:
            p["line"] = line
        if limit is not None:
            p["limit"] = limit
        sent = await self._transport.send_request("fs/read_text_file", p)
        req_id = sent["id"]
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[dict[str, Any]] = loop.create_future()
        self._pending_requests[req_id] = fut
        raw = await fut
        return raw.get("content", "")

    async def write_text_file(self, session_id: str, path: str, content: str) -> None:
        """Write a text file via the client's ``fs/write_text_file`` method.

        Args:
            session_id: Target session.
            path: Absolute file path.
            content: Text content to write.
        """
        assert self._transport is not None
        sent = await self._transport.send_request(
            "fs/write_text_file",
            {"sessionId": session_id, "path": path, "content": content},
        )
        req_id = sent["id"]
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[dict[str, Any]] = loop.create_future()
        self._pending_requests[req_id] = fut
        await fut

    async def create_terminal(
        self,
        session_id: str,
        command: str,
        *,
        args: list[str] | None = None,
        cwd: str | None = None,
    ) -> str:
        """Create a terminal via the client's ``terminal/create`` method.

        Args:
            session_id: Target session.
            command: Command to execute.
            args: Command arguments.
            cwd: Working directory.

        Returns:
            Terminal id.
        """
        assert self._transport is not None
        p: dict[str, Any] = {"sessionId": session_id, "command": command}
        if args:
            p["args"] = args
        if cwd:
            p["cwd"] = cwd
        sent = await self._transport.send_request("terminal/create", p)
        req_id = sent["id"]
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[dict[str, Any]] = loop.create_future()
        self._pending_requests[req_id] = fut
        raw = await fut
        return raw.get("terminalId", "")

    # -- Main event loop ----------------------------------------------------

    async def run(self) -> None:
        """Run the agent, reading from stdin and writing to stdout.

        This blocks until the client closes the connection (EOF on stdin).
        """
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        read_transport, _ = await loop.connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(reader), sys.stdin
        )

        # Build a proper StreamWriter backed by a StreamReaderProtocol so
        # that drain() works correctly on all Python versions.
        write_reader = asyncio.StreamReader()
        write_protocol = asyncio.StreamReaderProtocol(write_reader)
        write_transport, _ = await loop.connect_write_pipe(
            lambda: write_protocol, sys.stdout
        )
        writer = asyncio.StreamWriter(
            write_transport,
            write_protocol,
            write_reader,
            loop,
        )

        self._transport = JSONRPCTransport(reader, writer)
        self._running = True
        self._pending_requests: dict[
            Union[int, str], asyncio.Future[dict[str, Any]]
        ] = {}

        logger.info("Agent started, waiting for messages on stdin...")
        try:
            while self._running:
                msg = await self._transport.read_message()
                if msg is None:
                    break
                asyncio.create_task(self._dispatch(msg))
        except asyncio.CancelledError:
            pass
        finally:
            self._running = False
            await self._transport.close()
            read_transport.close()

    def _resolve_agent_response(self, msg: dict[str, Any]) -> bool:
        """Resolve a response to one of our outgoing requests.

        Returns:
            ``True`` if the message was a response and was handled.
        """
        if not ("id" in msg and ("result" in msg or "error" in msg)):
            return False
        req_id = msg["id"]
        fut = self._pending_requests.pop(req_id, None)
        if not fut or fut.done():
            return True
        if "error" in msg:
            fut.set_exception(JSONRPCException(JSONRPCError.from_dict(msg["error"])))
        else:
            fut.set_result(msg.get("result") or {})
        return True

    async def _dispatch_and_respond(
        self,
        method: str | None,
        params: dict[str, Any],
        req_id: int | str | None,
    ) -> None:
        """Invoke the method handler and send back the JSON-RPC result/error."""
        assert self._transport is not None
        is_notification = req_id is None
        try:
            result = await self._handle_method(method, params)
            if not is_notification:
                assert req_id is not None
                result_dict = to_dict(result) if result is not None else None
                await self._transport.send_result(req_id, result_dict)
        except JSONRPCException as exc:
            if not is_notification:
                await self._transport.send_error(req_id, exc.error)
        except Exception as exc:
            logger.exception("Unhandled error in %s", method)
            if not is_notification:
                await self._transport.send_error(
                    req_id,
                    JSONRPCError(code=INTERNAL_ERROR, message=str(exc)),
                )

    async def _dispatch(self, msg: dict[str, Any]) -> None:
        """Route an incoming JSON-RPC message to the appropriate handler."""
        if self._resolve_agent_response(msg):
            return
        await self._dispatch_and_respond(
            msg.get("method"), msg.get("params", {}), msg.get("id")
        )

    async def _handle_method(self, method: str | None, params: dict[str, Any]) -> Any:
        """Dispatch a method call to the correct handler."""
        if method == "initialize":
            rp = from_raw(params)
            client_info = None
            if "client_info" in rp:
                ci = rp["client_info"]
                client_info = ImplementationInfo(
                    name=ci.get("name", ""),
                    version=ci.get("version", ""),
                    title=ci.get("title"),
                )
            client_caps = None
            if "client_capabilities" in rp:
                cc = rp["client_capabilities"]
                fs = None
                if "fs" in cc:
                    f = cc["fs"]
                    fs = FsCapabilities(
                        read_text_file=f.get("readTextFile", False),
                        write_text_file=f.get("writeTextFile", False),
                    )
                client_caps = ClientCapabilities(
                    fs=fs, terminal=cc.get("terminal", False)
                )
            return await self.on_initialize(
                InitializeParams(
                    protocol_version=rp.get("protocol_version", 1),
                    client_capabilities=client_caps,
                    client_info=client_info,
                )
            )

        if method == "session/new":
            rp = from_raw(params)
            return await self.on_new_session(
                NewSessionParams(
                    cwd=rp.get("cwd", "."),
                    mcp_servers=rp.get("mcp_servers"),
                )
            )

        if method == "session/load":
            rp = from_raw(params)
            return await self.on_load_session(
                LoadSessionParams(
                    session_id=rp.get("session_id", ""),
                    cwd=rp.get("cwd", "."),
                    mcp_servers=rp.get("mcp_servers"),
                )
            )

        if method == "session/prompt":
            rp = from_raw(params)
            prompt_blocks: list[ContentBlock] = []
            for block in rp.get("prompt", []):
                prompt_blocks.append(_content_from_dict(block))
            return await self.on_prompt(
                PromptParams(
                    session_id=rp.get("session_id", ""),
                    prompt=prompt_blocks,
                )
            )

        if method == "session/cancel":
            rp = from_raw(params)
            await self.on_cancel(CancelParams(session_id=rp.get("session_id", "")))
            return None

        if method == "session/set_mode":
            rp = from_raw(params)
            await self.on_set_mode(
                SetModeParams(
                    session_id=rp.get("session_id", ""),
                    mode_id=rp.get("mode_id", ""),
                )
            )
            return None

        if method == "session/set_config_option":
            rp = from_raw(params)
            return await self.on_set_config_option(
                SetConfigOptionParams(
                    session_id=rp.get("session_id", ""),
                    config_id=rp.get("config_id", ""),
                    value=rp.get("value", ""),
                )
            )

        if method == "session/list":
            rp = from_raw(params)
            return await self.on_list_sessions(
                ListSessionsParams(
                    cwd=rp.get("cwd"),
                    cursor=rp.get("cursor"),
                )
            )

        raise JSONRPCException(
            JSONRPCError(
                code=METHOD_NOT_FOUND,
                message=f"Unknown method: {method}",
            )
        )

on_initialize(params) abstractmethod async

Handle the initialize request.

Parameters:

Name Type Description Default
params InitializeParams

Initialization parameters from the client.

required

Returns:

Type Description
InitializeResult

Initialization result with agent capabilities.

Source code in acp/acp.py
@abstractmethod
async def on_initialize(self, params: InitializeParams) -> InitializeResult:
    """Handle the ``initialize`` request.

    Args:
        params: Initialization parameters from the client.

    Returns:
        Initialization result with agent capabilities.
    """
    ...

on_new_session(params) abstractmethod async

Handle session/new.

Parameters:

Name Type Description Default
params NewSessionParams

Session creation parameters.

required

Returns:

Type Description
NewSessionResult

Result containing the new session id.

Source code in acp/acp.py
@abstractmethod
async def on_new_session(self, params: NewSessionParams) -> NewSessionResult:
    """Handle ``session/new``.

    Args:
        params: Session creation parameters.

    Returns:
        Result containing the new session id.
    """
    ...

on_prompt(params) abstractmethod async

Handle session/prompt.

Use self.send_update() to stream updates back to the client before returning the final result.

Parameters:

Name Type Description Default
params PromptParams

Prompt parameters including user message.

required

Returns:

Type Description
PromptResult

Result with the stop reason.

Source code in acp/acp.py
@abstractmethod
async def on_prompt(self, params: PromptParams) -> PromptResult:
    """Handle ``session/prompt``.

    Use ``self.send_update()`` to stream updates back to the client
    before returning the final result.

    Args:
        params: Prompt parameters including user message.

    Returns:
        Result with the stop reason.
    """
    ...

on_load_session(params) async

Handle session/load. Override to support session resumption.

Parameters:

Name Type Description Default
params LoadSessionParams

Session load parameters.

required
Source code in acp/acp.py
async def on_load_session(self, params: LoadSessionParams) -> None:
    """Handle ``session/load``.  Override to support session resumption.

    Args:
        params: Session load parameters.
    """
    raise JSONRPCException(
        JSONRPCError(
            code=METHOD_NOT_FOUND,
            message="session/load not supported",
        )
    )

on_cancel(params) async

Handle session/cancel notification.

Parameters:

Name Type Description Default
params CancelParams

Cancel parameters.

required
Source code in acp/acp.py
async def on_cancel(self, params: CancelParams) -> None:
    """Handle ``session/cancel`` notification.

    Args:
        params: Cancel parameters.
    """

on_set_mode(params) async

Handle session/set_mode.

Parameters:

Name Type Description Default
params SetModeParams

Mode change parameters.

required
Source code in acp/acp.py
async def on_set_mode(self, params: SetModeParams) -> None:
    """Handle ``session/set_mode``.

    Args:
        params: Mode change parameters.
    """

on_set_config_option(params) async

Handle session/set_config_option.

Parameters:

Name Type Description Default
params SetConfigOptionParams

Config option change parameters.

required

Returns:

Type Description
SetConfigOptionResult

Complete configuration state.

Source code in acp/acp.py
async def on_set_config_option(
    self, params: SetConfigOptionParams
) -> SetConfigOptionResult:
    """Handle ``session/set_config_option``.

    Args:
        params: Config option change parameters.

    Returns:
        Complete configuration state.
    """
    return SetConfigOptionResult()

on_list_sessions(params) async

Handle session/list.

Parameters:

Name Type Description Default
params ListSessionsParams

List sessions parameters.

required

Returns:

Type Description
ListSessionsResult

List of session metadata.

Source code in acp/acp.py
async def on_list_sessions(self, params: ListSessionsParams) -> ListSessionsResult:
    """Handle ``session/list``.

    Args:
        params: List sessions parameters.

    Returns:
        List of session metadata.
    """
    return ListSessionsResult()

send_update(session_id, update) async

Send a session/update notification to the client.

Parameters:

Name Type Description Default
session_id str

Target session.

required
update SessionUpdate

The update payload.

required
Source code in acp/acp.py
async def send_update(self, session_id: str, update: SessionUpdate) -> None:
    """Send a ``session/update`` notification to the client.

    Args:
        session_id: Target session.
        update: The update payload.
    """
    assert self._transport is not None
    await self._transport.send_notification(
        "session/update",
        {"sessionId": session_id, "update": to_dict(update)},
    )

request_permission(session_id, tool_call, options) async

Request permission from the client for a tool call.

Parameters:

Name Type Description Default
session_id str

Target session.

required
tool_call dict[str, Any]

Tool call details.

required
options list[PermissionOption]

Available permission options.

required

Returns:

Type Description
PermissionOutcome

The user's decision.

Raises:

Type Description
JSONRPCException

If the client returns an error.

Source code in acp/acp.py
async def request_permission(
    self,
    session_id: str,
    tool_call: dict[str, Any],
    options: list[PermissionOption],
) -> PermissionOutcome:
    """Request permission from the client for a tool call.

    Args:
        session_id: Target session.
        tool_call: Tool call details.
        options: Available permission options.

    Returns:
        The user's decision.

    Raises:
        JSONRPCException: If the client returns an error.
    """
    assert self._transport is not None
    sent = await self._transport.send_request(
        "session/request_permission",
        {
            "sessionId": session_id,
            "toolCall": tool_call,
            "options": [to_dict(o) for o in options],
        },
    )
    # We need to wait for the response in-line.  Since run() owns the
    # read loop, we store a future that run() will resolve.
    req_id = sent["id"]
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[dict[str, Any]] = loop.create_future()
    self._pending_requests[req_id] = fut
    raw = await fut
    outcome = raw.get("outcome", {})
    return PermissionOutcome(
        outcome=outcome.get("outcome", "cancelled"),
        option_id=outcome.get("optionId"),
    )

read_text_file(session_id, path, *, line=None, limit=None) async

Read a text file via the client's fs/read_text_file method.

Parameters:

Name Type Description Default
session_id str

Target session.

required
path str

Absolute file path.

required
line int | None

Optional start line (1-based).

None
limit int | None

Optional max lines.

None

Returns:

Type Description
str

File text content.

Source code in acp/acp.py
async def read_text_file(
    self,
    session_id: str,
    path: str,
    *,
    line: int | None = None,
    limit: int | None = None,
) -> str:
    """Read a text file via the client's ``fs/read_text_file`` method.

    Args:
        session_id: Target session.
        path: Absolute file path.
        line: Optional start line (1-based).
        limit: Optional max lines.

    Returns:
        File text content.
    """
    assert self._transport is not None
    p: dict[str, Any] = {"sessionId": session_id, "path": path}
    if line is not None:
        p["line"] = line
    if limit is not None:
        p["limit"] = limit
    sent = await self._transport.send_request("fs/read_text_file", p)
    req_id = sent["id"]
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[dict[str, Any]] = loop.create_future()
    self._pending_requests[req_id] = fut
    raw = await fut
    return raw.get("content", "")

write_text_file(session_id, path, content) async

Write a text file via the client's fs/write_text_file method.

Parameters:

Name Type Description Default
session_id str

Target session.

required
path str

Absolute file path.

required
content str

Text content to write.

required
Source code in acp/acp.py
async def write_text_file(self, session_id: str, path: str, content: str) -> None:
    """Write a text file via the client's ``fs/write_text_file`` method.

    Args:
        session_id: Target session.
        path: Absolute file path.
        content: Text content to write.
    """
    assert self._transport is not None
    sent = await self._transport.send_request(
        "fs/write_text_file",
        {"sessionId": session_id, "path": path, "content": content},
    )
    req_id = sent["id"]
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[dict[str, Any]] = loop.create_future()
    self._pending_requests[req_id] = fut
    await fut

create_terminal(session_id, command, *, args=None, cwd=None) async

Create a terminal via the client's terminal/create method.

Parameters:

Name Type Description Default
session_id str

Target session.

required
command str

Command to execute.

required
args list[str] | None

Command arguments.

None
cwd str | None

Working directory.

None

Returns:

Type Description
str

Terminal id.

Source code in acp/acp.py
async def create_terminal(
    self,
    session_id: str,
    command: str,
    *,
    args: list[str] | None = None,
    cwd: str | None = None,
) -> str:
    """Create a terminal via the client's ``terminal/create`` method.

    Args:
        session_id: Target session.
        command: Command to execute.
        args: Command arguments.
        cwd: Working directory.

    Returns:
        Terminal id.
    """
    assert self._transport is not None
    p: dict[str, Any] = {"sessionId": session_id, "command": command}
    if args:
        p["args"] = args
    if cwd:
        p["cwd"] = cwd
    sent = await self._transport.send_request("terminal/create", p)
    req_id = sent["id"]
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[dict[str, Any]] = loop.create_future()
    self._pending_requests[req_id] = fut
    raw = await fut
    return raw.get("terminalId", "")

run() async

Run the agent, reading from stdin and writing to stdout.

This blocks until the client closes the connection (EOF on stdin).

Source code in acp/acp.py
async def run(self) -> None:
    """Run the agent, reading from stdin and writing to stdout.

    This blocks until the client closes the connection (EOF on stdin).
    """
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    read_transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), sys.stdin
    )

    # Build a proper StreamWriter backed by a StreamReaderProtocol so
    # that drain() works correctly on all Python versions.
    write_reader = asyncio.StreamReader()
    write_protocol = asyncio.StreamReaderProtocol(write_reader)
    write_transport, _ = await loop.connect_write_pipe(
        lambda: write_protocol, sys.stdout
    )
    writer = asyncio.StreamWriter(
        write_transport,
        write_protocol,
        write_reader,
        loop,
    )

    self._transport = JSONRPCTransport(reader, writer)
    self._running = True
    self._pending_requests: dict[
        Union[int, str], asyncio.Future[dict[str, Any]]
    ] = {}

    logger.info("Agent started, waiting for messages on stdin...")
    try:
        while self._running:
            msg = await self._transport.read_message()
            if msg is None:
                break
            asyncio.create_task(self._dispatch(msg))
    except asyncio.CancelledError:
        pass
    finally:
        self._running = False
        await self._transport.close()
        read_transport.close()

to_dict(obj)

Recursively convert a dataclass instance to a JSON-friendly dict.

  • None values and empty collections are omitted.
  • Snake-case field names are converted to camelCase per the ACP spec.
  • Enum values are serialized as their .value.

Parameters:

Name Type Description Default
obj Any

A dataclass instance, dict, list, or primitive.

required

Returns:

Type Description
Any

A JSON-serializable value.

Source code in acp/acp.py
def to_dict(obj: Any) -> Any:
    """Recursively convert a dataclass instance to a JSON-friendly dict.

    * ``None`` values and empty collections are omitted.
    * Snake-case field names are converted to camelCase per the ACP spec.
    * Enum values are serialized as their ``.value``.

    Args:
        obj: A dataclass instance, dict, list, or primitive.

    Returns:
        A JSON-serializable value.
    """
    if obj is None:
        return None
    if isinstance(obj, Enum):
        return obj.value
    if isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, dict):
        return {k: to_dict(v) for k, v in obj.items() if v is not None}
    if isinstance(obj, (list, tuple)):
        return [to_dict(v) for v in obj]
    if hasattr(obj, "__dataclass_fields__"):
        result: dict[str, Any] = {}
        for f in fields(obj):
            val = getattr(obj, f.name)
            if val is None:
                continue
            if isinstance(val, (list, tuple)) and len(val) == 0:
                continue
            if isinstance(val, dict) and len(val) == 0:
                continue
            result[_to_camel(f.name)] = to_dict(val)
        return result
    return obj

from_raw(raw)

Convert camelCase keys in raw to snake_case.

Parameters:

Name Type Description Default
raw dict[str, Any]

A dictionary with camelCase keys.

required

Returns:

Type Description
dict[str, Any]

A new dictionary with snake_case keys.

Source code in acp/acp.py
def from_raw(raw: dict[str, Any]) -> dict[str, Any]:
    """Convert camelCase keys in *raw* to snake_case.

    Args:
        raw: A dictionary with camelCase keys.

    Returns:
        A new dictionary with snake_case keys.
    """
    return {_to_snake(k): v for k, v in raw.items()}