Skip to content

SSE API Reference

Auto-generated API documentation for the SSE (Server-Sent Events) module.

sse

Zero-dependency SSE (Server-Sent Events) client.

Part of zerodep: https://github.com/Oaklight/zerodep Copyright (c) 2026 Peng Ding. MIT License.

Provides three abstraction layers:

  1. Low-level parser (EventSource / AsyncEventSource): Parse any Iterable[str] or AsyncIterable[str] of lines into SSEEvent objects. No network dependency.

  2. High-level client (SSEClient / AsyncSSEClient): Open a streaming HTTP GET, parse SSE events, and auto-reconnect on connection loss. Requires sibling httpclient module.

  3. Convenience functions (connect / async_connect): Shorthand for creating SSEClient / AsyncSSEClient.

Usage::

# High-level (auto-connect + reconnect)
from sse import connect, async_connect

with connect("https://api.example.com/events") as events:
    for event in events:
        print(event.event, event.data)

async with async_connect("https://api.example.com/events") as events:
    async for event in events:
        print(event.data)

# Low-level (parse from any line source)
from sse import EventSource

for event in EventSource(["data: hello", "", "data: world", ""]):
    print(event.data)  # "hello", then "world"

Requires Python 3.10+.

SSEEvent dataclass

A single Server-Sent Event.

Attributes:

Name Type Description
event str

Event type (default "message").

data str

Event payload. Multiple data: lines are joined with \n.

id str

Last event ID. Persists across events until changed by the server.

retry int | None

Reconnection interval in milliseconds, or None.

Source code in sse/sse.py
@dataclasses.dataclass(frozen=True, slots=True)
class SSEEvent:
    """A single Server-Sent Event.

    Attributes:
        event: Event type (default ``"message"``).
        data: Event payload. Multiple ``data:`` lines are joined with ``\\n``.
        id: Last event ID. Persists across events until changed by the server.
        retry: Reconnection interval in milliseconds, or ``None``.
    """

    event: str = "message"
    data: str = ""
    id: str = ""
    retry: int | None = None

    def __repr__(self) -> str:
        d = self.data[:50] + "..." if len(self.data) > 50 else self.data
        return f"<SSEEvent event={self.event!r} data={d!r} id={self.id!r}>"

EventSource

Sync SSE parser wrapping any line iterable.

Example::

lines = ["event: greeting", "data: hello", "", "data: world", ""]
for event in EventSource(lines):
    print(event.event, event.data)
Source code in sse/sse.py
class EventSource:
    """Sync SSE parser wrapping any line iterable.

    Example::

        lines = ["event: greeting", "data: hello", "", "data: world", ""]
        for event in EventSource(lines):
            print(event.event, event.data)
    """

    def __init__(self, lines: Iterable[str]) -> None:
        self._lines = lines

    def __iter__(self) -> Iterator[SSEEvent]:
        parser = _SSEParser()
        for line in self._lines:
            event = parser.feed_line(line)
            if event is not None:
                yield event

AsyncEventSource

Async SSE parser wrapping any async line iterable.

Example::

async for event in AsyncEventSource(async_line_source):
    print(event.data)
Source code in sse/sse.py
class AsyncEventSource:
    """Async SSE parser wrapping any async line iterable.

    Example::

        async for event in AsyncEventSource(async_line_source):
            print(event.data)
    """

    def __init__(self, lines: AsyncIterable[str]) -> None:
        self._lines = lines

    async def __aiter__(self) -> AsyncIterator[SSEEvent]:
        parser = _SSEParser()
        async for line in self._lines:
            event = parser.feed_line(line)
            if event is not None:
                yield event

SSEError

Bases: Exception

Base exception for SSE errors.

Source code in sse/sse.py
class SSEError(Exception):
    """Base exception for SSE errors."""

SSEConnectionError

Bases: SSEError

Raised when max retries exhausted.

Source code in sse/sse.py
class SSEConnectionError(SSEError):
    """Raised when max retries exhausted."""

    def __init__(
        self, url: str, retries: int, last_error: Exception | None = None
    ) -> None:
        self.url = url
        self.retries = retries
        self.last_error = last_error
        super().__init__(
            f"SSE connection to {url} failed after {retries} retries"
            + (f": {last_error}" if last_error else "")
        )

SSEHTTPError

Bases: SSEError

