Skip to content

A2A API Reference

Auto-generated API documentation for the A2A module.

a2a

A2A (Agent-to-Agent Protocol) - Zero-dependency Python implementation.

Part of zerodep (https://github.com/Oaklight/zerodep). Copyright (c) 2026 Peng Ding. MIT License.

A pure-stdlib implementation of Google's A2A protocol (v1.0) for agent-to-agent communication. Covers the JSON-RPC 2.0 binding with SSE streaming, an HTTP client, an HTTP server, and an in-memory task store.

Protocol reference

https://github.com/a2aproject/A2A and https://a2a-protocol.org/specification

Requires

Python >= 3.10, no external packages.

Sections
  1. Protocol Data Types - dataclass models for the canonical A2A data model
  2. JSON-RPC 2.0 Layer - request / response / error / dispatcher
  3. SSE Utilities - server-sent events writer and parser
  4. A2A Client - urllib-based client with SSE streaming
  5. A2A Server - http.server-based server with SSE support
  6. Task Management - in-memory TaskStore and TaskManager

Example usage is provided in the if __name__ == "__main__" block at the bottom of this file.

JSONRPCDispatcher

Routes JSON-RPC method calls to registered handler functions.

Example::

dispatcher = JSONRPCDispatcher()

@dispatcher.register("SendMessage")
def handle_send(params):
    ...
    return result_dict
Source code in jsonrpc/jsonrpc.py
class JSONRPCDispatcher:
    """Registry that routes JSON-RPC method names to handler callables.

    Handlers are added with :meth:`register` and invoked via :meth:`dispatch`.

    Example::

        dispatcher = JSONRPCDispatcher()

        @dispatcher.register("SendMessage")
        def handle_send(params):
            ...
            return result_dict
    """

    def __init__(self) -> None:
        # Method name -> handler. Registering a name twice replaces the
        # earlier handler.
        self._handlers: dict[str, MethodHandler] = {}

    def register(self, method: str) -> Callable[[MethodHandler], MethodHandler]:
        """Build a decorator that stores a handler under *method*.

        Args:
            method: JSON-RPC method name (e.g. ``"SendMessage"``).

        Returns:
            A decorator that records the function it is applied to and
            returns that same function unmodified.
        """

        def decorator(fn: MethodHandler) -> MethodHandler:
            self._handlers.update({method: fn})
            return fn

        return decorator

    def dispatch(
        self, request: JSONRPCRequest
    ) -> Union[JSONRPCResponse, Iterator[JSONRPCResponse]]:
        """Run the handler registered for ``request.method``.

        The handler receives ``request.params`` (or an empty dict when the
        params are falsy). A ``JSONRPCException`` raised by the handler is
        turned into an error response; any other exception is logged and
        reported as ``INTERNAL_ERROR`` carrying ``str(exc)``.

        Args:
            request: The parsed JSON-RPC request.

        Returns:
            A single ``JSONRPCResponse`` or, when the handler result exposes
            ``__next__``, a generator yielding ``JSONRPCResponse`` objects.
            An unregistered method yields a ``METHOD_NOT_FOUND`` response.
        """
        handler = self._handlers.get(request.method)
        if handler is not None:
            try:
                outcome = handler(request.params or {})
                # Anything exposing ``__next__`` is treated as a stream.
                if not hasattr(outcome, "__next__"):
                    return JSONRPCResponse.success(request.id, outcome)
                return self._stream_wrap(request.id, outcome)
            except JSONRPCException as rpc_exc:
                return JSONRPCResponse.from_error(request.id, rpc_exc)
            except Exception as exc:
                logger.exception("Unhandled error in handler %s", request.method)
                failure = JSONRPCError(code=INTERNAL_ERROR, message=str(exc))
                return JSONRPCResponse(id=request.id, error=failure)

        unknown = JSONRPCError(
            code=METHOD_NOT_FOUND,
            message=f"Method not found: {request.method}",
        )
        return JSONRPCResponse(id=request.id, error=unknown)

    @staticmethod
    def _stream_wrap(request_id: Any, gen: Iterator[Any]) -> Iterator[JSONRPCResponse]:
        """Yield a success response per item of *gen*.

        If iteration raises, one final error response is yielded and the
        stream ends: ``JSONRPCException`` is converted via ``from_error``;
        anything else is logged and reported as ``INTERNAL_ERROR``.
        """
        try:
            for chunk in gen:
                yield JSONRPCResponse.success(request_id, chunk)
        except JSONRPCException as rpc_exc:
            yield JSONRPCResponse.from_error(request_id, rpc_exc)
        except Exception as exc:
            logger.exception("Unhandled error in streaming handler")
            failure = JSONRPCError(code=INTERNAL_ERROR, message=str(exc))
            yield JSONRPCResponse(id=request_id, error=failure)

register(method)

Decorator to register a handler for method.

Parameters:

Name Type Description Default
method str

JSON-RPC method name (e.g. "SendMessage").

required

Returns:

Type Description
Callable[[MethodHandler], MethodHandler]

A decorator that registers the handler it is applied to and returns the original handler function, unmodified.

Source code in jsonrpc/jsonrpc.py
def register(self, method: str) -> Callable[[MethodHandler], MethodHandler]:
    """Build a decorator that stores a handler under *method*.

    Args:
        method: JSON-RPC method name (e.g. ``"SendMessage"``).

    Returns:
        A decorator that records the function it is applied to in
        ``self._handlers`` and returns that same function unmodified.
    """

    def decorator(fn: MethodHandler) -> MethodHandler:
        # A later registration under the same name replaces the earlier one.
        self._handlers.update({method: fn})
        return fn

    return decorator

dispatch(request)

Dispatch a parsed JSON-RPC request to the appropriate handler.

Catches JSONRPCException (and subclasses) raised by handlers and converts them to error responses.

Parameters:

Name Type Description Default
request JSONRPCRequest

The parsed JSON-RPC request.

required

Returns:

Type Description
Union[JSONRPCResponse, Iterator[JSONRPCResponse]]

A single JSONRPCResponse or, for streaming methods, a generator yielding JSONRPCResponse objects.

Source code in jsonrpc/jsonrpc.py
def dispatch(
    self, request: JSONRPCRequest
) -> Union[JSONRPCResponse, Iterator[JSONRPCResponse]]:
    """Run the handler registered for ``request.method``.

    The handler receives ``request.params`` (or an empty dict when the
    params are falsy). A ``JSONRPCException`` raised by the handler is
    turned into an error response; any other exception is logged and
    reported as ``INTERNAL_ERROR`` carrying ``str(exc)``.

    Args:
        request: The parsed JSON-RPC request.

    Returns:
        A single ``JSONRPCResponse`` or, when the handler result exposes
        ``__next__``, whatever ``self._stream_wrap`` returns for it.
        An unregistered method yields a ``METHOD_NOT_FOUND`` response.
    """
    handler = self._handlers.get(request.method)
    if handler is not None:
        try:
            outcome = handler(request.params or {})
            # Anything exposing ``__next__`` is treated as a stream.
            if not hasattr(outcome, "__next__"):
                return JSONRPCResponse.success(request.id, outcome)
            return self._stream_wrap(request.id, outcome)
        except JSONRPCException as rpc_exc:
            return JSONRPCResponse.from_error(request.id, rpc_exc)
        except Exception as exc:
            logger.exception("Unhandled error in handler %s", request.method)
            failure = JSONRPCError(code=INTERNAL_ERROR, message=str(exc))
            return JSONRPCResponse(id=request.id, error=failure)

    unknown = JSONRPCError(
        code=METHOD_NOT_FOUND,
        message=f"Method not found: {request.method}",
    )
    return JSONRPCResponse(id=request.id, error=unknown)

JSONRPCError dataclass

A JSON-RPC 2.0 error object.

Attributes:

Name Type Description
code int

Numeric error code.

message str

Human-readable error message.

data Any

Optional additional error data.

Source code in jsonrpc/jsonrpc.py
@dataclass
class JSONRPCError:
    """Error member of a JSON-RPC 2.0 response.

    Attributes:
        code: Numeric error code.
        message: Human-readable error message.
        data: Optional additional error data.
    """

    code: int = INTERNAL_ERROR
    message: str = "Internal error"
    data: Any = None

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary form; ``data`` is omitted when ``None``."""
        payload: dict[str, Any] = {"code": self.code, "message": self.message}
        if self.data is None:
            return payload
        payload["data"] = self.data
        return payload

    @classmethod
    def from_dict(cls, raw: dict[str, Any]) -> JSONRPCError:
        """Build an instance from its dictionary form.

        Raises:
            KeyError: If ``"code"`` or ``"message"`` is missing from *raw*.
        """
        code = raw["code"]
        message = raw["message"]
        return cls(code=code, message=message, data=raw.get("data"))

to_dict()

Serialize to a dictionary.

Source code in jsonrpc/jsonrpc.py
def to_dict(self) -> dict[str, Any]:
    """Return the dictionary form; ``data`` is omitted when ``None``."""
    payload: dict[str, Any] = {"code": self.code, "message": self.message}
    if self.data is None:
        return payload
    payload["data"] = self.data
    return payload

from_dict(raw) classmethod

Deserialize from a dictionary.

Source code in jsonrpc/jsonrpc.py
@classmethod
def from_dict(cls, raw: dict[str, Any]) -> JSONRPCError:
    """Build an instance from its dictionary form.

    Raises:
        KeyError: If ``"code"`` or ``"message"`` is missing from *raw*.
    """
    code = raw["code"]
    message = raw["message"]
    return cls(code=code, message=message, data=raw.get("data"))

JSONRPCRequest dataclass

A JSON-RPC 2.0 request object.

Attributes:

Name Type Description
method str

The RPC method name.

params dict[str, Any] | None

Method parameters.

id Union[str, int, None]

Request identifier (None for notifications).

jsonrpc str

Protocol version (always "2.0").

Source code in jsonrpc/jsonrpc.py
@dataclass
class JSONRPCRequest:
    """Request member of the JSON-RPC 2.0 protocol.

    Attributes:
        method: The RPC method name.
        params: Method parameters.
        id: Request identifier (``None`` for notifications).
        jsonrpc: Protocol version (always ``"2.0"``).
    """

    method: str = ""
    params: dict[str, Any] | None = None
    id: Union[str, int, None] = None
    jsonrpc: str = JSONRPC_VERSION

    @property
    def is_notification(self) -> bool:
        """``True`` when the request carries no ``id``."""
        return self.id is None

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary form, leaving out ``id``/``params`` when ``None``."""
        out: dict[str, Any] = {"jsonrpc": self.jsonrpc, "method": self.method}
        # Key order (id before params) is kept so serialized output is stable.
        for key, value in (("id", self.id), ("params", self.params)):
            if value is not None:
                out[key] = value
        return out

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> JSONRPCRequest:
        """Build a request from its dictionary form.

        Missing keys fall back to: empty method name, ``None`` params,
        ``None`` id, and ``JSONRPC_VERSION``.
        """
        lookup = d.get
        return cls(
            method=lookup("method", ""),
            params=lookup("params"),
            id=lookup("id"),
            jsonrpc=lookup("jsonrpc", JSONRPC_VERSION),
        )

is_notification property

Whether this request is a notification (no id).

to_dict()

Serialize to a dictionary.

Source code in jsonrpc/jsonrpc.py
def to_dict(self) -> dict[str, Any]:
    """Return the dictionary form, leaving out ``id``/``params`` when ``None``."""
    out: dict[str, Any] = {"jsonrpc": self.jsonrpc, "method": self.method}
    # Key order (id before params) is kept so serialized output is stable.
    for key, value in (("id", self.id), ("params", self.params)):
        if value is not None:
            out[key] = value
    return out

from_dict(d) classmethod

Deserialize from a dictionary.

Source code in jsonrpc/jsonrpc.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> JSONRPCRequest:
    """Build a request from its dictionary form.

    Missing keys fall back to: empty method name, ``None`` params,
    ``None`` id, and ``JSONRPC_VERSION``.
    """
    lookup = d.get
    return cls(
        method=lookup("method", ""),
        params=lookup("params"),
        id=lookup("id"),
        jsonrpc=lookup("jsonrpc", JSONRPC_VERSION),
    )

JSONRPCResponse dataclass

A JSON-RPC 2.0 response object.

Exactly one of result or error should be set.

Attributes:

Name Type Description
id Union[str, int, None]

Matching request identifier.

result Any

Successful result payload.

error JSONRPCError | None

Error details on failure.

jsonrpc str

Protocol version (always "2.0").

Source code in jsonrpc/jsonrpc.py
@dataclass
class JSONRPCResponse:
    """Response member of the JSON-RPC 2.0 protocol.

    Exactly one of ``result`` or ``error`` should be set.

    Attributes:
        id: Matching request identifier.
        result: Successful result payload.
        error: Error details on failure.
        jsonrpc: Protocol version (always ``"2.0"``).
    """

    id: Union[str, int, None] = None
    result: Any = None
    error: JSONRPCError | None = None
    jsonrpc: str = JSONRPC_VERSION

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary form.

        ``error`` takes precedence: when it is set the output has an
        ``"error"`` key and no ``"result"``; otherwise ``"result"`` is
        always emitted, even when it is ``None``.
        """
        body: dict[str, Any] = {"jsonrpc": self.jsonrpc, "id": self.id}
        failure = self.error
        if failure is None:
            body["result"] = self.result
        else:
            body["error"] = failure.to_dict()
        return body

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> JSONRPCResponse:
        """Build a response from its dictionary form.

        A missing or falsy ``"error"`` entry is treated as "no error".
        """
        err_payload = d.get("error")
        parsed_error = JSONRPCError.from_dict(err_payload) if err_payload else None
        return cls(
            id=d.get("id"),
            result=d.get("result"),
            error=parsed_error,
            jsonrpc=d.get("jsonrpc", JSONRPC_VERSION),
        )

    @classmethod
    def success(cls, request_id: Any, result: Any) -> JSONRPCResponse:
        """Create a response carrying *result* for *request_id*."""
        response = cls(id=request_id, result=result)
        return response

    @classmethod
    def from_error(cls, request_id: Any, error: Any) -> JSONRPCResponse:
        """Create an error response.

        Args:
            request_id: The id of the original request.
            error: A ``JSONRPCError`` dataclass, a ``JSONRPCException``
                (via its ``.error`` attribute), or any object with
                ``.code``, ``.rpc_message``, and ``.data`` attributes.
                Missing attributes fall back to ``INTERNAL_ERROR``,
                ``str(error)`` and ``None`` respectively.
        """
        if isinstance(error, JSONRPCError):
            rpc_error = error
        elif isinstance(error, JSONRPCException):
            rpc_error = error.error
        else:
            # Duck-typed protocol error (e.g. A2AError with code/rpc_message/data)
            rpc_error = JSONRPCError(
                code=getattr(error, "code", INTERNAL_ERROR),
                message=getattr(error, "rpc_message", str(error)),
                data=getattr(error, "data", None),
            )
        return cls(id=request_id, error=rpc_error)

to_dict()

Serialize to a dictionary.

Source code in jsonrpc/jsonrpc.py
def to_dict(self) -> dict[str, Any]:
    """Return the dictionary form.

    ``error`` takes precedence: when it is set the output has an
    ``"error"`` key and no ``"result"``; otherwise ``"result"`` is always
    emitted, even when it is ``None``.
    """
    body: dict[str, Any] = {"jsonrpc": self.jsonrpc, "id": self.id}
    failure = self.error
    if failure is None:
        body["result"] = self.result
    else:
        body["error"] = failure.to_dict()
    return body

from_dict(d) classmethod

Deserialize from a dictionary.

Source code in jsonrpc/jsonrpc.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> JSONRPCResponse:
    """Build a response from its dictionary form.

    A missing or falsy ``"error"`` entry is treated as "no error".
    """
    err_payload = d.get("error")
    parsed_error = JSONRPCError.from_dict(err_payload) if err_payload else None
    return cls(
        id=d.get("id"),
        result=d.get("result"),
        error=parsed_error,
        jsonrpc=d.get("jsonrpc", JSONRPC_VERSION),
    )

success(request_id, result) classmethod

Create a successful response.

Source code in jsonrpc/jsonrpc.py
@classmethod
def success(cls, request_id: Any, result: Any) -> JSONRPCResponse:
    """Create a response carrying *result* for *request_id*."""
    response = cls(id=request_id, result=result)
    return response

from_error(request_id, error) classmethod

Create an error response.

Parameters:

Name Type Description Default
request_id Any

The id of the original request.

required
error Any

A JSONRPCError dataclass, a JSONRPCException (via its .error attribute), or any object with .code, .rpc_message, and .data attributes.

required
Source code in jsonrpc/jsonrpc.py
@classmethod
def from_error(cls, request_id: Any, error: Any) -> JSONRPCResponse:
    """Create an error response.

    Args:
        request_id: The id of the original request.
        error: A ``JSONRPCError`` dataclass, a ``JSONRPCException``
            (via its ``.error`` attribute), or any object with
            ``.code``, ``.rpc_message``, and ``.data`` attributes.
            Missing attributes fall back to ``INTERNAL_ERROR``,
            ``str(error)`` and ``None`` respectively.
    """
    if isinstance(error, JSONRPCError):
        rpc_error = error
    elif isinstance(error, JSONRPCException):
        rpc_error = error.error
    else:
        # Duck-typed protocol error (e.g. A2AError with code/rpc_message/data)
        rpc_error = JSONRPCError(
            code=getattr(error, "code", INTERNAL_ERROR),
            message=getattr(error, "rpc_message", str(error)),
            data=getattr(error, "data", None),
        )
    return cls(id=request_id, error=rpc_error)

TaskState

Bases: str, Enum

Lifecycle states of a Task (mirrors TaskState proto enum).

Source code in a2a/a2a.py
class TaskState(str, enum.Enum):
    """Lifecycle states of a Task (mirrors ``TaskState`` proto enum).

    The ``str`` mixin makes each member compare equal to its wire string.
    """

    UNSPECIFIED = "TASK_STATE_UNSPECIFIED"
    SUBMITTED = "TASK_STATE_SUBMITTED"
    WORKING = "TASK_STATE_WORKING"
    COMPLETED = "TASK_STATE_COMPLETED"
    FAILED = "TASK_STATE_FAILED"
    CANCELED = "TASK_STATE_CANCELED"
    INPUT_REQUIRED = "TASK_STATE_INPUT_REQUIRED"
    REJECTED = "TASK_STATE_REJECTED"
    AUTH_REQUIRED = "TASK_STATE_AUTH_REQUIRED"

    def is_terminal(self) -> bool:
        """Tell whether this state ends the task (no further transitions).

        Returns:
            ``True`` for COMPLETED, FAILED, CANCELED and REJECTED;
            ``False`` for every other state.
        """
        return (
            self is TaskState.COMPLETED
            or self is TaskState.FAILED
            or self is TaskState.CANCELED
            or self is TaskState.REJECTED
        )

is_terminal()

Return True if the state is terminal (no further transitions).

Source code in a2a/a2a.py
def is_terminal(self) -> bool:
    """Tell whether this state ends the task (no further transitions).

    Returns:
        ``True`` for COMPLETED, FAILED, CANCELED and REJECTED;
        ``False`` for every other state.
    """
    return (
        self is TaskState.COMPLETED
        or self is TaskState.FAILED
        or self is TaskState.CANCELED
        or self is TaskState.REJECTED
    )

Role

Bases: str, Enum

Sender role for a Message.

Source code in a2a/a2a.py
class Role(str, enum.Enum):
    """Sender role for a Message.

    The ``str`` mixin makes each member compare equal to its wire string
    (for example ``Role.USER == "ROLE_USER"``).
    """

    # Placeholder for "no role given".
    UNSPECIFIED = "ROLE_UNSPECIFIED"
    USER = "ROLE_USER"
    AGENT = "ROLE_AGENT"

Part dataclass

The smallest unit of content inside a Message or Artifact.

Exactly one of text, raw, url, or data should be set, corresponding to the oneof content in the proto definition.

Attributes:

Name Type Description
text str | None

Plain-text content.

raw str | None

Base64-encoded binary content.

url str | None

URL pointing to file content.

data Any | None

Arbitrary structured data (JSON value).

metadata dict[str, Any] | None

Optional key-value metadata.

filename str | None

Optional filename hint.

media_type str | None

MIME type of the content.

Source code in a2a/a2a.py
@dataclass
class Part:
    """One piece of content carried by a Message or Artifact.

    Exactly one of ``text``, ``raw``, ``url``, or ``data`` should be set,
    corresponding to the ``oneof content`` in the proto definition.

    Attributes:
        text: Plain-text content.
        raw: Base64-encoded binary content.
        url: URL pointing to file content.
        data: Arbitrary structured data (JSON value).
        metadata: Optional key-value metadata.
        filename: Optional filename hint.
        media_type: MIME type of the content.
    """

    text: str | None = None
    raw: str | None = None
    url: str | None = None
    data: Any | None = None
    metadata: dict[str, Any] | None = None
    filename: str | None = None
    media_type: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary (delegates to ``_serialize``)."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Part":
        """Build a Part from a camelCase dictionary.

        The instance is allocated with ``object.__new__`` so ``__init__`` is
        not run; every field is assigned here. Keys absent from *d* become
        ``None``; ``"mediaType"`` is the only key whose name differs from
        its attribute.
        """
        part = object.__new__(cls)
        lookup = d.get
        part.text = lookup("text")
        part.raw = lookup("raw")
        part.url = lookup("url")
        part.data = lookup("data")
        part.metadata = lookup("metadata")
        part.filename = lookup("filename")
        part.media_type = lookup("mediaType")
        return part

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a camelCase dictionary.

    Delegates entirely to the module-level ``_serialize`` helper; the key
    conversion presumably happens there (helper not shown here — confirm).
    """
    return _serialize(self)

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Part":
    """Build a Part from a camelCase dictionary.

    The instance is allocated with ``object.__new__`` so ``__init__`` is not
    run; every field is assigned here. Keys absent from *d* become ``None``;
    ``"mediaType"`` is the only key whose name differs from its attribute.
    """
    part = object.__new__(cls)
    lookup = d.get
    part.text = lookup("text")
    part.raw = lookup("raw")
    part.url = lookup("url")
    part.data = lookup("data")
    part.metadata = lookup("metadata")
    part.filename = lookup("filename")
    part.media_type = lookup("mediaType")
    return part

Message dataclass

A single communication turn between client and agent.

Attributes:

Name Type Description
message_id str

Unique identifier for this message.

role Role

The sender role (user or agent).

parts list[Part]

Content parts of the message.

context_id str | None

Optional context grouping identifier.

task_id str | None

Optional associated task identifier.

metadata dict[str, Any] | None

Optional key-value metadata.

extensions list[str] | None

Extension URIs active for this message.

reference_task_ids list[str] | None

Task IDs referenced for additional context.

Source code in a2a/a2a.py
@dataclass
class Message:
    """A single communication turn between client and agent.

    Attributes:
        message_id: Unique identifier for this message.
        role: The sender role (user or agent).
        parts: Content parts of the message.
        context_id: Optional context grouping identifier.
        task_id: Optional associated task identifier.
        metadata: Optional key-value metadata.
        extensions: Extension URIs active for this message.
        reference_task_ids: Task IDs referenced for additional context.
    """

    message_id: str = ""
    role: Role = Role.USER
    parts: list[Part] = field(default_factory=list)
    context_id: str | None = None
    task_id: str | None = None
    metadata: dict[str, Any] | None = None
    extensions: list[str] | None = None
    reference_task_ids: list[str] | None = None

    def __post_init__(self) -> None:
        # An empty id means "not supplied": generate a fresh UUID4 string.
        if not self.message_id:
            self.message_id = str(uuid.uuid4())

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary (delegates to ``_serialize``)."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Message":
        """Deserialize from a camelCase dictionary.

        The instance is allocated with ``object.__new__`` so neither
        ``__init__`` nor ``__post_init__`` runs; the id default is therefore
        re-applied here.

        Args:
            d: Mapping with camelCase keys. A missing or empty
                ``"messageId"`` is replaced by a new UUID4 string; a missing
                ``"role"`` defaults to ``"ROLE_USER"``; missing or empty
                ``"parts"`` become an empty list.

        Returns:
            The populated ``Message``. ``role`` is always a ``Role`` member:
            a value that is not a known role string (unknown string, JSON
            ``null``, or any non-string) maps to ``Role.UNSPECIFIED`` rather
            than leaking ``None`` or a raw value into a ``Role``-typed field.
        """
        obj = object.__new__(cls)
        _get = d.get
        msg_id = _get("messageId", "")
        obj.message_id = msg_id if msg_id else str(uuid.uuid4())
        role_val = _get("role", "ROLE_USER")
        # Role subclasses str, so Role members also take the lookup path.
        role = _ROLE_BY_VALUE.get(role_val) if isinstance(role_val, str) else None
        obj.role = role if role is not None else Role.UNSPECIFIED
        raw_parts = _get("parts")
        obj.parts = [Part.from_dict(p) for p in raw_parts] if raw_parts else []
        obj.context_id = _get("contextId")
        obj.task_id = _get("taskId")
        obj.metadata = _get("metadata")
        obj.extensions = _get("extensions")
        obj.reference_task_ids = _get("referenceTaskIds")
        return obj

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a camelCase dictionary.

    Delegates entirely to the module-level ``_serialize`` helper; the key
    conversion presumably happens there (helper not shown here — confirm).
    """
    return _serialize(self)

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Message":
    """Deserialize from a camelCase dictionary.

    The instance is allocated with ``object.__new__`` so ``__init__`` does
    not run; every field is assigned here.

    Args:
        d: Mapping with camelCase keys. A missing or empty ``"messageId"``
            is replaced by a new UUID4 string; a missing ``"role"`` defaults
            to ``"ROLE_USER"``; missing or empty ``"parts"`` become an
            empty list.

    Returns:
        The populated ``Message``. ``role`` is always a ``Role`` member: a
        value that is not a known role string (unknown string, JSON
        ``null``, or any non-string) maps to ``Role.UNSPECIFIED`` rather
        than leaking ``None`` or a raw value into a ``Role``-typed field.
    """
    obj = object.__new__(cls)
    _get = d.get
    msg_id = _get("messageId", "")
    obj.message_id = msg_id if msg_id else str(uuid.uuid4())
    role_val = _get("role", "ROLE_USER")
    # Role subclasses str, so Role members also take the lookup path.
    role = _ROLE_BY_VALUE.get(role_val) if isinstance(role_val, str) else None
    obj.role = role if role is not None else Role.UNSPECIFIED
    raw_parts = _get("parts")
    obj.parts = [Part.from_dict(p) for p in raw_parts] if raw_parts else []
    obj.context_id = _get("contextId")
    obj.task_id = _get("taskId")
    obj.metadata = _get("metadata")
    obj.extensions = _get("extensions")
    obj.reference_task_ids = _get("referenceTaskIds")
    return obj

Artifact dataclass

An output produced by the agent as a result of task processing.

Attributes:

Name Type Description
artifact_id str

Unique identifier within a task.

parts list[Part]

Content parts of the artifact.

name str | None

Human-readable name.

description str | None

Human-readable description.

metadata dict[str, Any] | None

Optional key-value metadata.

extensions list[str] | None

Extension URIs relevant to this artifact.

Source code in a2a/a2a.py
@dataclass
class Artifact:
    """Output produced by the agent while processing a task.

    Attributes:
        artifact_id: Unique identifier within a task.
        parts: Content parts of the artifact.
        name: Human-readable name.
        description: Human-readable description.
        metadata: Optional key-value metadata.
        extensions: Extension URIs relevant to this artifact.
    """

    artifact_id: str = ""
    parts: list[Part] = field(default_factory=list)
    name: str | None = None
    description: str | None = None
    metadata: dict[str, Any] | None = None
    extensions: list[str] | None = None

    def __post_init__(self) -> None:
        # An empty id means "not supplied": generate a fresh UUID4 string.
        if self.artifact_id:
            return
        self.artifact_id = str(uuid.uuid4())

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary (delegates to ``_serialize``)."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Artifact":
        """Build an Artifact from a camelCase dictionary.

        The instance is allocated with ``object.__new__`` so neither
        ``__init__`` nor ``__post_init__`` runs; the id default is therefore
        re-applied here. A missing or empty ``"artifactId"`` gets a new
        UUID4 string and missing or empty ``"parts"`` become ``[]``.
        """
        artifact = object.__new__(cls)
        lookup = d.get
        given_id = lookup("artifactId", "")
        artifact.artifact_id = given_id or str(uuid.uuid4())
        part_dicts = lookup("parts")
        if part_dicts:
            artifact.parts = [Part.from_dict(item) for item in part_dicts]
        else:
            artifact.parts = []
        artifact.name = lookup("name")
        artifact.description = lookup("description")
        artifact.metadata = lookup("metadata")
        artifact.extensions = lookup("extensions")
        return artifact

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a camelCase dictionary.

    Delegates entirely to the module-level ``_serialize`` helper; the key
    conversion presumably happens there (helper not shown here — confirm).
    """
    return _serialize(self)

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Artifact":
    """Build an Artifact from a camelCase dictionary.

    The instance is allocated with ``object.__new__`` so ``__init__`` does
    not run; every field is assigned here. A missing or empty
    ``"artifactId"`` gets a new UUID4 string and missing or empty
    ``"parts"`` become ``[]``.
    """
    artifact = object.__new__(cls)
    lookup = d.get
    given_id = lookup("artifactId", "")
    artifact.artifact_id = given_id or str(uuid.uuid4())
    part_dicts = lookup("parts")
    if part_dicts:
        artifact.parts = [Part.from_dict(item) for item in part_dicts]
    else:
        artifact.parts = []
    artifact.name = lookup("name")
    artifact.description = lookup("description")
    artifact.metadata = lookup("metadata")
    artifact.extensions = lookup("extensions")
    return artifact

TaskStatus dataclass

Current status of a Task.

Attributes:

Name Type Description
state TaskState

The lifecycle state.

message Message | None

An optional message associated with the status.

timestamp str | None

ISO 8601 timestamp when the status was recorded.

Source code in a2a/a2a.py
@dataclass
class TaskStatus:
    """Current status of a Task.

    Attributes:
        state: The lifecycle state.
        message: An optional message associated with the status.
        timestamp: ISO 8601 timestamp when the status was recorded.
    """

    state: TaskState = TaskState.SUBMITTED
    message: Message | None = None
    timestamp: str | None = None

    def __post_init__(self) -> None:
        # Stamp the status with the current time when none was supplied.
        if self.timestamp is None:
            self.timestamp = _now_iso()

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary (delegates to ``_serialize``)."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "TaskStatus":
        """Deserialize from a camelCase dictionary.

        The instance is allocated with ``object.__new__`` so neither
        ``__init__`` nor ``__post_init__`` runs; the timestamp default is
        therefore re-applied here.

        Args:
            d: Mapping with camelCase keys. A missing ``"state"`` defaults
                to ``"TASK_STATE_UNSPECIFIED"``; a missing or falsy
                ``"message"`` becomes ``None``; a missing or ``None``
                ``"timestamp"`` is replaced by ``_now_iso()``.

        Returns:
            The populated ``TaskStatus``. ``state`` is always a
            ``TaskState`` member: a value that is not a known state string
            (unknown string, JSON ``null``, or any non-string) maps to
            ``TaskState.UNSPECIFIED`` rather than leaking ``None`` or a raw
            value into a ``TaskState``-typed field.
        """
        obj = object.__new__(cls)
        _get = d.get
        state_val = _get("state", "TASK_STATE_UNSPECIFIED")
        # TaskState subclasses str, so members also take the lookup path.
        state = (
            _TASKSTATE_BY_VALUE.get(state_val) if isinstance(state_val, str) else None
        )
        obj.state = state if state is not None else TaskState.UNSPECIFIED
        msg = _get("message")
        obj.message = Message.from_dict(msg) if msg else None
        ts = _get("timestamp")
        obj.timestamp = ts if ts is not None else _now_iso()
        return obj

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a camelCase dictionary.

    Delegates entirely to the module-level ``_serialize`` helper; the key
    conversion presumably happens there (helper not shown here — confirm).
    """
    return _serialize(self)

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "TaskStatus":
    """Deserialize from a camelCase dictionary.

    The instance is allocated with ``object.__new__`` so ``__init__`` does
    not run; every field is assigned here.

    Args:
        d: Mapping with camelCase keys. A missing ``"state"`` defaults to
            ``"TASK_STATE_UNSPECIFIED"``; a missing or falsy ``"message"``
            becomes ``None``; a missing or ``None`` ``"timestamp"`` is
            replaced by ``_now_iso()``.

    Returns:
        The populated ``TaskStatus``. ``state`` is always a ``TaskState``
        member: a value that is not a known state string (unknown string,
        JSON ``null``, or any non-string) maps to ``TaskState.UNSPECIFIED``
        rather than leaking ``None`` or a raw value into a
        ``TaskState``-typed field.
    """
    obj = object.__new__(cls)
    _get = d.get
    state_val = _get("state", "TASK_STATE_UNSPECIFIED")
    # TaskState subclasses str, so members also take the lookup path.
    state = (
        _TASKSTATE_BY_VALUE.get(state_val) if isinstance(state_val, str) else None
    )
    obj.state = state if state is not None else TaskState.UNSPECIFIED
    msg = _get("message")
    obj.message = Message.from_dict(msg) if msg else None
    ts = _get("timestamp")
    obj.timestamp = ts if ts is not None else _now_iso()
    return obj

Task dataclass

The fundamental unit of work managed by A2A.

Attributes:

Name Type Description
id str

Server-generated unique identifier.

status TaskStatus

Current task status.

context_id str | None

Optional context grouping identifier.

artifacts list[Artifact] | None

Output artifacts produced so far.

history list[Message] | None

Message history for the task.

metadata dict[str, Any] | None

Optional key-value metadata.

Source code in a2a/a2a.py
@dataclass
class Task:
    """Unit of work managed by A2A.

    Attributes:
        id: Server-generated unique identifier.
        status: Current task status.
        context_id: Optional context grouping identifier.
        artifacts: Output artifacts produced so far.
        history: Message history for the task.
        metadata: Optional key-value metadata.
    """

    id: str = ""
    status: TaskStatus = field(default_factory=TaskStatus)
    context_id: str | None = None
    artifacts: list[Artifact] | None = None
    history: list[Message] | None = None
    metadata: dict[str, Any] | None = None

    def __post_init__(self) -> None:
        # An empty id means "not supplied": generate a fresh UUID4 string.
        if self.id:
            return
        self.id = str(uuid.uuid4())

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary (delegates to ``_serialize``)."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Task":
        """Build a Task from a camelCase dictionary.

        The instance is allocated with ``object.__new__`` so neither
        ``__init__`` nor ``__post_init__`` runs; the id default is therefore
        re-applied here. A missing or falsy ``"status"`` yields a default
        ``TaskStatus()``. ``"artifacts"`` and ``"history"`` stay ``None``
        when absent, while an empty list is preserved as an empty list.
        """
        task = object.__new__(cls)
        lookup = d.get
        given_id = lookup("id", "")
        task.id = given_id or str(uuid.uuid4())
        status_dict = lookup("status")
        task.status = TaskStatus.from_dict(status_dict) if status_dict else TaskStatus()
        task.context_id = lookup("contextId")
        artifact_dicts = lookup("artifacts")
        if artifact_dicts is None:
            task.artifacts = None
        else:
            task.artifacts = [Artifact.from_dict(item) for item in artifact_dicts]
        message_dicts = lookup("history")
        if message_dicts is None:
            task.history = None
        else:
            task.history = [Message.from_dict(item) for item in message_dicts]
        task.metadata = lookup("metadata")
        return task

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Serialize to a camelCase dictionary.

    Delegates entirely to the module-level ``_serialize`` helper; the key
    conversion presumably happens there (helper not shown here — confirm).
    """
    return _serialize(self)

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "Task":
    """Build a Task from a camelCase dictionary.

    The instance is allocated with ``object.__new__`` so ``__init__`` does
    not run; every field is assigned here. A missing or empty ``"id"`` gets
    a new UUID4 string and a missing or falsy ``"status"`` yields a default
    ``TaskStatus()``. ``"artifacts"`` and ``"history"`` stay ``None`` when
    absent, while an empty list is preserved as an empty list.
    """
    task = object.__new__(cls)
    lookup = d.get
    given_id = lookup("id", "")
    task.id = given_id or str(uuid.uuid4())
    status_dict = lookup("status")
    task.status = TaskStatus.from_dict(status_dict) if status_dict else TaskStatus()
    task.context_id = lookup("contextId")
    artifact_dicts = lookup("artifacts")
    if artifact_dicts is None:
        task.artifacts = None
    else:
        task.artifacts = [Artifact.from_dict(item) for item in artifact_dicts]
    message_dicts = lookup("history")
    if message_dicts is None:
        task.history = None
    else:
        task.history = [Message.from_dict(item) for item in message_dicts]
    task.metadata = lookup("metadata")
    return task

TaskStatusUpdateEvent dataclass

Event indicating a change in task status.

Attributes:

Name Type Description
task_id str

The task that changed.

context_id str

The context the task belongs to.

status TaskStatus

The new status.

metadata dict[str, Any] | None

Optional metadata.

Source code in a2a/a2a.py
@dataclass
class TaskStatusUpdateEvent:
    """Notification that a task's status has changed.

    Attributes:
        task_id: Identifier of the task whose status changed.
        context_id: Identifier of the context the task belongs to.
        status: The task's new status.
        metadata: Optional metadata attached to the event.
    """

    task_id: str = ""
    context_id: str = ""
    status: TaskStatus = field(default_factory=TaskStatus)
    metadata: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return this event as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "TaskStatusUpdateEvent":
        """Build an event from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``taskId``, ``contextId``,
                ``status`` and ``metadata``.

        Returns:
            A new instance; absent ids become ``""`` and a missing or empty
            ``status`` becomes a default ``TaskStatus``.
        """
        # Allocate without running ``__init__``; every field is set below.
        event = object.__new__(cls)
        event.task_id = d.get("taskId", "")
        event.context_id = d.get("contextId", "")
        raw_status = d.get("status")
        if raw_status:
            event.status = TaskStatus.from_dict(raw_status)
        else:
            event.status = TaskStatus()
        event.metadata = d.get("metadata")
        return event

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this status-update event as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "TaskStatusUpdateEvent":
    """Build a status-update event from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``taskId``, ``contextId``, ``status``
            and ``metadata``.

    Returns:
        A new instance; absent ids become ``""`` and a missing or empty
        ``status`` becomes a default ``TaskStatus``.
    """
    # Allocate without running ``__init__``; every field is set below.
    event = object.__new__(cls)
    event.task_id = d.get("taskId", "")
    event.context_id = d.get("contextId", "")
    raw_status = d.get("status")
    if raw_status:
        event.status = TaskStatus.from_dict(raw_status)
    else:
        event.status = TaskStatus()
    event.metadata = d.get("metadata")
    return event

TaskArtifactUpdateEvent dataclass

Event indicating an artifact update on a task.

Attributes:

Name Type Description
task_id str

The task for this artifact.

context_id str

The context the task belongs to.

artifact Artifact

The artifact that was generated or updated.

append bool

If True, content appends to a previous artifact with the same ID.

last_chunk bool

If True, this is the final chunk of the artifact.

metadata dict[str, Any] | None

Optional metadata.

Source code in a2a/a2a.py
@dataclass
class TaskArtifactUpdateEvent:
    """Notification that an artifact of a task was produced or updated.

    Attributes:
        task_id: Identifier of the task the artifact belongs to.
        context_id: Identifier of the context the task belongs to.
        artifact: The generated or updated artifact.
        append: If True, content appends to a previous artifact with the
            same ID.
        last_chunk: If True, this is the final chunk of the artifact.
        metadata: Optional metadata attached to the event.
    """

    task_id: str = ""
    context_id: str = ""
    artifact: Artifact = field(default_factory=Artifact)
    append: bool = False
    last_chunk: bool = False
    metadata: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return this event as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "TaskArtifactUpdateEvent":
        """Build an event from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``taskId``, ``contextId``,
                ``artifact``, ``append``, ``lastChunk`` and ``metadata``.

        Returns:
            A new instance; a missing or empty ``artifact`` becomes a
            default ``Artifact``.
        """
        # Allocate without running ``__init__``; every field is set below.
        event = object.__new__(cls)
        event.task_id = d.get("taskId", "")
        event.context_id = d.get("contextId", "")
        raw_artifact = d.get("artifact")
        if raw_artifact:
            event.artifact = Artifact.from_dict(raw_artifact)
        else:
            event.artifact = Artifact()
        event.append = d.get("append", False)
        event.last_chunk = d.get("lastChunk", False)
        event.metadata = d.get("metadata")
        return event

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this artifact-update event as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "TaskArtifactUpdateEvent":
    """Build an artifact-update event from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``taskId``, ``contextId``,
            ``artifact``, ``append``, ``lastChunk`` and ``metadata``.

    Returns:
        A new instance; a missing or empty ``artifact`` becomes a default
        ``Artifact``.
    """
    # Allocate without running ``__init__``; every field is set below.
    event = object.__new__(cls)
    event.task_id = d.get("taskId", "")
    event.context_id = d.get("contextId", "")
    raw_artifact = d.get("artifact")
    if raw_artifact:
        event.artifact = Artifact.from_dict(raw_artifact)
    else:
        event.artifact = Artifact()
    event.append = d.get("append", False)
    event.last_chunk = d.get("lastChunk", False)
    event.metadata = d.get("metadata")
    return event

StreamResponse dataclass

Wrapper for streaming responses (oneof semantics).

Exactly one of the four fields should be set.

Attributes:

Name Type Description
task Task | None

A Task object with current state.

message Message | None

A Message object.

status_update TaskStatusUpdateEvent | None

A TaskStatusUpdateEvent.

artifact_update TaskArtifactUpdateEvent | None

A TaskArtifactUpdateEvent.

Source code in a2a/a2a.py
@dataclass
class StreamResponse:
    """Envelope for one streamed response item (oneof semantics).

    Exactly one of the four fields should be set.

    Attributes:
        task: A Task object with current state.
        message: A Message object.
        status_update: A TaskStatusUpdateEvent.
        artifact_update: A TaskArtifactUpdateEvent.
    """

    task: Task | None = None
    message: Message | None = None
    status_update: TaskStatusUpdateEvent | None = None
    artifact_update: TaskArtifactUpdateEvent | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a single-key camelCase dictionary for the populated field.

        Fields are checked in the order task, message, status update,
        artifact update; the first one that is not ``None`` wins. An empty
        dictionary is returned when none is set.
        """
        slots = (
            ("task", "task"),
            ("message", "message"),
            ("statusUpdate", "status_update"),
            ("artifactUpdate", "artifact_update"),
        )
        for wire_key, attr_name in slots:
            member = getattr(self, attr_name)
            if member is not None:
                return {wire_key: member.to_dict()}
        return {}

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "StreamResponse":
        """Build a stream response from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``task``, ``message``,
                ``statusUpdate`` and ``artifactUpdate``.

        Returns:
            A new instance; each field whose value is missing or falsy is
            set to ``None``.
        """
        # Allocate without running ``__init__``; every field is set below.
        response = object.__new__(cls)
        raw_task = d.get("task")
        raw_message = d.get("message")
        raw_status = d.get("statusUpdate")
        raw_artifact = d.get("artifactUpdate")

        response.task = None
        if raw_task:
            response.task = Task.from_dict(raw_task)

        response.message = None
        if raw_message:
            response.message = Message.from_dict(raw_message)

        response.status_update = None
        if raw_status:
            response.status_update = TaskStatusUpdateEvent.from_dict(raw_status)

        response.artifact_update = None
        if raw_artifact:
            response.artifact_update = TaskArtifactUpdateEvent.from_dict(
                raw_artifact
            )
        return response

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return a single-key camelCase dictionary for the populated field.

    Fields are checked in the order task, message, status update, artifact
    update; the first one that is not ``None`` wins. An empty dictionary is
    returned when none is set.
    """
    slots = (
        ("task", "task"),
        ("message", "message"),
        ("statusUpdate", "status_update"),
        ("artifactUpdate", "artifact_update"),
    )
    for wire_key, attr_name in slots:
        member = getattr(self, attr_name)
        if member is not None:
            return {wire_key: member.to_dict()}
    return {}

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "StreamResponse":
    """Build a stream response from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``task``, ``message``,
            ``statusUpdate`` and ``artifactUpdate``.

    Returns:
        A new instance; each field whose value is missing or falsy is set
        to ``None``.
    """
    # Allocate without running ``__init__``; every field is set below.
    response = object.__new__(cls)
    raw_task = d.get("task")
    raw_message = d.get("message")
    raw_status = d.get("statusUpdate")
    raw_artifact = d.get("artifactUpdate")

    response.task = None
    if raw_task:
        response.task = Task.from_dict(raw_task)

    response.message = None
    if raw_message:
        response.message = Message.from_dict(raw_message)

    response.status_update = None
    if raw_status:
        response.status_update = TaskStatusUpdateEvent.from_dict(raw_status)

    response.artifact_update = None
    if raw_artifact:
        response.artifact_update = TaskArtifactUpdateEvent.from_dict(raw_artifact)
    return response

AuthenticationInfo dataclass

Authentication details for push notifications.

Attributes:

Name Type Description
scheme str

HTTP authentication scheme (e.g. "Bearer").

credentials str | None

The credential string.

Source code in a2a/a2a.py
@dataclass
class AuthenticationInfo:
    """Authentication details for push notifications.

    Attributes:
        scheme: HTTP authentication scheme (e.g. "Bearer").
        credentials: The credential string.
    """

    scheme: str = "Bearer"
    credentials: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AuthenticationInfo":
        """Deserialize from a camelCase dictionary."""
        obj = object.__new__(cls)
        obj.scheme = d.get("scheme", "Bearer")
        obj.credentials = d.get("credentials")
        return obj

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this authentication info as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AuthenticationInfo":
    """Build authentication info from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``scheme`` and ``credentials``.

    Returns:
        A new instance; an absent ``scheme`` defaults to ``"Bearer"``.
    """
    # Allocate without running ``__init__``; both fields are set below.
    info = object.__new__(cls)
    info.scheme = d.get("scheme", "Bearer")
    info.credentials = d.get("credentials")
    return info

PushNotificationConfig dataclass

Configuration for push notification delivery.

Attributes:

Name Type Description
url str

Webhook URL where notifications are sent.

id str | None

Unique configuration identifier.

task_id str | None

The associated task ID.

token str | None

A client-provided token for verification.

authentication AuthenticationInfo | None

Authentication info for the webhook.

Source code in a2a/a2a.py
@dataclass
class PushNotificationConfig:
    """Settings describing where and how push notifications are delivered.

    Attributes:
        url: Webhook URL where notifications are sent.
        id: Unique configuration identifier.
        task_id: The associated task ID.
        token: A client-provided token for verification.
        authentication: Authentication info for the webhook.
    """

    url: str = ""
    id: str | None = None
    task_id: str | None = None
    token: str | None = None
    authentication: AuthenticationInfo | None = None

    def __post_init__(self) -> None:
        """Assign a UUID4 string as ``id`` when none (or an empty one) is given."""
        if self.id:
            return
        self.id = str(uuid.uuid4())

    def to_dict(self) -> dict[str, Any]:
        """Return this config as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "PushNotificationConfig":
        """Build a config from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``url``, ``id``, ``taskId``,
                ``token`` and ``authentication``.

        Returns:
            A new instance; a missing or empty ``id`` is replaced by a
            generated UUID4 string.
        """
        # ``__init__``/``__post_init__`` are bypassed, so the id fallback is
        # applied here explicitly.
        config = object.__new__(cls)
        config.url = d.get("url", "")
        config.id = d.get("id") or str(uuid.uuid4())
        config.task_id = d.get("taskId")
        config.token = d.get("token")
        raw_auth = d.get("authentication")
        if raw_auth:
            config.authentication = AuthenticationInfo.from_dict(raw_auth)
        else:
            config.authentication = None
        return config

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this push-notification config as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "PushNotificationConfig":
    """Build a push-notification config from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``url``, ``id``, ``taskId``,
            ``token`` and ``authentication``.

    Returns:
        A new instance; a missing or empty ``id`` is replaced by a
        generated UUID4 string.
    """
    # Allocate without running ``__init__``; every field is set below.
    config = object.__new__(cls)
    config.url = d.get("url", "")
    config.id = d.get("id") or str(uuid.uuid4())
    config.task_id = d.get("taskId")
    config.token = d.get("token")
    raw_auth = d.get("authentication")
    if raw_auth:
        config.authentication = AuthenticationInfo.from_dict(raw_auth)
    else:
        config.authentication = None
    return config

SendMessageConfiguration dataclass

Configuration accompanying a SendMessage request.

Attributes:

Name Type Description
accepted_output_modes list[str] | None

Media types the client can accept.

push_notification_config PushNotificationConfig | None

Optional push notification setup.

history_length int | None

Max number of history messages to return.

return_immediately bool

If True, return without waiting for completion.

Source code in a2a/a2a.py
@dataclass
class SendMessageConfiguration:
    """Options that accompany a SendMessage request.

    Attributes:
        accepted_output_modes: Media types the client can accept.
        push_notification_config: Optional push notification setup.
        history_length: Max number of history messages to return.
        return_immediately: If True, return without waiting for completion.
    """

    accepted_output_modes: list[str] | None = None
    push_notification_config: PushNotificationConfig | None = None
    history_length: int | None = None
    return_immediately: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return this configuration as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "SendMessageConfiguration":
        """Build a configuration from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``acceptedOutputModes``,
                ``pushNotificationConfig`` (falling back to
                ``taskPushNotificationConfig``), ``historyLength`` and
                ``returnImmediately``.

        Returns:
            A new instance populated from *d*.
        """
        # Allocate without running ``__init__``; every field is set below.
        config = object.__new__(cls)
        config.accepted_output_modes = d.get("acceptedOutputModes")

        # The alternate key is consulted only when the primary one is
        # missing or falsy.
        raw_push = d.get("pushNotificationConfig")
        if not raw_push:
            raw_push = d.get("taskPushNotificationConfig")
        if raw_push:
            config.push_notification_config = PushNotificationConfig.from_dict(
                raw_push
            )
        else:
            config.push_notification_config = None

        config.history_length = d.get("historyLength")
        config.return_immediately = d.get("returnImmediately", False)
        return config

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this send configuration as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "SendMessageConfiguration":
    """Build a send configuration from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``acceptedOutputModes``,
            ``pushNotificationConfig`` (falling back to
            ``taskPushNotificationConfig``), ``historyLength`` and
            ``returnImmediately``.

    Returns:
        A new instance populated from *d*.
    """
    # Allocate without running ``__init__``; every field is set below.
    config = object.__new__(cls)
    config.accepted_output_modes = d.get("acceptedOutputModes")

    # The alternate key is consulted only when the primary one is missing
    # or falsy.
    raw_push = d.get("pushNotificationConfig")
    if not raw_push:
        raw_push = d.get("taskPushNotificationConfig")
    if raw_push:
        config.push_notification_config = PushNotificationConfig.from_dict(raw_push)
    else:
        config.push_notification_config = None

    config.history_length = d.get("historyLength")
    config.return_immediately = d.get("returnImmediately", False)
    return config

SendMessageRequest dataclass

Request object for the SendMessage / SendStreamingMessage operations.

Attributes:

Name Type Description
message Message

The message to send.

configuration SendMessageConfiguration | None

Optional send configuration.

metadata dict[str, Any] | None

Optional key-value metadata.

Source code in a2a/a2a.py
@dataclass
class SendMessageRequest:
    """Payload for the SendMessage / SendStreamingMessage operations.

    Attributes:
        message: The message to send.
        configuration: Optional send configuration.
        metadata: Optional key-value metadata.
    """

    message: Message = field(default_factory=Message)
    configuration: SendMessageConfiguration | None = None
    metadata: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return this request as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "SendMessageRequest":
        """Build a request from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``message``, ``configuration``
                and ``metadata``.

        Returns:
            A new instance; a missing or empty ``message`` becomes a
            default ``Message``.
        """
        # Allocate without running ``__init__``; every field is set below.
        request = object.__new__(cls)

        raw_message = d.get("message")
        if raw_message:
            request.message = Message.from_dict(raw_message)
        else:
            request.message = Message()

        raw_config = d.get("configuration")
        if raw_config:
            request.configuration = SendMessageConfiguration.from_dict(raw_config)
        else:
            request.configuration = None

        request.metadata = d.get("metadata")
        return request

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this send-message request as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "SendMessageRequest":
    """Build a send-message request from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``message``, ``configuration`` and
            ``metadata``.

    Returns:
        A new instance; a missing or empty ``message`` becomes a default
        ``Message``.
    """
    # Allocate without running ``__init__``; every field is set below.
    request = object.__new__(cls)

    raw_message = d.get("message")
    if raw_message:
        request.message = Message.from_dict(raw_message)
    else:
        request.message = Message()

    raw_config = d.get("configuration")
    if raw_config:
        request.configuration = SendMessageConfiguration.from_dict(raw_config)
    else:
        request.configuration = None

    request.metadata = d.get("metadata")
    return request

SendMessageResponse dataclass

Response for SendMessage (oneof task | message).

Attributes:

Name Type Description
task Task | None

The task created or updated.

message Message | None

A direct response message.

Source code in a2a/a2a.py
@dataclass
class SendMessageResponse:
    """Result of SendMessage (oneof task | message).

    Attributes:
        task: The task created or updated.
        message: A direct response message.
    """

    task: Task | None = None
    message: Message | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a single-key camelCase dictionary for the populated field.

        ``task`` takes precedence over ``message``; an empty dictionary is
        returned when neither is set.
        """
        for wire_key in ("task", "message"):
            member = getattr(self, wire_key)
            if member is not None:
                return {wire_key: member.to_dict()}
        return {}

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "SendMessageResponse":
        """Build a response from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``task`` and ``message``.

        Returns:
            A new instance; each field whose value is missing or falsy is
            set to ``None``.
        """
        # Allocate without running ``__init__``; both fields are set below.
        response = object.__new__(cls)
        raw_task = d.get("task")
        raw_message = d.get("message")

        response.task = None
        if raw_task:
            response.task = Task.from_dict(raw_task)

        response.message = None
        if raw_message:
            response.message = Message.from_dict(raw_message)
        return response

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return a single-key camelCase dictionary for the populated field.

    ``task`` takes precedence over ``message``; an empty dictionary is
    returned when neither is set.
    """
    for wire_key in ("task", "message"):
        member = getattr(self, wire_key)
        if member is not None:
            return {wire_key: member.to_dict()}
    return {}

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "SendMessageResponse":
    """Build a send-message response from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``task`` and ``message``.

    Returns:
        A new instance; each field whose value is missing or falsy is set
        to ``None``.
    """
    # Allocate without running ``__init__``; both fields are set below.
    response = object.__new__(cls)
    raw_task = d.get("task")
    raw_message = d.get("message")

    response.task = None
    if raw_task:
        response.task = Task.from_dict(raw_task)

    response.message = None
    if raw_message:
        response.message = Message.from_dict(raw_message)
    return response

AgentProvider dataclass

Service provider information for an agent.

Attributes:

Name Type Description
organization str

Provider organization name.

url str

Provider URL.

Source code in a2a/a2a.py
@dataclass
class AgentProvider:
    """Details about the organization providing an agent.

    Attributes:
        organization: Provider organization name.
        url: Provider URL.
    """

    organization: str = ""
    url: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return this provider as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentProvider":
        """Build a provider from its camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``organization`` and ``url``.

        Returns:
            A new instance; absent keys default to ``""``.
        """
        # Allocate without running ``__init__``; both fields are set below.
        provider = object.__new__(cls)
        provider.organization = d.get("organization", "")
        provider.url = d.get("url", "")
        return provider

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this agent provider as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AgentProvider":
    """Build an agent provider from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``organization`` and ``url``.

    Returns:
        A new instance; absent keys default to ``""``.
    """
    # Allocate without running ``__init__``; both fields are set below.
    provider = object.__new__(cls)
    provider.organization = d.get("organization", "")
    provider.url = d.get("url", "")
    return provider

AgentExtension dataclass

Declaration of a protocol extension supported by an agent.

Attributes:

Name Type Description
uri str

Unique URI identifying the extension.

description str | None

Human-readable description.

required bool

Whether the client must support this extension.

params dict[str, Any] | None

Extension-specific configuration.

Source code in a2a/a2a.py
@dataclass
class AgentExtension:
    """Declaration of a protocol extension supported by an agent.

    Attributes:
        uri: Unique URI identifying the extension.
        description: Human-readable description.
        required: Whether the client must support this extension.
        params: Extension-specific configuration.
    """

    uri: str = ""
    description: str | None = None
    required: bool = False
    params: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentExtension":
        """Deserialize from a camelCase dictionary."""
        obj = object.__new__(cls)
        _get = d.get
        obj.uri = _get("uri", "")
        obj.description = _get("description")
        obj.required = _get("required", False)
        obj.params = _get("params")
        return obj

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this agent extension as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AgentExtension":
    """Build an agent extension from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``uri``, ``description``,
            ``required`` and ``params``.

    Returns:
        A new instance; an absent ``uri`` defaults to ``""`` and an absent
        ``required`` to ``False``.
    """
    # Allocate without running ``__init__``; every field is set below.
    extension = object.__new__(cls)
    extension.uri = d.get("uri", "")
    extension.description = d.get("description")
    extension.required = d.get("required", False)
    extension.params = d.get("params")
    return extension

AgentCapabilities dataclass

Optional capabilities supported by an agent.

Attributes:

Name Type Description
streaming bool | None

Whether the agent supports streaming.

push_notifications bool | None

Whether the agent supports push notifications.

extensions list[AgentExtension] | None

List of supported protocol extensions.

extended_agent_card bool | None

Whether an authenticated extended card is available.

Source code in a2a/a2a.py
@dataclass
class AgentCapabilities:
    """Optional feature flags advertised by an agent.

    Attributes:
        streaming: Whether the agent supports streaming.
        push_notifications: Whether the agent supports push notifications.
        extensions: List of supported protocol extensions.
        extended_agent_card: Whether an authenticated extended card is
            available.
    """

    streaming: bool | None = None
    push_notifications: bool | None = None
    extensions: list[AgentExtension] | None = None
    extended_agent_card: bool | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return these capabilities as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentCapabilities":
        """Build capabilities from their camelCase dictionary form.

        Args:
            d: Mapping read with the keys ``streaming``,
                ``pushNotifications``, ``extensions`` and
                ``extendedAgentCard``.

        Returns:
            A new instance; a missing or empty ``extensions`` list is
            stored as ``None``.
        """
        # Allocate without running ``__init__``; every field is set below.
        capabilities = object.__new__(cls)
        capabilities.streaming = d.get("streaming")
        capabilities.push_notifications = d.get("pushNotifications")
        raw_extensions = d.get("extensions")
        if raw_extensions:
            capabilities.extensions = [
                AgentExtension.from_dict(item) for item in raw_extensions
            ]
        else:
            capabilities.extensions = None
        capabilities.extended_agent_card = d.get("extendedAgentCard")
        return capabilities

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return these agent capabilities as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AgentCapabilities":
    """Build agent capabilities from their camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``streaming``, ``pushNotifications``,
            ``extensions`` and ``extendedAgentCard``.

    Returns:
        A new instance; a missing or empty ``extensions`` list is stored
        as ``None``.
    """
    # Allocate without running ``__init__``; every field is set below.
    capabilities = object.__new__(cls)
    capabilities.streaming = d.get("streaming")
    capabilities.push_notifications = d.get("pushNotifications")
    raw_extensions = d.get("extensions")
    if raw_extensions:
        capabilities.extensions = [
            AgentExtension.from_dict(item) for item in raw_extensions
        ]
    else:
        capabilities.extensions = None
    capabilities.extended_agent_card = d.get("extendedAgentCard")
    return capabilities

AgentSkill dataclass

A distinct capability that an agent can perform.

Attributes:

Name Type Description
id str

Unique skill identifier.

name str

Human-readable name.

description str

Detailed description.

tags list[str]

Keywords describing the skill.

examples list[str] | None

Example prompts or scenarios.

input_modes list[str] | None

Supported input media types (overrides agent defaults).

output_modes list[str] | None

Supported output media types (overrides agent defaults).

Source code in a2a/a2a.py
@dataclass
class AgentSkill:
    """A distinct capability that an agent can perform.

    Attributes:
        id: Unique skill identifier.
        name: Human-readable name.
        description: Detailed description.
        tags: Keywords describing the skill.
        examples: Example prompts or scenarios.
        input_modes: Supported input media types (overrides agent defaults).
        output_modes: Supported output media types (overrides agent defaults).
    """

    id: str = ""
    name: str = ""
    description: str = ""
    tags: list[str] = field(default_factory=list)
    examples: list[str] | None = None
    input_modes: list[str] | None = None
    output_modes: list[str] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentSkill":
        """Deserialize from a camelCase dictionary."""
        obj = object.__new__(cls)
        _get = d.get
        obj.id = _get("id", "")
        obj.name = _get("name", "")
        obj.description = _get("description", "")
        obj.tags = _get("tags", [])
        obj.examples = _get("examples")
        obj.input_modes = _get("inputModes")
        obj.output_modes = _get("outputModes")
        return obj

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this agent skill as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AgentSkill":
    """Deserialize from a camelCase dictionary.

    Args:
        d: Mapping read with the keys ``id``, ``name``, ``description``,
            ``tags``, ``examples``, ``inputModes`` and ``outputModes``.

    Returns:
        A new instance. ``tags`` is always a list: both a missing key and
        an explicit ``None`` value yield ``[]``.
    """
    # Allocate without running ``__init__``; every field is set below.
    obj = object.__new__(cls)
    _get = d.get
    obj.id = _get("id", "")
    obj.name = _get("name", "")
    obj.description = _get("description", "")
    # ``tags`` is a non-optional list on the dataclass, so an explicit null
    # in the payload must not leak through as ``None``.
    tags = _get("tags")
    obj.tags = [] if tags is None else tags
    obj.examples = _get("examples")
    obj.input_modes = _get("inputModes")
    obj.output_modes = _get("outputModes")
    return obj

AgentInterface dataclass

A supported protocol interface for an agent.

Attributes:

Name Type Description
url str

The URL where the interface is available.

protocol_binding str

Protocol binding type (JSONRPC, GRPC, HTTP+JSON).

protocol_version str

A2A protocol version (e.g. "1.0").

tenant str | None

Optional tenant identifier.

Source code in a2a/a2a.py
@dataclass
class AgentInterface:
    """A supported protocol interface for an agent.

    Attributes:
        url: The URL where the interface is available.
        protocol_binding: Protocol binding type (JSONRPC, GRPC, HTTP+JSON).
        protocol_version: A2A protocol version (e.g. "1.0").
        tenant: Optional tenant identifier.
    """

    url: str = ""
    protocol_binding: str = "JSONRPC"
    protocol_version: str = "1.0"
    tenant: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a camelCase dictionary."""
        return _serialize(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentInterface":
        """Deserialize from a camelCase dictionary."""
        obj = object.__new__(cls)
        _get = d.get
        obj.url = _get("url", "")
        obj.protocol_binding = _get("protocolBinding", "JSONRPC")
        obj.protocol_version = _get("protocolVersion", "1.0")
        obj.tenant = _get("tenant")
        return obj

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this agent interface as a camelCase dictionary.

    The conversion is delegated to the module-level ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AgentInterface":
    """Build an agent interface from its camelCase dictionary form.

    Args:
        d: Mapping read with the keys ``url``, ``protocolBinding``,
            ``protocolVersion`` and ``tenant``.

    Returns:
        A new instance; absent keys fall back to ``""``, ``"JSONRPC"``,
        ``"1.0"`` and ``None`` respectively.
    """
    # Allocate without running ``__init__``; every field is set below.
    interface = object.__new__(cls)
    interface.url = d.get("url", "")
    interface.protocol_binding = d.get("protocolBinding", "JSONRPC")
    interface.protocol_version = d.get("protocolVersion", "1.0")
    interface.tenant = d.get("tenant")
    return interface

AgentCard dataclass

Self-describing manifest for an A2A agent.

Attributes:

Name Type Description
name str

Human-readable agent name.

description str

Agent purpose description.

version str

Agent version string.

supported_interfaces list[AgentInterface]

Ordered list of supported protocol interfaces.

default_input_modes list[str]

Supported input media types.

default_output_modes list[str]

Supported output media types.

skills list[AgentSkill]

Agent capabilities / skills.

capabilities AgentCapabilities | None

Optional capability flags.

provider AgentProvider | None

Service provider info.

documentation_url str | None

Link to additional docs.

security_schemes dict[str, Any] | None

Security scheme definitions.

security_requirements list[dict[str, Any]] | None

Security requirements.

icon_url str | None

URL to an agent icon.

Source code in a2a/a2a.py
@dataclass
class AgentCard:
    """Manifest through which an A2A agent describes itself.

    Attributes:
        name: Human-readable agent name.
        description: Agent purpose description.
        version: Agent version string.
        supported_interfaces: Ordered list of supported protocol interfaces.
        default_input_modes: Supported input media types.
        default_output_modes: Supported output media types.
        skills: Agent capabilities / skills.
        capabilities: Optional capability flags.
        provider: Service provider info.
        documentation_url: Link to additional docs.
        security_schemes: Security scheme definitions.
        security_requirements: Security requirements.
        icon_url: URL to an agent icon.
    """

    name: str = ""
    description: str = ""
    version: str = "1.0.0"
    supported_interfaces: list[AgentInterface] = field(default_factory=list)
    default_input_modes: list[str] = field(default_factory=lambda: ["text/plain"])
    default_output_modes: list[str] = field(default_factory=lambda: ["text/plain"])
    skills: list[AgentSkill] = field(default_factory=list)
    capabilities: AgentCapabilities | None = None
    provider: AgentProvider | None = None
    documentation_url: str | None = None
    security_schemes: dict[str, Any] | None = None
    security_requirements: list[dict[str, Any]] | None = None
    icon_url: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return this card as a camelCase dictionary via ``_serialize``."""
        payload = _serialize(self)
        return payload

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "AgentCard":
        """Build a card from its camelCase dictionary form.

        Args:
            d: Mapping read with the camelCase counterparts of the
                dataclass fields (``supportedInterfaces``,
                ``defaultInputModes``, ``documentationUrl``, ...).

        Returns:
            A new instance; missing or empty ``supportedInterfaces`` and
            ``skills`` become ``[]``, missing or empty ``capabilities`` and
            ``provider`` become ``None``.
        """
        # Allocate without running ``__init__``; every field is set below.
        card = object.__new__(cls)
        card.name = d.get("name", "")
        card.description = d.get("description", "")
        card.version = d.get("version", "1.0.0")

        raw_interfaces = d.get("supportedInterfaces")
        if raw_interfaces:
            card.supported_interfaces = [
                AgentInterface.from_dict(item) for item in raw_interfaces
            ]
        else:
            card.supported_interfaces = []

        card.default_input_modes = d.get("defaultInputModes", ["text/plain"])
        card.default_output_modes = d.get("defaultOutputModes", ["text/plain"])

        raw_skills = d.get("skills")
        if raw_skills:
            card.skills = [AgentSkill.from_dict(item) for item in raw_skills]
        else:
            card.skills = []

        raw_capabilities = d.get("capabilities")
        if raw_capabilities:
            card.capabilities = AgentCapabilities.from_dict(raw_capabilities)
        else:
            card.capabilities = None

        raw_provider = d.get("provider")
        if raw_provider:
            card.provider = AgentProvider.from_dict(raw_provider)
        else:
            card.provider = None

        card.documentation_url = d.get("documentationUrl")
        card.security_schemes = d.get("securitySchemes")
        card.security_requirements = d.get("securityRequirements")
        card.icon_url = d.get("iconUrl")
        return card

to_dict()

Serialize to a camelCase dictionary.

Source code in a2a/a2a.py
def to_dict(self) -> dict[str, Any]:
    """Return this agent card as a camelCase-keyed dictionary.

    The conversion is delegated to the module's ``_serialize`` helper.
    """
    payload = _serialize(self)
    return payload

from_dict(d) classmethod

Deserialize from a camelCase dictionary.

Source code in a2a/a2a.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "AgentCard":
    """Build an ``AgentCard`` from a camelCase dictionary.

    Missing keys fall back to defaults: empty strings for ``name`` and
    ``description``, ``"1.0.0"`` for ``version``, ``["text/plain"]`` for
    both mode lists, empty lists for interfaces and skills, and ``None``
    for the remaining fields.

    Args:
        d: Mapping keyed by the camelCase wire names.

    Returns:
        A new instance created without running ``__init__``.
    """
    card = object.__new__(cls)
    lookup = d.get

    card.name = lookup("name", "")
    card.description = lookup("description", "")
    card.version = lookup("version", "1.0.0")

    # Nested objects are only converted when the entry is present and
    # non-empty; otherwise the field gets its empty default.
    raw_interfaces = lookup("supportedInterfaces")
    if raw_interfaces:
        card.supported_interfaces = [
            AgentInterface.from_dict(entry) for entry in raw_interfaces
        ]
    else:
        card.supported_interfaces = []

    card.default_input_modes = lookup("defaultInputModes", ["text/plain"])
    card.default_output_modes = lookup("defaultOutputModes", ["text/plain"])

    raw_skills = lookup("skills")
    if raw_skills:
        card.skills = [AgentSkill.from_dict(entry) for entry in raw_skills]
    else:
        card.skills = []

    raw_capabilities = lookup("capabilities")
    if raw_capabilities:
        card.capabilities = AgentCapabilities.from_dict(raw_capabilities)
    else:
        card.capabilities = None

    raw_provider = lookup("provider")
    if raw_provider:
        card.provider = AgentProvider.from_dict(raw_provider)
    else:
        card.provider = None

    card.documentation_url = lookup("documentationUrl")
    card.security_schemes = lookup("securitySchemes")
    card.security_requirements = lookup("securityRequirements")
    card.icon_url = lookup("iconUrl")
    return card

A2AError

Bases: JSONRPCException

Base class for A2A protocol errors.

Source code in a2a/a2a.py
class A2AError(JSONRPCException):
    """Root of the A2A protocol error hierarchy.

    Subclasses customise ``code`` and ``default_message``; each instance
    wraps a ``JSONRPCError`` built from those values.

    Args:
        message: Error text; a falsy value selects ``default_message``.
        data: Optional extra payload attached to the error.
    """

    code: int = INTERNAL_ERROR
    default_message: str = "Internal error"

    def __init__(self, message: str | None = None, data: Any = None):
        if message:
            self.rpc_message = message
        else:
            self.rpc_message = self.default_message
        self.data = data
        wrapped = JSONRPCError(
            code=self.code, message=self.rpc_message, data=self.data
        )
        super().__init__(wrapped)

TaskNotFoundError

Bases: A2AError

The specified task ID does not exist or is not accessible.

Source code in a2a/a2a.py
class TaskNotFoundError(A2AError):
    """The specified task ID does not exist or is not accessible.

    Overrides the base-class ``code`` with ``-32001`` and the
    ``default_message`` with ``"Task not found"``.
    """

    # JSON-RPC error code carried by this error.
    code = -32001
    # Text used when the raiser supplies no message of its own.
    default_message = "Task not found"

TaskNotCancelableError

Bases: A2AError

The task is not in a cancelable state.

Source code in a2a/a2a.py
class TaskNotCancelableError(A2AError):
    """The task is not in a cancelable state.

    Overrides the base-class ``code`` with ``-32002`` and the
    ``default_message`` with ``"Task not cancelable"``.
    """

    # JSON-RPC error code carried by this error.
    code = -32002
    # Text used when the raiser supplies no message of its own.
    default_message = "Task not cancelable"

PushNotificationNotSupportedError

Bases: A2AError

Push notification features are not supported by this agent.

Source code in a2a/a2a.py
class PushNotificationNotSupportedError(A2AError):
    """Push notification features are not supported by this agent.

    Overrides the base-class ``code`` with ``-32003`` and the
    ``default_message`` with ``"Push notifications not supported"``.
    """

    # JSON-RPC error code carried by this error.
    code = -32003
    # Text used when the raiser supplies no message of its own.
    default_message = "Push notifications not supported"

UnsupportedOperationError

Bases: A2AError

The requested operation is not supported.

Source code in a2a/a2a.py
class UnsupportedOperationError(A2AError):
    """The requested operation is not supported.

    Overrides the base-class ``code`` with ``-32004`` and the
    ``default_message`` with ``"Unsupported operation"``.
    """

    # JSON-RPC error code carried by this error.
    code = -32004
    # Text used when the raiser supplies no message of its own.
    default_message = "Unsupported operation"

ContentTypeNotSupportedError

Bases: A2AError

A media type in the request is not supported.

Source code in a2a/a2a.py
class ContentTypeNotSupportedError(A2AError):
    """A media type in the request is not supported.

    Overrides the base-class ``code`` with ``-32005`` and the
    ``default_message`` with ``"Content type not supported"``.
    """

    # JSON-RPC error code carried by this error.
    code = -32005
    # Text used when the raiser supplies no message of its own.
    default_message = "Content type not supported"

InvalidAgentResponseError

Bases: A2AError

The agent returned a response that does not conform to the spec.

Source code in a2a/a2a.py
class InvalidAgentResponseError(A2AError):
    """The agent returned a response that does not conform to the spec.

    Overrides the base-class ``code`` with ``-32006`` and the
    ``default_message`` with ``"Invalid agent response"``.
    """

    # JSON-RPC error code carried by this error.
    code = -32006
    # Text used when the raiser supplies no message of its own.
    default_message = "Invalid agent response"

A2AClient

HTTP client for the A2A JSON-RPC protocol binding.

Uses only urllib.request and http.client from the standard library.

Parameters:

Name Type Description Default
agent_card_url str | None

Full URL to the agent card JSON endpoint. If not provided, it is derived from base_url.

None
base_url str | None

The base JSON-RPC endpoint URL. If not provided, it is derived from the agent card once fetched.

None
headers dict[str, str] | None

Extra HTTP headers to include in every request.

None

Example::

client = A2AClient(base_url="http://localhost:8000")
card = client.get_agent_card()
resp = client.send_message("Hello, agent!")
Source code in a2a/a2a.py
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
class A2AClient:
    """HTTP client for the A2A JSON-RPC protocol binding.

    Uses only ``urllib.request`` and ``http.client`` from the standard library.

    Args:
        agent_card_url: Full URL to the agent card JSON endpoint.  If not
            provided, it is derived from *base_url*.
        base_url: The base JSON-RPC endpoint URL.  If not provided, it is
            derived from the agent card once fetched.
        headers: Extra HTTP headers to include in every request.

    Example::

        client = A2AClient(base_url="http://localhost:8000")
        card = client.get_agent_card()
        resp = client.send_message("Hello, agent!")
    """

    def __init__(
        self,
        agent_card_url: str | None = None,
        base_url: str | None = None,
        headers: dict[str, str] | None = None,
    ):
        self._base_url = base_url.rstrip("/") if base_url else None
        self._agent_card_url = agent_card_url
        self._headers = headers or {}
        self._agent_card: AgentCard | None = None

    # --- Agent Card ---

    def get_agent_card(self) -> AgentCard:
        """Fetch and cache the agent card.

        Returns:
            The ``AgentCard`` published by the remote agent.

        Raises:
            urllib.error.URLError: On network failure.
        """
        if self._agent_card is not None:
            return self._agent_card

        url = self._agent_card_url
        if url is None:
            if self._base_url is None:
                raise ValueError("Either agent_card_url or base_url must be provided")
            parsed = urllib.parse.urlparse(self._base_url)
            url = f"{parsed.scheme}://{parsed.netloc}/.well-known/agent-card.json"

        req = urllib.request.Request(url, headers=self._headers)
        with urllib.request.urlopen(req) as resp:
            data = json.loads(resp.read().decode("utf-8"))
        self._agent_card = AgentCard.from_dict(data)

        # Derive base_url from card if not set
        if self._base_url is None and self._agent_card.supported_interfaces:
            self._base_url = self._agent_card.supported_interfaces[0].url
        return self._agent_card

    # --- Low-level RPC ---

    def _rpc_url(self) -> str:
        """Return the JSON-RPC endpoint URL."""
        if self._base_url is None:
            raise ValueError("base_url is not configured")
        return self._base_url

    def _make_rpc_request(
        self,
        method: str,
        params: dict[str, Any] | None = None,
        request_id: Union[str, int, None] = None,
    ) -> JSONRPCResponse:
        """Send a JSON-RPC request and return the parsed response.

        Args:
            method: JSON-RPC method name.
            params: Method parameters.
            request_id: Optional request ID (defaults to auto-generated int).

        Returns:
            Parsed ``JSONRPCResponse``.
        """
        if request_id is None:
            request_id = id(params) & 0xFFFFFF
        rpc_req = JSONRPCRequest(method=method, params=params, id=request_id)
        body = json.dumps(rpc_req.to_dict()).encode("utf-8")
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            **self._headers,
        }
        req = urllib.request.Request(
            self._rpc_url(), data=body, headers=headers, method="POST"
        )
        with urllib.request.urlopen(req) as resp:
            resp_data = json.loads(resp.read().decode("utf-8"))
        rpc_resp = JSONRPCResponse(
            id=resp_data.get("id"),
            jsonrpc=resp_data.get("jsonrpc", "2.0"),
        )
        if "error" in resp_data and resp_data["error"] is not None:
            err = resp_data["error"]
            rpc_resp.error = JSONRPCError(
                code=err.get("code", -32603),
                message=err.get("message", "Unknown error"),
                data=err.get("data"),
            )
        else:
            rpc_resp.result = resp_data.get("result")
        return rpc_resp

    def _make_sse_request(
        self,
        method: str,
        params: dict[str, Any] | None = None,
        request_id: Union[str, int, None] = None,
    ) -> Iterator[dict[str, Any]]:
        """Send a JSON-RPC request and stream SSE responses.

        Uses ``http.client`` for chunked/streaming reads.

        Args:
            method: JSON-RPC method name.
            params: Method parameters.
            request_id: Optional request ID.

        Yields:
            Parsed JSON dictionaries from each SSE ``data:`` event.
        """
        if request_id is None:
            request_id = id(params) & 0xFFFFFF
        rpc_req = JSONRPCRequest(method=method, params=params, id=request_id)
        body = json.dumps(rpc_req.to_dict()).encode("utf-8")
        parsed = urllib.parse.urlparse(self._rpc_url())
        host = parsed.hostname or "localhost"

        if parsed.scheme == "https":
            conn = http.client.HTTPSConnection(host, parsed.port)
        else:
            conn = http.client.HTTPConnection(host, parsed.port)

        path = parsed.path or "/"
        headers = {
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
            **self._headers,
        }
        try:
            conn.request("POST", path, body=body, headers=headers)
            response = conn.getresponse()
            yield from sse_decode_stream(response)
        finally:
            conn.close()

    # --- High-level API ---

    def send_message(
        self,
        text: str,
        *,
        task_id: str | None = None,
        context_id: str | None = None,
        configuration: SendMessageConfiguration | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> SendMessageResponse:
        """Send a text message to the agent (blocking).

        Args:
            text: The message text.
            task_id: Optional task to continue.
            context_id: Optional context to associate with.
            configuration: Optional send configuration.
            metadata: Optional request metadata.

        Returns:
            A ``SendMessageResponse`` containing either a Task or Message.
        """
        msg = Message(
            role=Role.USER,
            parts=[Part(text=text)],
            task_id=task_id,
            context_id=context_id,
        )
        req = SendMessageRequest(
            message=msg,
            configuration=configuration,
            metadata=metadata,
        )
        rpc_resp = self._make_rpc_request("SendMessage", req.to_dict())
        if rpc_resp.error:
            raise A2AError(rpc_resp.error.message, rpc_resp.error.data)
        return SendMessageResponse.from_dict(rpc_resp.result or {})

    def send_message_streaming(
        self,
        text: str,
        *,
        task_id: str | None = None,
        context_id: str | None = None,
        configuration: SendMessageConfiguration | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Iterator[StreamResponse]:
        """Send a text message and stream responses via SSE.

        Args:
            text: The message text.
            task_id: Optional task to continue.
            context_id: Optional context to associate with.
            configuration: Optional send configuration.
            metadata: Optional request metadata.

        Yields:
            ``StreamResponse`` objects as they arrive.
        """
        msg = Message(
            role=Role.USER,
            parts=[Part(text=text)],
            task_id=task_id,
            context_id=context_id,
        )
        req = SendMessageRequest(
            message=msg,
            configuration=configuration,
            metadata=metadata,
        )
        for event in self._make_sse_request("SendStreamingMessage", req.to_dict()):
            result = event.get("result")
            if result:
                yield StreamResponse.from_dict(result)

    def get_task(
        self,
        task_id: str,
        *,
        history_length: int | None = None,
    ) -> Task:
        """Retrieve the current state of a task.

        Args:
            task_id: The task identifier.
            history_length: Optional max number of history messages to return.

        Returns:
            The current ``Task`` object.
        """
        params: dict[str, Any] = {"id": task_id}
        if history_length is not None:
            params["historyLength"] = history_length
        rpc_resp = self._make_rpc_request("GetTask", params)
        if rpc_resp.error:
            raise A2AError(rpc_resp.error.message, rpc_resp.error.data)
        return Task.from_dict(rpc_resp.result or {})

    def list_tasks(
        self,
        *,
        context_id: str | None = None,
        status: TaskState | None = None,
        page_size: int | None = None,
        page_token: str | None = None,
    ) -> dict[str, Any]:
        """List tasks with optional filtering.

        Args:
            context_id: Filter by context.
            status: Filter by task state.
            page_size: Maximum results per page.
            page_token: Cursor for pagination.

        Returns:
            Raw result dictionary with ``tasks``, ``nextPageToken``, etc.
        """
        params: dict[str, Any] = {}
        if context_id:
            params["contextId"] = context_id
        if status:
            params["status"] = status.value
        if page_size is not None:
            params["pageSize"] = page_size
        if page_token:
            params["pageToken"] = page_token
        rpc_resp = self._make_rpc_request("ListTasks", params)
        if rpc_resp.error:
            raise A2AError(rpc_resp.error.message, rpc_resp.error.data)
        return rpc_resp.result or {}

    def cancel_task(self, task_id: str) -> Task:
        """Request cancellation of a task.

        Args:
            task_id: The task identifier.

        Returns:
            The updated ``Task`` with cancellation status.
        """
        rpc_resp = self._make_rpc_request("CancelTask", {"id": task_id})
        if rpc_resp.error:
            raise A2AError(rpc_resp.error.message, rpc_resp.error.data)
        return Task.from_dict(rpc_resp.result or {})

get_agent_card()

Fetch and cache the agent card.

Returns:

Type Description
AgentCard

The AgentCard published by the remote agent.

Raises:

Type Description
URLError

On network failure.

Source code in a2a/a2a.py
def get_agent_card(self) -> AgentCard:
    """Return the remote agent's card, downloading it on first use.

    The card is cached on the client.  When no base URL was configured, the
    URL of the card's first supported interface is adopted as the base URL.

    Returns:
        The ``AgentCard`` published by the remote agent.

    Raises:
        ValueError: If neither ``agent_card_url`` nor ``base_url`` is set.
        urllib.error.URLError: On network failure.
    """
    cached = self._agent_card
    if cached is not None:
        return cached

    card_url = self._agent_card_url
    if card_url is None:
        base = self._base_url
        if base is None:
            raise ValueError("Either agent_card_url or base_url must be provided")
        # The well-known document is served from the origin of the base URL.
        origin = urllib.parse.urlparse(base)
        card_url = f"{origin.scheme}://{origin.netloc}/.well-known/agent-card.json"

    request = urllib.request.Request(card_url, headers=self._headers)
    with urllib.request.urlopen(request) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    card = AgentCard.from_dict(payload)
    self._agent_card = card

    # Derive base_url from card if not set
    if self._base_url is None:
        if card.supported_interfaces:
            self._base_url = card.supported_interfaces[0].url
    return card

send_message(text, *, task_id=None, context_id=None, configuration=None, metadata=None)

Send a text message to the agent (blocking).

Parameters:

Name Type Description Default
text str

The message text.

required
task_id str | None

Optional task to continue.

None
context_id str | None

Optional context to associate with.

None
configuration SendMessageConfiguration | None

Optional send configuration.

None
metadata dict[str, Any] | None

Optional request metadata.

None

Returns:

Type Description
SendMessageResponse

A SendMessageResponse containing either a Task or Message.

Source code in a2a/a2a.py
def send_message(
    self,
    text: str,
    *,
    task_id: str | None = None,
    context_id: str | None = None,
    configuration: SendMessageConfiguration | None = None,
    metadata: dict[str, Any] | None = None,
) -> SendMessageResponse:
    """Send a text message to the agent (blocking).

    Args:
        text: The message text.
        task_id: Optional task to continue.
        context_id: Optional context to associate with.
        configuration: Optional send configuration.
        metadata: Optional request metadata.

    Returns:
        A ``SendMessageResponse`` containing either a Task or Message.

    Raises:
        A2AError: If the agent answers with a JSON-RPC error.  The direct
            ``A2AError`` subclass whose ``code`` matches the server's error
            code is raised when one exists.
    """
    msg = Message(
        role=Role.USER,
        parts=[Part(text=text)],
        task_id=task_id,
        context_id=context_id,
    )
    req = SendMessageRequest(
        message=msg,
        configuration=configuration,
        metadata=metadata,
    )
    rpc_resp = self._make_rpc_request("SendMessage", req.to_dict())
    error = rpc_resp.error
    if error:
        # Keep the server's error code by raising the matching subclass
        # instead of always falling back to the generic base error.
        exc_type = next(
            (sub for sub in A2AError.__subclasses__() if sub.code == error.code),
            A2AError,
        )
        raise exc_type(error.message, error.data)
    return SendMessageResponse.from_dict(rpc_resp.result or {})

send_message_streaming(text, *, task_id=None, context_id=None, configuration=None, metadata=None)

Send a text message and stream responses via SSE.

Parameters:

Name Type Description Default
text str

The message text.

required
task_id str | None

Optional task to continue.

None
context_id str | None

Optional context to associate with.

None
configuration SendMessageConfiguration | None

Optional send configuration.

None
metadata dict[str, Any] | None

Optional request metadata.

None

Yields:

Type Description
StreamResponse

StreamResponse objects as they arrive.

Source code in a2a/a2a.py
def send_message_streaming(
    self,
    text: str,
    *,
    task_id: str | None = None,
    context_id: str | None = None,
    configuration: SendMessageConfiguration | None = None,
    metadata: dict[str, Any] | None = None,
) -> Iterator[StreamResponse]:
    """Send a text message and stream responses via SSE.

    Args:
        text: The message text.
        task_id: Optional task to continue.
        context_id: Optional context to associate with.
        configuration: Optional send configuration.
        metadata: Optional request metadata.

    Yields:
        ``StreamResponse`` objects as they arrive.

    Raises:
        A2AError: If an event in the stream carries a JSON-RPC error.  The
            direct ``A2AError`` subclass whose ``code`` matches the reported
            code is raised when one exists.
    """
    msg = Message(
        role=Role.USER,
        parts=[Part(text=text)],
        task_id=task_id,
        context_id=context_id,
    )
    req = SendMessageRequest(
        message=msg,
        configuration=configuration,
        metadata=metadata,
    )
    for event in self._make_sse_request("SendStreamingMessage", req.to_dict()):
        error = event.get("error")
        if isinstance(error, dict):
            # Surface server-side failures instead of ending the stream
            # silently, mirroring the blocking API.
            exc_type = next(
                (
                    sub
                    for sub in A2AError.__subclasses__()
                    if sub.code == error.get("code")
                ),
                A2AError,
            )
            raise exc_type(error.get("message"), error.get("data"))
        result = event.get("result")
        if result:
            yield StreamResponse.from_dict(result)

get_task(task_id, *, history_length=None)

Retrieve the current state of a task.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required
history_length int | None

Optional max number of history messages to return.

None

Returns:

Type Description
Task

The current Task object.

Source code in a2a/a2a.py
def get_task(
    self,
    task_id: str,
    *,
    history_length: int | None = None,
) -> Task:
    """Retrieve the current state of a task.

    Args:
        task_id: The task identifier.
        history_length: Optional max number of history messages to return.

    Returns:
        The current ``Task`` object.

    Raises:
        A2AError: If the agent answers with a JSON-RPC error.  The direct
            ``A2AError`` subclass whose ``code`` matches the server's error
            code is raised when one exists.
    """
    params: dict[str, Any] = {"id": task_id}
    if history_length is not None:
        params["historyLength"] = history_length
    rpc_resp = self._make_rpc_request("GetTask", params)
    error = rpc_resp.error
    if error:
        # Keep the server's error code by raising the matching subclass
        # instead of always falling back to the generic base error.
        exc_type = next(
            (sub for sub in A2AError.__subclasses__() if sub.code == error.code),
            A2AError,
        )
        raise exc_type(error.message, error.data)
    return Task.from_dict(rpc_resp.result or {})

list_tasks(*, context_id=None, status=None, page_size=None, page_token=None)

List tasks with optional filtering.

Parameters:

Name Type Description Default
context_id str | None

Filter by context.

None
status TaskState | None

Filter by task state.

None
page_size int | None

Maximum results per page.

None
page_token str | None

Cursor for pagination.

None

Returns:

Type Description
dict[str, Any]

Raw result dictionary with tasks, nextPageToken, etc.

Source code in a2a/a2a.py
def list_tasks(
    self,
    *,
    context_id: str | None = None,
    status: TaskState | None = None,
    page_size: int | None = None,
    page_token: str | None = None,
) -> dict[str, Any]:
    """List tasks with optional filtering.

    Args:
        context_id: Filter by context.
        status: Filter by task state.
        page_size: Maximum results per page.
        page_token: Cursor for pagination.

    Returns:
        Raw result dictionary with ``tasks``, ``nextPageToken``, etc.
    """
    params: dict[str, Any] = {}
    if context_id:
        params["contextId"] = context_id
    if status:
        params["status"] = status.value
    if page_size is not None:
        params["pageSize"] = page_size
    if page_token:
        params["pageToken"] = page_token
    rpc_resp = self._make_rpc_request("ListTasks", params)
    if rpc_resp.error:
        raise A2AError(rpc_resp.error.message, rpc_resp.error.data)
    return rpc_resp.result or {}

cancel_task(task_id)

Request cancellation of a task.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required

Returns:

Type Description
Task

The updated Task with cancellation status.

Source code in a2a/a2a.py
def cancel_task(self, task_id: str) -> Task:
    """Request cancellation of a task.

    Args:
        task_id: The task identifier.

    Returns:
        The updated ``Task`` with cancellation status.

    Raises:
        A2AError: If the agent answers with a JSON-RPC error.  The direct
            ``A2AError`` subclass whose ``code`` matches the server's error
            code is raised when one exists.
    """
    rpc_resp = self._make_rpc_request("CancelTask", {"id": task_id})
    error = rpc_resp.error
    if error:
        # Keep the server's error code by raising the matching subclass
        # instead of always falling back to the generic base error.
        exc_type = next(
            (sub for sub in A2AError.__subclasses__() if sub.code == error.code),
            A2AError,
        )
        raise exc_type(error.message, error.data)
    return Task.from_dict(rpc_resp.result or {})

A2ARequestHandler

Bases: BaseHTTPRequestHandler

HTTP request handler for the A2A JSON-RPC protocol binding.

Subclass this and assign dispatcher and agent_card on the server to customise behavior, or use A2AServer which wires everything up.

Source code in a2a/a2a.py
class A2ARequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for the A2A JSON-RPC protocol binding.

    Subclass this and assign ``dispatcher`` and ``agent_card`` on the server
    to customise behavior, or use ``A2AServer`` which wires everything up.

    Routes:
        * ``GET /.well-known/agent-card.json`` serves the agent card.
        * ``POST`` (any path) is treated as a JSON-RPC request; the reply is
          an SSE stream when the handler result is an iterator and the
          client accepts ``text/event-stream``, otherwise a single JSON body.
        * ``OPTIONS`` answers CORS preflight requests.
    """

    # Suppress default stderr logging from BaseHTTPRequestHandler
    def log_message(self, format: str, *args: Any) -> None:
        """Send the base class's request log lines to ``logger`` at DEBUG."""
        logger.debug(format, *args)

    @property
    def _dispatcher(self) -> "JSONRPCDispatcher":
        """The dispatcher attached to the owning server object."""
        return self.server.dispatcher  # type: ignore[attr-defined]  # ty: ignore[unresolved-attribute]

    @property
    def _agent_card(self) -> "AgentCard":
        """The agent card attached to the owning server object."""
        return self.server.agent_card  # type: ignore[attr-defined]  # ty: ignore[unresolved-attribute]

    # --- GET handlers ---

    def do_GET(self) -> None:
        """Handle GET requests (agent card endpoint)."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path == "/.well-known/agent-card.json":
            self._serve_agent_card()
        else:
            self.send_error(404, "Not Found")

    def _serve_agent_card(self) -> None:
        """Respond with the agent card JSON."""
        body = json.dumps(self._agent_card.to_dict(), indent=2).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    # --- POST handlers ---

    def do_POST(self) -> None:
        """Handle POST requests (JSON-RPC endpoint).

        Malformed requests are answered rather than raised: an unusable
        ``Content-Length`` or empty body gets a plain 400, an undecodable
        body gets a JSON-RPC parse error, and a body that is not a JSON
        object gets a JSON-RPC "Invalid Request" error.
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._send_json_error(400, "Invalid Content-Length header")
            return
        if content_length < 0:
            # A negative size would make ``rfile.read`` block until EOF.
            self._send_json_error(400, "Invalid Content-Length header")
            return
        if content_length == 0:
            self._send_json_error(400, "Empty request body")
            return

        raw = self.rfile.read(content_length)
        try:
            data = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            self._send_jsonrpc_error(
                None,
                JSONRPCError(code=PARSE_ERROR, message=f"Parse error: {exc}"),
            )
            return

        if not isinstance(data, dict):
            # -32600 is the JSON-RPC 2.0 "Invalid Request" code.
            self._send_jsonrpc_error(
                None,
                JSONRPCError(
                    code=-32600,
                    message="Invalid Request: expected a JSON object",
                ),
            )
            return

        rpc_req = JSONRPCRequest.from_dict(data)

        # Determine if the client wants SSE
        accept = self.headers.get("Accept", "")
        wants_sse = "text/event-stream" in accept

        result = self._dispatcher.dispatch(rpc_req)

        if hasattr(result, "__next__"):
            # Streaming response — result is an Iterator here
            stream: Iterator[JSONRPCResponse] = result  # type: ignore[assignment]  # ty: ignore[invalid-assignment]
            if wants_sse:
                self._send_sse_stream(stream)
            else:
                # The client cannot consume a stream: drain it and reply
                # with only the final item as a single JSON-RPC response.
                last: JSONRPCResponse | None = None
                for item in stream:
                    last = item
                if last is not None:
                    self._send_jsonrpc_response(last)
                else:
                    self._send_jsonrpc_response(
                        JSONRPCResponse.success(rpc_req.id, None)
                    )
        else:
            self._send_jsonrpc_response(result)

    def _send_jsonrpc_response(self, response: "JSONRPCResponse") -> None:
        """Write a single JSON-RPC response."""
        body = json.dumps(response.to_dict()).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _send_jsonrpc_error(self, request_id: Any, error: "JSONRPCError") -> None:
        """Write a JSON-RPC error response."""
        resp = JSONRPCResponse(id=request_id, error=error)
        self._send_jsonrpc_response(resp)

    def _send_sse_stream(self, responses: "Iterator[JSONRPCResponse]") -> None:
        """Write a stream of JSON-RPC responses as SSE events.

        Each response is flushed as soon as it is encoded.  A client that
        disconnects mid-stream ends the loop and is logged at DEBUG.
        """
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "close")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        try:
            for resp in responses:
                frame = sse_encode(resp.to_dict())
                self.wfile.write(frame)
                self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            logger.debug("SSE client disconnected")

    def _send_json_error(self, status: int, message: str) -> None:
        """Send a plain JSON error (non-RPC).

        Args:
            status: HTTP status code for the response.
            message: Text placed under the ``"error"`` key of the body.
        """
        body = json.dumps({"error": message}).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        # Match the other responses so browser clients can read the error.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    # --- OPTIONS (CORS preflight) ---

    def do_OPTIONS(self) -> None:
        """Handle CORS preflight requests."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header(
            "Access-Control-Allow-Headers",
            "Content-Type, Authorization, Accept, A2A-Version, A2A-Extensions",
        )
        self.send_header("Access-Control-Max-Age", "86400")
        self.end_headers()

do_GET()

Handle GET requests (agent card endpoint).

Source code in a2a/a2a.py
def do_GET(self) -> None:
    """Serve the agent card at its well-known path; answer 404 elsewhere.

    Only the path component of the request target is compared, so a query
    string does not affect routing.
    """
    requested_path = urllib.parse.urlparse(self.path).path
    if requested_path != "/.well-known/agent-card.json":
        self.send_error(404, "Not Found")
        return
    self._serve_agent_card()

do_POST()

Handle POST requests (JSON-RPC endpoint).

Source code in a2a/a2a.py
def do_POST(self) -> None:
    """Handle POST requests (JSON-RPC endpoint).

    Malformed requests are answered rather than raised: an unusable
    ``Content-Length`` or empty body gets a plain 400, an undecodable body
    gets a JSON-RPC parse error, and a body that is not a JSON object gets
    a JSON-RPC "Invalid Request" error.  Valid requests are dispatched; an
    iterator result is streamed as SSE when the client accepts
    ``text/event-stream``, otherwise only its final item is returned.
    """
    try:
        content_length = int(self.headers.get("Content-Length", 0))
    except (TypeError, ValueError):
        self._send_json_error(400, "Invalid Content-Length header")
        return
    if content_length < 0:
        # A negative size would make ``rfile.read`` block until EOF.
        self._send_json_error(400, "Invalid Content-Length header")
        return
    if content_length == 0:
        self._send_json_error(400, "Empty request body")
        return

    raw = self.rfile.read(content_length)
    try:
        data = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        self._send_jsonrpc_error(
            None,
            JSONRPCError(code=PARSE_ERROR, message=f"Parse error: {exc}"),
        )
        return

    if not isinstance(data, dict):
        # -32600 is the JSON-RPC 2.0 "Invalid Request" code.
        self._send_jsonrpc_error(
            None,
            JSONRPCError(
                code=-32600,
                message="Invalid Request: expected a JSON object",
            ),
        )
        return

    rpc_req = JSONRPCRequest.from_dict(data)

    # Determine if the client wants SSE
    accept = self.headers.get("Accept", "")
    wants_sse = "text/event-stream" in accept

    result = self._dispatcher.dispatch(rpc_req)

    if hasattr(result, "__next__"):
        # Streaming response — result is an Iterator here
        stream: Iterator[JSONRPCResponse] = result  # type: ignore[assignment]  # ty: ignore[invalid-assignment]
        if wants_sse:
            self._send_sse_stream(stream)
        else:
            # The client cannot consume a stream: drain it and reply with
            # only the final item as a single JSON-RPC response.
            last: JSONRPCResponse | None = None
            for item in stream:
                last = item
            if last is not None:
                self._send_jsonrpc_response(last)
            else:
                self._send_jsonrpc_response(
                    JSONRPCResponse.success(rpc_req.id, None)
                )
    else:
        self._send_jsonrpc_response(result)

do_OPTIONS()

Handle CORS preflight requests.

Source code in a2a/a2a.py
def do_OPTIONS(self) -> None:
    """Answer a CORS preflight request with 204 and permissive headers."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
        (
            "Access-Control-Allow-Headers",
            "Content-Type, Authorization, Accept, A2A-Version, A2A-Extensions",
        ),
        ("Access-Control-Max-Age", "86400"),
    )
    self.send_response(204)
    for header_name, header_value in cors_headers:
        self.send_header(header_name, header_value)
    self.end_headers()

A2AServer

A2A protocol server using the JSON-RPC binding over HTTP.

This wraps Python's http.server.HTTPServer with threading support and provides a JSONRPCDispatcher for registering method handlers.

Parameters:

Name Type Description Default
host str

Bind address (default "0.0.0.0").

'0.0.0.0'
port int

Bind port (default 8000).

8000
agent_card AgentCard | None

The AgentCard to serve.

None

Example::

card = AgentCard(name="Echo Agent", description="Echoes messages")
server = A2AServer(port=9000, agent_card=card)

@server.dispatcher.register("SendMessage")
def handle_send(params):
    req = SendMessageRequest.from_dict(params)
    text = req.message.parts[0].text or ""
    task = Task(
        status=TaskStatus(state=TaskState.COMPLETED),
        artifacts=[Artifact(parts=[Part(text=f"Echo: {text}")])],
    )
    return SendMessageResponse(task=task).to_dict()

server.start()
Source code in a2a/a2a.py
class A2AServer:
    """A2A protocol server using the JSON-RPC binding over HTTP.

    This wraps Python's ``http.server.HTTPServer`` with threading support
    and provides a ``JSONRPCDispatcher`` for registering method handlers.

    Args:
        host: Bind address (default ``"0.0.0.0"``).
        port: Bind port (default ``8000``).  ``0`` lets the OS pick a free
            port; once the server is started, ``url`` reports the port
            that was actually bound.
        agent_card: The ``AgentCard`` to serve.  A default-constructed card
            is used when omitted.

    Example::

        card = AgentCard(name="Echo Agent", description="Echoes messages")
        server = A2AServer(port=9000, agent_card=card)

        @server.dispatcher.register("SendMessage")
        def handle_send(params):
            req = SendMessageRequest.from_dict(params)
            text = req.message.parts[0].text or ""
            task = Task(
                status=TaskStatus(state=TaskState.COMPLETED),
                artifacts=[Artifact(parts=[Part(text=f"Echo: {text}")])],
            )
            return SendMessageResponse(task=task).to_dict()

        server.start()
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8000,
        agent_card: AgentCard | None = None,
    ):
        self.host = host
        self.port = port
        self.dispatcher = JSONRPCDispatcher()
        self.agent_card = agent_card or AgentCard()
        # Populated by start(); None until the server has been created.
        self._httpd: http.server.HTTPServer | None = None
        # Only set when start(blocking=False) is used.
        self._thread: threading.Thread | None = None

    def _create_server(self) -> http.server.HTTPServer:
        """Create and configure the HTTP server instance.

        The dispatcher and agent card are attached to the server object so
        that request handlers can reach them.
        """

        class _ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
            # Handler threads must not keep the process alive on exit.
            daemon_threads = True

        server = _ThreadedHTTPServer((self.host, self.port), A2ARequestHandler)
        server.dispatcher = self.dispatcher  # type: ignore[attr-defined]  # ty: ignore[unresolved-attribute]
        server.agent_card = self.agent_card  # type: ignore[attr-defined]  # ty: ignore[unresolved-attribute]
        return server

    def start(self, blocking: bool = True) -> None:
        """Start serving requests.

        Args:
            blocking: If True, block the calling thread. If False, serve in
                a background daemon thread.
        """
        self._httpd = self._create_server()
        logger.info("A2A server starting on %s:%d", self.host, self.port)
        if blocking:
            try:
                self._httpd.serve_forever()
            except KeyboardInterrupt:
                # Ctrl-C in the foreground: release the listening socket.
                self.stop()
        else:
            self._thread = threading.Thread(
                target=self._httpd.serve_forever, daemon=True
            )
            self._thread.start()

    def stop(self) -> None:
        """Shut down the server.

        Does nothing if the server was never started.
        """
        if self._httpd:
            self._httpd.shutdown()
            self._httpd.server_close()
            logger.info("A2A server stopped")

    @property
    def url(self) -> str:
        """Return the base URL of the running server.

        After ``start()`` the port comes from the bound socket address, so
        a server created with ``port=0`` reports its real port.  Before
        that, the configured ``port`` is used.
        """
        port = self.port
        if self._httpd is not None:
            port = self._httpd.server_address[1]
        return f"http://{self.host}:{port}"

url property

Return the base URL of the running server.

start(blocking=True)

Start serving requests.

Parameters:

Name Type Description Default
blocking bool

If True, block the calling thread. If False, serve in a background daemon thread.

True
Source code in a2a/a2a.py
def start(self, blocking: bool = True) -> None:
    """Create the HTTP server and begin serving requests.

    Args:
        blocking: When True, serve on the calling thread until a
            ``KeyboardInterrupt`` arrives, then stop the server.  When
            False, serve on a background daemon thread and return.
    """
    httpd = self._create_server()
    self._httpd = httpd
    logger.info("A2A server starting on %s:%d", self.host, self.port)

    if not blocking:
        worker = threading.Thread(target=httpd.serve_forever, daemon=True)
        self._thread = worker
        worker.start()
        return

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        self.stop()

stop()

Shut down the server.

Source code in a2a/a2a.py
def stop(self) -> None:
    """Shut the server down and close its socket; no-op if never started."""
    httpd = self._httpd
    if not httpd:
        return
    httpd.shutdown()
    httpd.server_close()
    logger.info("A2A server stopped")

TaskStore

Thread-safe in-memory task store backed by a dictionary.

Provides CRUD operations for Task objects keyed by their id.

Example::

store = TaskStore()
task = Task(status=TaskStatus(state=TaskState.SUBMITTED))
store.save(task)
retrieved = store.get(task.id)
Source code in a2a/a2a.py
class TaskStore:
    """Thread-safe in-memory task store backed by a dictionary.

    Provides CRUD operations for ``Task`` objects keyed by their ``id``.
    Tasks are deep-copied on the way in and on the way out, so callers
    never share mutable state with the store.

    Example::

        store = TaskStore()
        task = Task(status=TaskStatus(state=TaskState.SUBMITTED))
        store.save(task)
        retrieved = store.get(task.id)
    """

    def __init__(self) -> None:
        self._tasks: dict[str, Task] = {}
        # Guards every access to ``_tasks``.
        self._lock = threading.Lock()

    def save(self, task: Task) -> None:
        """Save or update a task in the store.

        Args:
            task: The task to save.  A deep copy is stored.
        """
        with self._lock:
            self._tasks[task.id] = copy.deepcopy(task)

    def get(self, task_id: str) -> Task | None:
        """Retrieve a task by ID.

        Args:
            task_id: The task identifier.

        Returns:
            A deep copy of the task, or ``None`` if not found.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            # Explicit None check: absence, not falsiness, means "missing".
            return copy.deepcopy(task) if task is not None else None

    def delete(self, task_id: str) -> bool:
        """Remove a task from the store.

        Args:
            task_id: The task identifier.

        Returns:
            True if the task was deleted, False if it was not found.
        """
        with self._lock:
            return self._tasks.pop(task_id, None) is not None

    def list_tasks(
        self,
        *,
        context_id: str | None = None,
        status: TaskState | None = None,
        page_size: int = 50,
        page_token: str | None = None,
    ) -> tuple[list[Task], str, int]:
        """List tasks with optional filtering and pagination.

        Tasks are ordered by status timestamp, newest first; tasks without
        a timestamp sort last.

        Args:
            context_id: Filter by context ID.
            status: Filter by task state.
            page_size: Maximum number of tasks to return.  Negative values
                are treated as ``0``.
            page_token: Opaque cursor (task ID) for pagination.  The page
                starts after the task with this ID; if no task in the
                filtered list has this ID, listing starts from the first
                task.

        Returns:
            Tuple of (tasks, next_page_token, total_count).  The token is
            ``""`` when there is no further page; ``total_count`` is the
            number of tasks matching the filters.
        """
        with self._lock:
            all_tasks = list(self._tasks.values())

        # Filter
        if context_id:
            all_tasks = [t for t in all_tasks if t.context_id == context_id]
        if status:
            all_tasks = [t for t in all_tasks if t.status.state == status]

        # Sort by timestamp descending
        all_tasks.sort(
            key=lambda t: t.status.timestamp or "",
            reverse=True,
        )

        total = len(all_tasks)

        # A negative size would otherwise slice from the end of the list and
        # return tasks despite the "maximum number" contract.
        page_size = max(page_size, 0)

        # Pagination
        start_idx = 0
        if page_token:
            for i, t in enumerate(all_tasks):
                if t.id == page_token:
                    start_idx = i + 1
                    break

        page = all_tasks[start_idx : start_idx + page_size]
        next_token = ""
        if start_idx + page_size < total:
            next_token = page[-1].id if page else ""

        # Only the returned page is copied, not the whole store.
        return [copy.deepcopy(t) for t in page], next_token, total

save(task)

Save or update a task in the store.

Parameters:

Name Type Description Default
task Task

The task to save.

required
Source code in a2a/a2a.py
def save(self, task: Task) -> None:
    """Insert *task* into the store, replacing any entry with the same ID.

    Args:
        task: The task to persist.  The store keeps its own deep copy, so
            later changes to *task* do not affect the stored entry.
    """
    with self._lock:
        snapshot = copy.deepcopy(task)
        self._tasks[task.id] = snapshot

get(task_id)

Retrieve a task by ID.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required

Returns:

Type Description
Task | None

A deep copy of the task, or None if not found.

Source code in a2a/a2a.py
def get(self, task_id: str) -> Task | None:
    """Retrieve a task by ID.

    Args:
        task_id: The task identifier.

    Returns:
        A deep copy of the task, or ``None`` if not found.
    """
    with self._lock:
        task = self._tasks.get(task_id)
        # Explicit None check: a stored object that happens to be falsy must
        # still be returned rather than reported as missing.
        if task is None:
            return None
        return copy.deepcopy(task)

delete(task_id)

Remove a task from the store.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required

Returns:

Type Description
bool

True if the task was deleted, False if it was not found.

Source code in a2a/a2a.py
def delete(self, task_id: str) -> bool:
    """Drop the task with the given ID from the store.

    Args:
        task_id: The task identifier.

    Returns:
        True when an entry was removed, False when there was none.
    """
    with self._lock:
        removed = self._tasks.pop(task_id, None)
    return removed is not None

list_tasks(*, context_id=None, status=None, page_size=50, page_token=None)

List tasks with optional filtering and pagination.

Parameters:

Name Type Description Default
context_id str | None

Filter by context ID.

None
status TaskState | None

Filter by task state.

None
page_size int

Maximum number of tasks to return.

50
page_token str | None

Opaque cursor (task ID) for pagination.

None

Returns:

Type Description
tuple[list[Task], str, int]

Tuple of (tasks, next_page_token, total_count).

Source code in a2a/a2a.py
def list_tasks(
    self,
    *,
    context_id: str | None = None,
    status: TaskState | None = None,
    page_size: int = 50,
    page_token: str | None = None,
) -> tuple[list[Task], str, int]:
    """List tasks with optional filtering and pagination.

    Tasks are ordered by status timestamp, newest first; tasks without a
    timestamp sort last.

    Args:
        context_id: Filter by context ID.
        status: Filter by task state.
        page_size: Maximum number of tasks to return.  Negative values are
            treated as ``0``.
        page_token: Opaque cursor (task ID) for pagination.  The page
            starts after the task with this ID; if no task in the filtered
            list has this ID, listing starts from the first task.

    Returns:
        Tuple of (tasks, next_page_token, total_count).  The token is
        ``""`` when there is no further page; ``total_count`` is the number
        of tasks matching the filters.
    """
    with self._lock:
        all_tasks = list(self._tasks.values())

    # Filter
    if context_id:
        all_tasks = [t for t in all_tasks if t.context_id == context_id]
    if status:
        all_tasks = [t for t in all_tasks if t.status.state == status]

    # Sort by timestamp descending
    all_tasks.sort(
        key=lambda t: t.status.timestamp or "",
        reverse=True,
    )

    total = len(all_tasks)

    # A negative size would otherwise slice from the end of the list and
    # return tasks despite the "maximum number" contract.
    page_size = max(page_size, 0)

    # Pagination
    start_idx = 0
    if page_token:
        for i, t in enumerate(all_tasks):
            if t.id == page_token:
                start_idx = i + 1
                break

    page = all_tasks[start_idx : start_idx + page_size]
    next_token = ""
    if start_idx + page_size < total:
        next_token = page[-1].id if page else ""

    # Only the returned page is copied, not the whole store.
    return [copy.deepcopy(t) for t in page], next_token, total

TaskManager

High-level task lifecycle manager built on top of TaskStore.

Manages task creation, state transitions, artifact generation, and provides event callbacks for streaming.

Parameters:

Name Type Description Default
store TaskStore | None

The TaskStore to use. Creates one if not provided.

None

Example::

manager = TaskManager()
task = manager.create_task(message)
manager.update_status(task.id, TaskState.WORKING)
manager.add_artifact(task.id, Artifact(parts=[Part(text="result")]))
manager.update_status(task.id, TaskState.COMPLETED)
Source code in a2a/a2a.py
class TaskManager:
    """High-level task lifecycle manager built on top of ``TaskStore``.

    Manages task creation, state transitions, artifact generation, and
    provides event callbacks for streaming.

    Args:
        store: The ``TaskStore`` to use. Creates one if not provided.

    Example::

        manager = TaskManager()
        task = manager.create_task(message)
        manager.update_status(task.id, TaskState.WORKING)
        manager.add_artifact(task.id, Artifact(parts=[Part(text="result")]))
        manager.update_status(task.id, TaskState.COMPLETED)
    """

    # Valid state transitions (from -> set of allowed to-states)
    _TRANSITIONS: dict[TaskState, set[TaskState]] = {
        TaskState.UNSPECIFIED: {TaskState.SUBMITTED},
        TaskState.SUBMITTED: {
            TaskState.WORKING,
            TaskState.COMPLETED,
            TaskState.FAILED,
            TaskState.CANCELED,
            TaskState.REJECTED,
        },
        TaskState.WORKING: {
            TaskState.COMPLETED,
            TaskState.FAILED,
            TaskState.CANCELED,
            TaskState.INPUT_REQUIRED,
            TaskState.AUTH_REQUIRED,
        },
        TaskState.INPUT_REQUIRED: {
            TaskState.WORKING,
            TaskState.COMPLETED,
            TaskState.FAILED,
            TaskState.CANCELED,
        },
        TaskState.AUTH_REQUIRED: {
            TaskState.WORKING,
            TaskState.COMPLETED,
            TaskState.FAILED,
            TaskState.CANCELED,
        },
        # Terminal states allow no transitions
        TaskState.COMPLETED: set(),
        TaskState.FAILED: set(),
        TaskState.CANCELED: set(),
        TaskState.REJECTED: set(),
    }

    def __init__(self, store: TaskStore | None = None):
        self.store = store or TaskStore()
        # task_id -> callbacks registered through subscribe().
        self._listeners: dict[str, list[Callable[[StreamResponse], None]]] = {}
        # Guards ``_listeners`` only; task data is protected by the store.
        self._lock = threading.Lock()

    def create_task(
        self,
        message: Message,
        *,
        context_id: str | None = None,
    ) -> Task:
        """Create a new task from an incoming message.

        Args:
            message: The initiating message.
            context_id: Optional context to associate the task with.  A
                fresh UUID is generated when omitted or empty.

        Returns:
            The newly created ``Task`` in SUBMITTED state.
        """
        task = Task(
            id=str(uuid.uuid4()),
            context_id=context_id or str(uuid.uuid4()),
            status=TaskStatus(state=TaskState.SUBMITTED),
            history=[message],
        )
        self.store.save(task)
        return task

    def get_task(self, task_id: str) -> Task:
        """Retrieve a task, raising ``TaskNotFoundError`` if absent.

        Args:
            task_id: The task identifier.

        Returns:
            The ``Task`` object.

        Raises:
            TaskNotFoundError: If the task does not exist.
        """
        task = self.store.get(task_id)
        if task is None:
            raise TaskNotFoundError(f"Task {task_id!r} not found")
        return task

    def update_status(
        self,
        task_id: str,
        state: TaskState,
        *,
        status_message: Message | None = None,
    ) -> Task:
        """Transition a task to a new state.

        The new status is saved and then a status-update event is sent to
        the task's subscribers.

        Args:
            task_id: The task identifier.
            state: The target state.
            status_message: Optional message to attach to the status.

        Returns:
            The updated ``Task``.

        Raises:
            TaskNotFoundError: If the task does not exist.
            ValueError: If the state transition is not valid.
        """
        task = self.get_task(task_id)
        current = task.status.state
        # States missing from the table are treated as having no exits.
        allowed = self._TRANSITIONS.get(current, set())
        if state not in allowed:
            raise ValueError(
                f"Invalid transition from {current.value} to {state.value}"
            )
        task.status = TaskStatus(
            state=state, message=status_message, timestamp=_now_iso()
        )
        self.store.save(task)

        # Notify listeners
        event = StreamResponse(
            status_update=TaskStatusUpdateEvent(
                task_id=task.id,
                context_id=task.context_id or "",
                status=task.status,
            )
        )
        self._notify(task_id, event)
        return task

    def add_artifact(
        self,
        task_id: str,
        artifact: Artifact,
        *,
        append: bool = False,
        last_chunk: bool = True,
    ) -> Task:
        """Add an artifact to a task.

        Args:
            task_id: The task identifier.
            artifact: The artifact to add.
            append: If True, append to an existing artifact with the same ID.
                When no such artifact exists, *artifact* is added as new.
            last_chunk: If True, this is the final chunk of the artifact.
                The flag is forwarded in the emitted event only.

        Returns:
            The updated ``Task``.

        Raises:
            TaskNotFoundError: If the task does not exist.
        """
        task = self.get_task(task_id)
        if task.artifacts is None:
            task.artifacts = []

        if append:
            # Find and extend existing artifact
            for existing in task.artifacts:
                if existing.artifact_id == artifact.artifact_id:
                    existing.parts.extend(artifact.parts)
                    break
            else:
                # No artifact with that ID yet: store this chunk as new.
                task.artifacts.append(artifact)
        else:
            task.artifacts.append(artifact)

        self.store.save(task)

        # Notify listeners (the event carries the chunk as passed in, not
        # the merged artifact).
        event = StreamResponse(
            artifact_update=TaskArtifactUpdateEvent(
                task_id=task.id,
                context_id=task.context_id or "",
                artifact=artifact,
                append=append,
                last_chunk=last_chunk,
            )
        )
        self._notify(task_id, event)
        return task

    def cancel_task(self, task_id: str) -> Task:
        """Cancel a task.

        Args:
            task_id: The task identifier.

        Returns:
            The updated ``Task`` with CANCELED state.

        Raises:
            TaskNotFoundError: If the task does not exist.
            TaskNotCancelableError: If the task is in a terminal state.
        """
        task = self.get_task(task_id)
        if task.status.state.is_terminal():
            raise TaskNotCancelableError(
                f"Task {task_id!r} is in terminal state {task.status.state.value}"
            )
        return self.update_status(task_id, TaskState.CANCELED)

    def subscribe(
        self, task_id: str, callback: Callable[[StreamResponse], None]
    ) -> Callable[[], None]:
        """Subscribe to streaming events for a task.

        Args:
            task_id: The task identifier.
            callback: Function called with each ``StreamResponse`` event.

        Returns:
            An unsubscribe function that removes the callback.  Calling it
            more than once is harmless.
        """
        with self._lock:
            self._listeners.setdefault(task_id, []).append(callback)

        def unsubscribe() -> None:
            with self._lock:
                listeners = self._listeners.get(task_id)
                if listeners is None:
                    return
                if callback in listeners:
                    listeners.remove(callback)
                if not listeners:
                    # Drop the empty list so the registry does not keep one
                    # entry per task that was ever subscribed to.
                    del self._listeners[task_id]

        return unsubscribe

    def _notify(self, task_id: str, event: StreamResponse) -> None:
        """Notify all subscribers of an event for a task."""
        # Snapshot under the lock, call outside it, so callbacks may
        # subscribe or unsubscribe without deadlocking.
        with self._lock:
            listeners = list(self._listeners.get(task_id, []))
        for cb in listeners:
            try:
                cb(event)
            except Exception:
                # One failing listener must not block the others.
                logger.exception("Error in task listener for %s", task_id)

create_task(message, *, context_id=None)

Create a new task from an incoming message.

Parameters:

Name Type Description Default
message Message

The initiating message.

required
context_id str | None

Optional context to associate the task with.

None

Returns:

Type Description
Task

The newly created Task in SUBMITTED state.

Source code in a2a/a2a.py
def create_task(
    self,
    message: Message,
    *,
    context_id: str | None = None,
) -> Task:
    """Build a SUBMITTED task seeded with *message* and store it.

    Args:
        message: The initiating message; it becomes the task's history.
        context_id: Context to attach the task to.  A random UUID is used
            when omitted or empty.

    Returns:
        The newly created ``Task`` in SUBMITTED state.
    """
    new_id = str(uuid.uuid4())
    resolved_context = context_id or str(uuid.uuid4())
    initial_status = TaskStatus(state=TaskState.SUBMITTED)
    created = Task(
        id=new_id,
        context_id=resolved_context,
        status=initial_status,
        history=[message],
    )
    self.store.save(created)
    return created

get_task(task_id)

Retrieve a task, raising TaskNotFoundError if absent.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required

Returns:

Type Description
Task

The Task object.

Raises:

Type Description
TaskNotFoundError

If the task does not exist.

Source code in a2a/a2a.py
def get_task(self, task_id: str) -> Task:
    """Look up a task in the store, failing loudly when it is missing.

    Args:
        task_id: The task identifier.

    Returns:
        The ``Task`` returned by the store.

    Raises:
        TaskNotFoundError: If the store has no task with this ID.
    """
    found = self.store.get(task_id)
    if found is not None:
        return found
    raise TaskNotFoundError(f"Task {task_id!r} not found")

update_status(task_id, state, *, status_message=None)

Transition a task to a new state.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required
state TaskState

The target state.

required
status_message Message | None

Optional message to attach to the status.

None

Returns:

Type Description
Task

The updated Task.

Raises:

Type Description
TaskNotFoundError

If the task does not exist.

ValueError

If the state transition is not valid.

Source code in a2a/a2a.py
def update_status(
    self,
    task_id: str,
    state: TaskState,
    *,
    status_message: Message | None = None,
) -> Task:
    """Move a task to *state*, persist it and notify subscribers.

    Args:
        task_id: The task identifier.
        state: The target state.
        status_message: Optional message to attach to the new status.

    Returns:
        The updated ``Task``.

    Raises:
        TaskNotFoundError: If the task does not exist.
        ValueError: If the transition table does not allow moving from the
            task's current state to *state*.
    """
    task = self.get_task(task_id)
    previous = task.status.state
    # A state without a table entry has no permitted successors.
    if state not in self._TRANSITIONS.get(previous, set()):
        raise ValueError(
            f"Invalid transition from {previous.value} to {state.value}"
        )

    task.status = TaskStatus(
        state=state, message=status_message, timestamp=_now_iso()
    )
    self.store.save(task)

    # Tell subscribers about the new status.
    update = TaskStatusUpdateEvent(
        task_id=task.id,
        context_id=task.context_id or "",
        status=task.status,
    )
    self._notify(task_id, StreamResponse(status_update=update))
    return task

add_artifact(task_id, artifact, *, append=False, last_chunk=True)

Add an artifact to a task.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required
artifact Artifact

The artifact to add.

required
append bool

If True, append to an existing artifact with the same ID.

False
last_chunk bool

If True, this is the final chunk of the artifact.

True

Returns:

Type Description
Task

The updated Task.

Raises:

Type Description
TaskNotFoundError

If the task does not exist.

Source code in a2a/a2a.py
def add_artifact(
    self,
    task_id: str,
    artifact: Artifact,
    *,
    append: bool = False,
    last_chunk: bool = True,
) -> Task:
    """Attach an artifact (or artifact chunk) to a task and notify subscribers.

    Args:
        task_id: The task identifier.
        artifact: The artifact to add.
        append: If True, extend the parts of the first existing artifact
            with the same ``artifact_id``; if none exists, the artifact is
            added as a new entry.
        last_chunk: If True, this is the final chunk of the artifact.  The
            flag is forwarded in the emitted event.

    Returns:
        The updated ``Task``.

    Raises:
        TaskNotFoundError: If the task does not exist.
    """
    task = self.get_task(task_id)
    if task.artifacts is None:
        task.artifacts = []

    # In append mode, look for the artifact this chunk continues.
    target = None
    if append:
        target = next(
            (
                candidate
                for candidate in task.artifacts
                if candidate.artifact_id == artifact.artifact_id
            ),
            None,
        )

    if target is not None:
        target.parts.extend(artifact.parts)
    else:
        task.artifacts.append(artifact)

    self.store.save(task)

    # The event carries the chunk exactly as it was passed in.
    update = TaskArtifactUpdateEvent(
        task_id=task.id,
        context_id=task.context_id or "",
        artifact=artifact,
        append=append,
        last_chunk=last_chunk,
    )
    self._notify(task_id, StreamResponse(artifact_update=update))
    return task

cancel_task(task_id)

Cancel a task.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required

Returns:

Type Description
Task

The updated Task with CANCELED state.

Raises:

Type Description
TaskNotFoundError

If the task does not exist.

TaskNotCancelableError

If the task is in a terminal state.

Source code in a2a/a2a.py
def cancel_task(self, task_id: str) -> Task:
    """Move a non-terminal task to the CANCELED state.

    Args:
        task_id: The task identifier.

    Returns:
        The updated ``Task`` with CANCELED state.

    Raises:
        TaskNotFoundError: If the task does not exist.
        TaskNotCancelableError: If the task is in a terminal state.
    """
    current_state = self.get_task(task_id).status.state
    if not current_state.is_terminal():
        return self.update_status(task_id, TaskState.CANCELED)
    raise TaskNotCancelableError(
        f"Task {task_id!r} is in terminal state {current_state.value}"
    )

subscribe(task_id, callback)

Subscribe to streaming events for a task.

Parameters:

Name Type Description Default
task_id str

The task identifier.

required
callback Callable[[StreamResponse], None]

Function called with each StreamResponse event.

required

Returns:

Type Description
Callable[[], None]

An unsubscribe function that removes the callback.

Source code in a2a/a2a.py
def subscribe(
    self, task_id: str, callback: Callable[[StreamResponse], None]
) -> Callable[[], None]:
    """Subscribe to streaming events for a task.

    Args:
        task_id: The task identifier.
        callback: Function called with each ``StreamResponse`` event.

    Returns:
        An unsubscribe function that removes the callback.  Calling it more
        than once is harmless.
    """
    with self._lock:
        self._listeners.setdefault(task_id, []).append(callback)

    def unsubscribe() -> None:
        with self._lock:
            listeners = self._listeners.get(task_id)
            if listeners is None:
                return
            if callback in listeners:
                listeners.remove(callback)
            if not listeners:
                # Drop the empty list so the registry does not keep one
                # entry per task that was ever subscribed to.
                del self._listeners[task_id]

    return unsubscribe

sse_encode(data)

Encode data as a single SSE data: frame.

Parameters:

Name Type Description Default
data Any

JSON-serializable object.

required

Returns:

Type Description
bytes

UTF-8 encoded SSE frame bytes (data: ...\n\n).

Source code in a2a/a2a.py
def sse_encode(data: Any) -> bytes:
    """Serialize *data* to compact JSON and wrap it in one SSE frame.

    Args:
        data: JSON-serializable object.

    Returns:
        UTF-8 bytes of the form ``data: <json>\\n\\n``.
    """
    compact_json = json.dumps(data, separators=(",", ":"))
    return b"data: " + compact_json.encode("utf-8") + b"\n\n"

sse_decode_stream(response)

Parse an SSE stream from an http.client.HTTPResponse.

Reads lines from the response, extracts data: fields, and yields parsed JSON objects. Handles chunked transfer encoding transparently because http.client decodes it for us.

Parameters:

Name Type Description Default
response HTTPResponse

An open HTTP response with Content-Type: text/event-stream.

required

Yields:

Type Description
dict[str, Any]

Parsed JSON dictionaries for each SSE data: event.

Source code in a2a/a2a.py
def sse_decode_stream(
    response: http.client.HTTPResponse,
) -> Iterator[dict[str, Any]]:
    """Parse an SSE stream from an ``http.client.HTTPResponse``.

    Reads the response line by line, extracts ``data:`` fields, and yields
    parsed JSON objects.  Each ``data:`` line is decoded as one standalone
    JSON document.  A single optional space after ``data:`` is stripped,
    so both ``data: {...}`` and ``data:{...}`` are accepted.  Comment
    lines and ``event``/``id``/``retry`` fields are ignored.  Lines whose
    payload is not valid JSON are logged and skipped.

    Args:
        response: An open HTTP response with ``Content-Type: text/event-stream``.

    Yields:
        Parsed JSON dictionaries for each SSE ``data:`` event.
    """
    while True:
        # readline() hands back each line as soon as it is complete, so an
        # event is not held back waiting for a fixed-size read to fill up.
        raw_line = response.readline()
        if not raw_line:
            break  # EOF

        line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
        if not line.startswith("data:"):
            # Comment (":"), blank separator, or a field we do not use.
            continue

        data_str = line[len("data:"):]
        if data_str.startswith(" "):
            # Only the first space after the colon is a separator.
            data_str = data_str[1:]
        if not data_str.strip():
            continue

        try:
            event = json.loads(data_str)
        except json.JSONDecodeError:
            logger.warning("Failed to parse SSE data: %s", data_str)
            continue
        yield event

handle_send_message(params)

Handle a SendMessage request by echoing the input.

Source code in a2a/a2a.py
@server.dispatcher.register("SendMessage")
def handle_send_message(params: dict[str, Any]) -> dict[str, Any]:
    """Echo the text of the incoming message back as a completed task.

    The text of all parts is concatenated, a task is created and moved
    through WORKING to COMPLETED, and the echoed text is attached as an
    artifact named ``echo-response``.
    """
    request = SendMessageRequest.from_dict(params)
    incoming = request.message
    echoed = "".join(part.text for part in incoming.parts if part.text)

    task = task_manager.create_task(incoming, context_id=incoming.context_id)
    task_manager.update_status(task.id, TaskState.WORKING)

    reply = Artifact(
        name="echo-response",
        parts=[Part(text=f"Echo: {echoed}")],
    )
    task_manager.add_artifact(task.id, reply)

    finished = task_manager.update_status(task.id, TaskState.COMPLETED)
    return SendMessageResponse(task=finished).to_dict()

handle_stream_message(params)

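Handle a SendStreamingMessage request by streaming the task's lifecycle events: the initial task, a WORKING status update, the echo artifact, and a final COMPLETED status update.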

Source code in a2a/a2a.py
@server.dispatcher.register("SendStreamingMessage")
def handle_stream_message(
    params: dict[str, Any],
) -> Iterator[dict[str, Any]]:
    """Handle a SendStreamingMessage by streaming the task's lifecycle events.

    Yields, in order: the newly created task, a WORKING status update, one
    artifact update carrying the echoed text, and a COMPLETED status update.
    Each task-manager call runs just before the event that reports it.
    """
    request = SendMessageRequest.from_dict(params)
    incoming = request.message
    echoed = "".join(part.text for part in incoming.parts if part.text)

    task = task_manager.create_task(incoming, context_id=incoming.context_id)
    context = task.context_id or ""

    # 1. Initial task snapshot.
    yield StreamResponse(task=task).to_dict()

    # 2. Task starts working.
    task_manager.update_status(task.id, TaskState.WORKING)
    working_event = TaskStatusUpdateEvent(
        task_id=task.id,
        context_id=context,
        status=TaskStatus(state=TaskState.WORKING),
    )
    yield StreamResponse(status_update=working_event).to_dict()

    # 3. The echo artifact, sent as a single final chunk.
    artifact = Artifact(
        name="echo-response",
        parts=[Part(text=f"Echo: {echoed}")],
    )
    task_manager.add_artifact(task.id, artifact)
    artifact_event = TaskArtifactUpdateEvent(
        task_id=task.id,
        context_id=context,
        artifact=artifact,
        last_chunk=True,
    )
    yield StreamResponse(artifact_update=artifact_event).to_dict()

    # 4. Completion.
    task_manager.update_status(task.id, TaskState.COMPLETED)
    completed_event = TaskStatusUpdateEvent(
        task_id=task.id,
        context_id=context,
        status=TaskStatus(state=TaskState.COMPLETED),
    )
    yield StreamResponse(status_update=completed_event).to_dict()

handle_get_task(params)

Handle a GetTask request.

Source code in a2a/a2a.py
@server.dispatcher.register("GetTask")
def handle_get_task(params: dict[str, Any]) -> dict[str, Any]:
    """Return the task named by ``params["id"]`` as a dict.

    Args:
        params: JSON-RPC ``params`` object; a missing ``"id"`` key is looked
            up as the empty string.

    Returns:
        The ``to_dict()`` form of whatever ``task_manager.get_task`` returns.
    """
    requested_id = params.get("id", "")
    return task_manager.get_task(requested_id).to_dict()

handle_cancel_task(params)

Handle a CancelTask request.

Source code in a2a/a2a.py
@server.dispatcher.register("CancelTask")
def handle_cancel_task(params: dict[str, Any]) -> dict[str, Any]:
    """Cancel the task named by ``params["id"]`` and return it as a dict.

    Args:
        params: JSON-RPC ``params`` object; a missing ``"id"`` key is passed
            on as the empty string.

    Returns:
        The ``to_dict()`` form of whatever ``task_manager.cancel_task``
        returns.
    """
    requested_id = params.get("id", "")
    return task_manager.cancel_task(requested_id).to_dict()