Raised on non-2xx HTTP response (other than 204).

Source code in sse/sse.py
class SSEHTTPError(SSEError):
    """Raised on non-2xx HTTP response (other than 204)."""

    def __init__(self, status_code: int, url: str) -> None:
        self.status_code = status_code
        self.url = url
        super().__init__(f"SSE request to {url} returned HTTP {status_code}")

SSEClient

Bases: _SSEClientMixin

Synchronous SSE client with auto-reconnection.

Opens a streaming HTTP GET, parses text/event-stream, and automatically reconnects when the connection drops.

Parameters:

Name Type Description Default
url str

SSE endpoint URL.

required
headers dict[str, str] | None

Extra HTTP headers to send.

None
timeout float

Connection/read timeout in seconds.

DEFAULT_TIMEOUT
retry_interval int

Initial reconnection delay in milliseconds.

DEFAULT_RETRY_INTERVAL
max_retries int

Maximum reconnection attempts (-1 = unlimited).

-1
verify bool

Whether to verify TLS certificates.

True
last_event_id str

Initial Last-Event-ID header value.

''
transport Callable[..., Any] | None | _Unset

Sync HTTP GET callable. Defaults to _UNSET (auto-discover sibling httpclient.get). Must accept (url, *, headers, stream, timeout, verify) and return a response with .status_code, .ok, .close(), and .iter_lines() attributes.

_UNSET

Example::

with SSEClient("https://api.example.com/events") as client:
    for event in client:
        print(event.data)
Source code in sse/sse.py
class SSEClient(_SSEClientMixin):
    """Synchronous SSE client with auto-reconnection.

    Opens a streaming HTTP GET, parses ``text/event-stream``, and
    automatically reconnects when the connection drops.

    Args:
        url: SSE endpoint URL.
        headers: Extra HTTP headers to send.
        timeout: Connection/read timeout in seconds.
        retry_interval: Initial reconnection delay in milliseconds.
        max_retries: Maximum reconnection attempts (``-1`` = unlimited).
        verify: Whether to verify TLS certificates.
        last_event_id: Initial ``Last-Event-ID`` header value.
        transport: Sync HTTP GET callable. Defaults to ``_UNSET``
            (auto-discover sibling ``httpclient.get``). Must accept
            ``(url, *, headers, stream, timeout, verify)`` and return
            a response with ``.status_code``, ``.ok``, ``.close()``,
            and ``.iter_lines()`` attributes.

    Example::

        with SSEClient("https://api.example.com/events") as client:
            for event in client:
                print(event.data)
    """

    def __init__(
        self,
        url: str,
        *,
        headers: dict[str, str] | None = None,
        timeout: float = DEFAULT_TIMEOUT,
        retry_interval: int = DEFAULT_RETRY_INTERVAL,
        max_retries: int = -1,
        verify: bool = True,
        last_event_id: str = "",
        transport: Callable[..., Any] | None | _Unset = _UNSET,
    ) -> None:
        self._transport: Callable[..., Any]
        if isinstance(transport, _Unset):
            _require_httpclient()
            self._transport = _http_get
            self._reconnect_errors: tuple[type[Exception], ...] = (
                _HttpConnectionError,
                _HttpTimeoutError,
                ConnectionError,
                OSError,
            )
        elif transport is None:
            raise ValueError(
                "SSEClient requires a transport; pass a callable "
                "or omit to use sibling httpclient"
            )
        else:
            self._transport = transport
            self._reconnect_errors = (ConnectionError, OSError)
        self._url = url
        self._user_headers = headers or {}
        self._timeout = timeout
        self._retry_interval = retry_interval
        self._max_retries = max_retries
        self._verify = verify
        self._last_event_id = last_event_id
        self._response: Any = None
        self._closed = False

    def __enter__(self) -> SSEClient:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

    def __iter__(self) -> Iterator[SSEEvent]:
        retries = 0
        last_error: Exception | None = None

        while not self._closed:
            try:
                self._response = self._connect()
                parser = self._init_parser()

                for line in self._response.iter_lines():
                    if self._closed:
                        return
                    event = parser.feed_line(line)
                    if event is not None:
                        self._handle_event(event)
                        retries = 0
                        yield event

                # Stream ended normally β€” attempt reconnect
                if self._closed:
                    return

            except self._reconnect_errors as exc:
                last_error = exc
            finally:
                self._close_response()

            retries += 1
            self._check_reconnect(retries, last_error)
            time.sleep(self._retry_interval / 1000)

    def _connect(self) -> Any:
        """Open a streaming GET request."""
        headers = {
            "Accept": "text/event-stream",
            "Cache-Control": "no-cache",
            **self._user_headers,
        }
        if self._last_event_id:
            headers["Last-Event-ID"] = self._last_event_id

        resp = self._transport(
            self._url,
            headers=headers,
            stream=True,
            timeout=self._timeout,
            verify=self._verify,
        )

        if resp.status_code == 204:
            resp.close()
            self._closed = True
            return resp

        if not resp.ok:
            status = resp.status_code
            resp.close()
            raise SSEHTTPError(status, self._url)

        return resp

    def _close_response(self) -> None:
        if self._response is not None:
            # Tier 3: best-effort silent β€” reconnect cleanup
            try:
                self._response.close()
            except Exception:
                pass
            self._response = None

    def close(self) -> None:
        """Close the SSE connection."""
        self._closed = True
        self._close_response()

close()

Close the SSE connection.

Source code in sse/sse.py
def close(self) -> None:
    """Close the SSE connection."""
    self._closed = True
    self._close_response()

AsyncSSEClient

Bases: _SSEClientMixin

Asynchronous SSE client with auto-reconnection.

Opens a streaming HTTP GET, parses text/event-stream, and automatically reconnects when the connection drops.

Parameters:

Name Type Description Default
url str

SSE endpoint URL.

required
headers dict[str, str] | None

Extra HTTP headers to send.

None
timeout float

Connection/read timeout in seconds.

DEFAULT_TIMEOUT
retry_interval int

Initial reconnection delay in milliseconds.

DEFAULT_RETRY_INTERVAL
max_retries int

Maximum reconnection attempts (-1 = unlimited).

-1
verify bool

Whether to verify TLS certificates.

True
last_event_id str

Initial Last-Event-ID header value.

''
transport Callable[..., Any] | None | _Unset

Async HTTP GET callable. Defaults to _UNSET (auto-discover sibling httpclient.async_get). Must accept (url, *, headers, stream, timeout, verify) and return a response with .status_code, .ok, .aclose(), and .aiter_lines() attributes.

_UNSET

Example::

async with AsyncSSEClient("https://api.example.com/events") as client:
    async for event in client:
        print(event.data)
Source code in sse/sse.py
class AsyncSSEClient(_SSEClientMixin):
    """Asynchronous SSE client with auto-reconnection.

    Opens a streaming HTTP GET, parses ``text/event-stream``, and
    automatically reconnects when the connection drops.

    Args:
        url: SSE endpoint URL.
        headers: Extra HTTP headers to send.
        timeout: Connection/read timeout in seconds.
        retry_interval: Initial reconnection delay in milliseconds.
        max_retries: Maximum reconnection attempts (``-1`` = unlimited).
        verify: Whether to verify TLS certificates.
        last_event_id: Initial ``Last-Event-ID`` header value.
        transport: Async HTTP GET callable. Defaults to ``_UNSET``
            (auto-discover sibling ``httpclient.async_get``). Must accept
            ``(url, *, headers, stream, timeout, verify)`` and return
            a response with ``.status_code``, ``.ok``, ``.aclose()``,
            and ``.aiter_lines()`` attributes.

    Example::

        async with AsyncSSEClient("https://api.example.com/events") as client:
            async for event in client:
                print(event.data)
    """

    def __init__(
        self,
        url: str,
        *,
        headers: dict[str, str] | None = None,
        timeout: float = DEFAULT_TIMEOUT,
        retry_interval: int = DEFAULT_RETRY_INTERVAL,
        max_retries: int = -1,
        verify: bool = True,
        last_event_id: str = "",
        transport: Callable[..., Any] | None | _Unset = _UNSET,
    ) -> None:
        self._transport: Callable[..., Any]
        if isinstance(transport, _Unset):
            _require_httpclient()
            self._transport = _http_async_get
            self._reconnect_errors: tuple[type[Exception], ...] = (
                _HttpConnectionError,
                _HttpTimeoutError,
                ConnectionError,
                OSError,
            )
        elif transport is None:
            raise ValueError(
                "AsyncSSEClient requires a transport; pass a callable "
                "or omit to use sibling httpclient"
            )
        else:
            self._transport = transport
            self._reconnect_errors = (ConnectionError, OSError)
        self._url = url
        self._user_headers = headers or {}
        self._timeout = timeout
        self._retry_interval = retry_interval
        self._max_retries = max_retries
        self._verify = verify
        self._last_event_id = last_event_id
        self._response: Any = None
        self._closed = False

    async def __aenter__(self) -> AsyncSSEClient:
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self.close()

    async def __aiter__(self) -> AsyncIterator[SSEEvent]:
        retries = 0
        last_error: Exception | None = None

        while not self._closed:
            try:
                self._response = await self._connect()
                parser = self._init_parser()

                async for line in self._response.aiter_lines():
                    if self._closed:
                        return
                    event = parser.feed_line(line)
                    if event is not None:
                        self._handle_event(event)
                        retries = 0
                        yield event

                if self._closed:
                    return

            except self._reconnect_errors as exc:
                last_error = exc
            finally:
                await self._close_response()

            retries += 1
            self._check_reconnect(retries, last_error)
            await asyncio.sleep(self._retry_interval / 1000)

    async def _connect(self) -> Any:
        """Open a streaming async GET request."""
        headers = {
            "Accept": "text/event-stream",
            "Cache-Control": "no-cache",
            **self._user_headers,
        }
        if self._last_event_id:
            headers["Last-Event-ID"] = self._last_event_id

        resp = await self._transport(
            self._url,
            headers=headers,
            stream=True,
            timeout=self._timeout,
            verify=self._verify,
        )

        if resp.status_code == 204:
            await resp.aclose()
            self._closed = True
            return resp

        if not resp.ok:
            status = resp.status_code
            await resp.aclose()
            raise SSEHTTPError(status, self._url)

        return resp

    async def _close_response(self) -> None:
        if self._response is not None:
            # Tier 3: best-effort silent β€” reconnect cleanup
            try:
                await self._response.aclose()
            except Exception:
                pass
            self._response = None

    async def close(self) -> None:
        """Close the SSE connection."""
        self._closed = True
        await self._close_response()

close() async

Close the SSE connection.

Source code in sse/sse.py
async def close(self) -> None:
    """Close the SSE connection."""
    self._closed = True
    await self._close_response()

connect(url, **kwargs)

Open a synchronous SSE connection.

Shorthand for SSEClient(url, **kwargs). Use as a context manager::

with connect("https://example.com/events") as events:
    for event in events:
        print(event.data)

Parameters:

Name Type Description Default
url str

SSE endpoint URL.

required
**kwargs Any

Passed to SSEClient.

{}

Returns:

Type Description
SSEClient

An SSEClient instance.

Source code in sse/sse.py
def connect(url: str, **kwargs: Any) -> SSEClient:
    """Open a synchronous SSE connection.

    Shorthand for ``SSEClient(url, **kwargs)``.
    Use as a context manager::

        with connect("https://example.com/events") as events:
            for event in events:
                print(event.data)

    Args:
        url: SSE endpoint URL.
        **kwargs: Passed to ``SSEClient``.

    Returns:
        An ``SSEClient`` instance.
    """
    return SSEClient(url, **kwargs)

async_connect(url, **kwargs)

Open an asynchronous SSE connection.

Shorthand for AsyncSSEClient(url, **kwargs). Use as an async context manager::

async with async_connect("https://example.com/events") as events:
    async for event in events:
        print(event.data)

Parameters:

Name Type Description Default
url str

SSE endpoint URL.

required
**kwargs Any

Passed to AsyncSSEClient.

{}

Returns:

Type Description
AsyncSSEClient

An AsyncSSEClient instance.

Source code in sse/sse.py
def async_connect(url: str, **kwargs: Any) -> AsyncSSEClient:
    """Open an asynchronous SSE connection.

    Shorthand for ``AsyncSSEClient(url, **kwargs)``.
    Use as an async context manager::

        async with async_connect("https://example.com/events") as events:
            async for event in events:
                print(event.data)

    Args:
        url: SSE endpoint URL.
        **kwargs: Passed to ``AsyncSSEClient``.

    Returns:
        An ``AsyncSSEClient`` instance.
    """
    return AsyncSSEClient(url, **kwargs)