Skip to content

WebSocket Client API Reference

Auto-generated API documentation for the WebSocket Client module.

websocket

Zero-dependency WebSocket client (RFC 6455).

Part of zerodep (https://github.com/Oaklight/zerodep). Copyright (c) 2026 Peng Ding. Released under the MIT License.

Provides sync and async WebSocket clients for ws:// and wss:// connections. Implements the core WebSocket protocol including text frames, ping/pong, close handshake, and client-side masking.

Usage:

# Sync
from websocket import WebSocketClient

with WebSocketClient("ws://localhost:9222/") as ws:
    ws.send("hello")
    response = ws.recv()
    print(response)

# Async
from websocket import AsyncWebSocketClient

async with AsyncWebSocketClient("wss://example.com/ws") as ws:
    await ws.send('{"type": "subscribe"}')
    data = await ws.recv()
    print(data)

Requires Python 3.10+.

WebSocketError

Bases: Exception

Base exception for all websocket operations.

Source code in websocket/websocket.py
class WebSocketError(Exception):
    """Root of the websocket exception hierarchy.

    Catch this type to handle every error raised by the websocket clients.
    """

WebSocketConnectionError

Bases: WebSocketError

Raised on connection failures (TCP connect, handshake reject).

Source code in websocket/websocket.py
class WebSocketConnectionError(WebSocketError):
    """Signal that a connection could not be established or was lost.

    Covers TCP connect failures and rejected upgrade handshakes.

    Args:
        message: Human-readable description of the failure.
        host: Remote host involved, or ``""`` when not supplied.
        port: Remote port involved, or ``0`` when not supplied.
    """

    def __init__(self, message: str, *, host: str = "", port: int = 0) -> None:
        super().__init__(message)
        # Endpoint details are kept as attributes so callers can inspect
        # them without parsing the message text.
        self.host, self.port = host, port

WebSocketTimeoutError

Bases: WebSocketError

Raised when an operation times out.

Source code in websocket/websocket.py
class WebSocketTimeoutError(WebSocketError):
    """Signal that an operation did not finish within its time limit.

    Args:
        message: Human-readable description of the timeout.
        url: WebSocket URL involved, or ``""`` when not supplied.
        timeout: The limit in seconds that elapsed, or ``0.0`` when not
            supplied.
    """

    def __init__(self, message: str, *, url: str = "", timeout: float = 0.0) -> None:
        super().__init__(message)
        # Kept as attributes so callers can inspect them programmatically.
        self.url, self.timeout = url, timeout

WebSocketProtocolError

Bases: WebSocketError

Raised on protocol violations (bad frame, unexpected opcode).

Source code in websocket/websocket.py
class WebSocketProtocolError(WebSocketError):
    """Signal a protocol violation such as a bad frame or unexpected opcode."""

WebSocketClient

Synchronous WebSocket client.

Parameters:

Name Type Description Default
url str

WebSocket URL (ws:// or wss://).

required
headers dict[str, str] | None

Optional extra headers for the upgrade request.

None
subprotocols list[str] | None

Optional list of subprotocols to negotiate.

None

Example:

with WebSocketClient("ws://localhost:9222/") as ws:
    ws.send("hello")
    print(ws.recv())
Source code in websocket/websocket.py
class WebSocketClient:
    """Synchronous WebSocket client.

    Only unfragmented text messages are delivered by ``recv``; binary and
    fragmented frames raise ``WebSocketProtocolError``. Ping frames are
    answered automatically while receiving.

    Args:
        url: WebSocket URL (``ws://`` or ``wss://``).
        headers: Optional extra headers for the upgrade request.
        subprotocols: Optional list of subprotocols to negotiate.

    Example::

        with WebSocketClient("ws://localhost:9222/") as ws:
            ws.send("hello")
            print(ws.recv())
    """

    def __init__(
        self,
        url: str,
        *,
        headers: dict[str, str] | None = None,
        subprotocols: list[str] | None = None,
    ) -> None:
        self._url = url
        self._headers = headers
        self._subprotocols = subprotocols
        self._host, self._port, self._path, self._is_secure = _parse_ws_url(url)
        self._sock: socket.socket | None = None
        self._connected = False
        # True once a close frame has been sent (by us) or answered (by us).
        self._closing = False
        self._accepted_subprotocol: str | None = None

    @property
    def connected(self) -> bool:
        """Whether the WebSocket connection is active."""
        return self._connected

    @property
    def accepted_subprotocol(self) -> str | None:
        """The subprotocol accepted by the server, if any."""
        return self._accepted_subprotocol

    def connect(
        self,
        *,
        timeout: float = DEFAULT_TIMEOUT,
        verify: bool = True,
    ) -> None:
        """Open the WebSocket connection.

        Does nothing if the client is already connected. On any failure the
        partially opened socket is closed before the exception propagates.

        Args:
            timeout: Connection timeout in seconds. The socket keeps this
                timeout after the handshake, so it also bounds later reads
                unless ``recv`` is given its own timeout.
            verify: Whether to verify TLS certificates (for ``wss://``).

        Raises:
            WebSocketConnectionError: If the TCP connection or handshake fails.
            WebSocketTimeoutError: If the connection times out.
        """
        if self._connected:
            return

        sock: socket.socket | None = None
        try:
            # create_connection resolves the host and tries every returned
            # address, so IPv6-only endpoints work as well as IPv4 ones.
            sock = socket.create_connection((self._host, self._port), timeout=timeout)

            if self._is_secure:
                ctx = _make_ssl_context(verify)
                sock = ctx.wrap_socket(sock, server_hostname=self._host)

            # Send upgrade request
            key = base64.b64encode(os.urandom(16)).decode("ascii")
            request = _build_handshake_request(
                self._host,
                self._port,
                self._path,
                key,
                headers=self._headers,
                subprotocols=self._subprotocols,
                is_secure=self._is_secure,
            )
            sock.sendall(request)

            # Read response until \r\n\r\n
            response = self._read_handshake_response(sock)
            resp_headers = _validate_handshake_response(response, key)

            # Check subprotocol
            if self._subprotocols:
                accepted = resp_headers.get("sec-websocket-protocol", "")
                if accepted:
                    self._accepted_subprotocol = accepted

            self._sock = sock
            self._connected = True
            self._closing = False

        except socket.timeout as exc:
            raise WebSocketTimeoutError(
                f"connection to {self._url} timed out after {timeout}s",
                url=self._url,
                timeout=timeout,
            ) from exc
        except OSError as exc:
            raise WebSocketConnectionError(
                f"failed to connect to {self._host}:{self._port}: {exc}",
                host=self._host,
                port=self._port,
            ) from exc
        finally:
            # Any failure path (timeout, TLS error, rejected handshake) leaves
            # _connected False; release the socket instead of leaking it.
            if sock is not None and not self._connected:
                try:
                    sock.close()
                except OSError:
                    pass

    def send(self, data: str) -> None:
        """Send a text message.

        Args:
            data: Text message to send. It is UTF-8 encoded and sent as a
                single masked text frame.

        Raises:
            WebSocketError: If not connected.
        """
        self._ensure_connected()
        payload = data.encode("utf-8")
        frame = _make_frame(_OPCODE_TEXT, payload, mask=True)
        assert self._sock is not None
        self._sock.sendall(frame)

    def recv(self, *, timeout: float | None = None) -> str:
        """Receive a text message.

        Automatically handles ping frames by sending pong responses.
        Raises on close frames.

        Args:
            timeout: Receive timeout in seconds. ``None`` uses the socket's
                current timeout.

        Returns:
            The received text message.

        Raises:
            WebSocketTimeoutError: If the receive times out.
            WebSocketProtocolError: On protocol errors.
            WebSocketConnectionError: If the connection is closed.
        """
        self._ensure_connected()
        sock = self._sock
        assert sock is not None  # guaranteed by _ensure_connected
        old_timeout = sock.gettimeout()
        if timeout is not None:
            sock.settimeout(timeout)
        try:
            return self._recv_message()
        except socket.timeout as exc:
            # Report the timeout that actually applied: the explicit one, or
            # the socket's own timeout when none was passed.
            effective = timeout if timeout is not None else old_timeout
            raise WebSocketTimeoutError(
                f"recv timed out after {effective}s",
                url=self._url,
                timeout=effective or 0.0,
            ) from exc
        finally:
            # A received close frame makes _recv_message shut the socket down
            # and clear self._sock; only restore the timeout when the socket
            # is still the live one, so the close error is not masked.
            if timeout is not None and self._sock is sock:
                sock.settimeout(old_timeout)

    def ping(self, data: bytes = b"") -> None:
        """Send a ping frame.

        Args:
            data: Optional ping payload (max 125 bytes).

        Raises:
            WebSocketError: If not connected.
        """
        self._ensure_connected()
        assert self._sock is not None
        frame = _make_frame(_OPCODE_PING, data, mask=True)
        self._sock.sendall(frame)

    def close(self, code: int = 1000, reason: str = "") -> None:
        """Close the WebSocket connection.

        Sends a close frame, waits for the server's close frame response,
        then closes the underlying socket. Errors during the close handshake
        are ignored; the socket is always shut down. Does nothing if the
        client is not connected.

        Args:
            code: Close status code (default 1000 for normal closure).
            reason: Human-readable close reason.
        """
        if not self._connected or self._sock is None:
            return

        try:
            if not self._closing:
                self._closing = True
                payload = _make_close_payload(code, reason)
                frame = _make_frame(_OPCODE_CLOSE, payload, mask=True)
                self._sock.sendall(frame)

                # Try to receive server's close frame; each frame read is
                # bounded by a 2 second timeout. Frames that arrive before
                # the close frame are discarded.
                old_timeout = self._sock.gettimeout()
                self._sock.settimeout(2.0)
                try:
                    while True:
                        opcode, _, _ = self._read_frame()
                        if opcode == _OPCODE_CLOSE:
                            break
                except (socket.timeout, WebSocketError, OSError):
                    pass
                finally:
                    try:
                        self._sock.settimeout(old_timeout)
                    except OSError:
                        pass
        except OSError:
            pass
        finally:
            self._shutdown()

    def __enter__(self) -> WebSocketClient:
        self.connect()
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    # ── Sync internal helpers ──

    def _ensure_connected(self) -> None:
        """Raise ``WebSocketError`` unless a live socket is attached."""
        if not self._connected or self._sock is None:
            raise WebSocketError("not connected")

    def _shutdown(self) -> None:
        """Close the socket and reset state."""
        if self._sock is not None:
            try:
                self._sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            try:
                self._sock.close()
            except OSError:
                pass
            self._sock = None
        self._connected = False
        self._closing = False

    def _recv_message(self) -> str:
        """Read frames until a complete text message is received.

        Pings are answered with pongs, pongs are skipped, and a close frame
        is acknowledged (unless this side already sent one) before the
        socket is shut down.

        Returns:
            The decoded payload of the next unfragmented text frame.

        Raises:
            WebSocketProtocolError: On fragmented, binary or unknown frames.
            WebSocketConnectionError: When the server sends a close frame.
        """
        assert self._sock is not None
        while True:
            opcode, payload, fin = self._read_frame()

            if opcode == _OPCODE_TEXT:
                if not fin:
                    raise WebSocketProtocolError(
                        "fragmented messages are not supported"
                    )
                return payload.decode("utf-8")

            if opcode == _OPCODE_PING:
                pong = _make_frame(_OPCODE_PONG, payload, mask=True)
                self._sock.sendall(pong)
                continue

            if opcode == _OPCODE_PONG:
                continue

            if opcode == _OPCODE_CLOSE:
                # 1005 is the local placeholder for "no status code present".
                close_code = 1005
                close_reason = ""
                has_status = len(payload) >= 2
                if has_status:
                    close_code = struct.unpack(">H", payload[:2])[0]
                    close_reason = payload[2:].decode("utf-8", errors="replace")
                # Send close response if we didn't initiate
                if not self._closing:
                    self._closing = True
                    # RFC 6455 section 7.4.1 forbids sending 1005 on the wire,
                    # so a close frame without a status is echoed with an
                    # empty body instead of the placeholder code.
                    resp_payload = (
                        _make_close_payload(close_code, "") if has_status else b""
                    )
                    resp_frame = _make_frame(_OPCODE_CLOSE, resp_payload, mask=True)
                    try:
                        self._sock.sendall(resp_frame)
                    except OSError:
                        pass
                self._shutdown()
                raise WebSocketConnectionError(
                    f"connection closed by server: {close_code} {close_reason}",
                    host=self._host,
                    port=self._port,
                )

            if opcode == _OPCODE_BINARY:
                raise WebSocketProtocolError("binary frames are not supported")

            raise WebSocketProtocolError(f"unexpected opcode: {opcode:#x}")

    def _read_frame(self) -> tuple[int, bytes, bool]:
        """Read a single WebSocket frame from the socket.

        Returns:
            Tuple of (opcode, payload, fin).

        Raises:
            WebSocketProtocolError: If the declared payload length exceeds
                ``_MAX_PAYLOAD_SIZE``.
        """
        assert self._sock is not None
        header = _recv_exact(self._sock, 2)
        fin, opcode, is_masked, length = _parse_frame_header(header)

        # Extended payload length: 126 means a 16-bit length follows,
        # 127 means a 64-bit length follows (both big-endian).
        if length == 126:
            ext = _recv_exact(self._sock, 2)
            length = struct.unpack(">H", ext)[0]
        elif length == 127:
            ext = _recv_exact(self._sock, 8)
            length = struct.unpack(">Q", ext)[0]

        # Reject oversized frames before reading the payload.
        if length > _MAX_PAYLOAD_SIZE:
            raise WebSocketProtocolError(
                f"payload too large: {length} bytes (max {_MAX_PAYLOAD_SIZE})"
            )

        # Mask key (server should not mask, but handle it)
        mask_key = None
        if is_masked:
            mask_key = _recv_exact(self._sock, 4)

        # Payload
        payload = _recv_exact(self._sock, length) if length > 0 else b""
        if mask_key:
            payload = _mask_payload(mask_key, payload)

        return opcode, payload, fin

    @staticmethod
    def _read_handshake_response(sock: socket.socket) -> bytes:
        """Read the HTTP handshake response up to and including \\r\\n\\r\\n.

        The response is read one byte at a time on purpose: frames are later
        read straight from the socket, so any bytes consumed past the header
        terminator (for example a message the server sends right after the
        upgrade) would be lost.

        Args:
            sock: Connected socket on which the upgrade request was sent.

        Returns:
            The raw response bytes, ending with the blank-line terminator.

        Raises:
            WebSocketConnectionError: If the peer closes the connection
                before the terminator, or the response exceeds 64 KiB.
        """
        buf = bytearray()
        while not buf.endswith(b"\r\n\r\n"):
            if len(buf) > 65536:
                raise WebSocketConnectionError("handshake response too large")
            chunk = sock.recv(1)
            if not chunk:
                raise WebSocketConnectionError("connection closed during handshake")
            buf.extend(chunk)
        return bytes(buf)

connected property

Whether the WebSocket connection is active.

accepted_subprotocol property

The subprotocol accepted by the server, if any.

connect(*, timeout=DEFAULT_TIMEOUT, verify=True)

Open the WebSocket connection.

Parameters:

Name Type Description Default
timeout float

Connection timeout in seconds.

DEFAULT_TIMEOUT
verify bool

Whether to verify TLS certificates (for wss://).

True

Raises:

Type Description
WebSocketConnectionError

If the TCP connection or handshake fails.

WebSocketTimeoutError

If the connection times out.

Source code in websocket/websocket.py
def connect(
    self,
    *,
    timeout: float = DEFAULT_TIMEOUT,
    verify: bool = True,
) -> None:
    """Open the WebSocket connection.

    Does nothing if the client is already connected. On any failure the
    partially opened socket is closed before the exception propagates.

    Args:
        timeout: Connection timeout in seconds. The socket keeps this
            timeout after the handshake.
        verify: Whether to verify TLS certificates (for ``wss://``).

    Raises:
        WebSocketConnectionError: If the TCP connection or handshake fails.
        WebSocketTimeoutError: If the connection times out.
    """
    if self._connected:
        return

    sock: socket.socket | None = None
    try:
        # create_connection resolves the host and tries every returned
        # address, so IPv6-only endpoints work as well as IPv4 ones.
        sock = socket.create_connection((self._host, self._port), timeout=timeout)

        if self._is_secure:
            ctx = _make_ssl_context(verify)
            sock = ctx.wrap_socket(sock, server_hostname=self._host)

        # Send upgrade request
        key = base64.b64encode(os.urandom(16)).decode("ascii")
        request = _build_handshake_request(
            self._host,
            self._port,
            self._path,
            key,
            headers=self._headers,
            subprotocols=self._subprotocols,
            is_secure=self._is_secure,
        )
        sock.sendall(request)

        # Read response until \r\n\r\n
        response = self._read_handshake_response(sock)
        resp_headers = _validate_handshake_response(response, key)

        # Check subprotocol
        if self._subprotocols:
            accepted = resp_headers.get("sec-websocket-protocol", "")
            if accepted:
                self._accepted_subprotocol = accepted

        self._sock = sock
        self._connected = True
        self._closing = False

    except socket.timeout as exc:
        raise WebSocketTimeoutError(
            f"connection to {self._url} timed out after {timeout}s",
            url=self._url,
            timeout=timeout,
        ) from exc
    except OSError as exc:
        raise WebSocketConnectionError(
            f"failed to connect to {self._host}:{self._port}: {exc}",
            host=self._host,
            port=self._port,
        ) from exc
    finally:
        # Any failure path (timeout, TLS error, rejected handshake) leaves
        # _connected False; release the socket instead of leaking it.
        if sock is not None and not self._connected:
            try:
                sock.close()
            except OSError:
                pass

send(data)

Send a text message.

Parameters:

Name Type Description Default
data str

Text message to send.

required

Raises:

Type Description
WebSocketError

If not connected.

Source code in websocket/websocket.py
def send(self, data: str) -> None:
    """Transmit ``data`` to the server as a single masked text frame.

    Args:
        data: Text message to send; it is UTF-8 encoded before framing.

    Raises:
        WebSocketError: If not connected.
    """
    self._ensure_connected()
    encoded = data.encode("utf-8")
    outgoing = _make_frame(_OPCODE_TEXT, encoded, mask=True)
    sock = self._sock
    assert sock is not None
    sock.sendall(outgoing)

recv(*, timeout=None)

Receive a text message.

Automatically handles ping frames by sending pong responses. Raises on close frames.

Parameters:

Name Type Description Default
timeout float | None

Receive timeout in seconds. None uses the socket's current timeout.

None

Returns:

Type Description
str

The received text message.

Raises:

Type Description
WebSocketTimeoutError

If the receive times out.

WebSocketProtocolError

On protocol errors.

WebSocketConnectionError

If the connection is closed.

Source code in websocket/websocket.py
def recv(self, *, timeout: float | None = None) -> str:
    """Receive a text message.

    Automatically handles ping frames by sending pong responses.
    Raises on close frames.

    Args:
        timeout: Receive timeout in seconds. ``None`` uses the socket's
            current timeout.

    Returns:
        The received text message.

    Raises:
        WebSocketTimeoutError: If the receive times out.
        WebSocketProtocolError: On protocol errors.
        WebSocketConnectionError: If the connection is closed.
    """
    self._ensure_connected()
    assert self._sock is not None
    old_timeout = self._sock.gettimeout()
    if timeout is not None:
        self._sock.settimeout(timeout)
    try:
        return self._recv_message()
    except socket.timeout as exc:
        raise WebSocketTimeoutError(
            f"recv timed out after {timeout}s",
            url=self._url,
            timeout=timeout or 0.0,
        ) from exc
    finally:
        if timeout is not None:
            self._sock.settimeout(old_timeout)

ping(data=b'')

Send a ping frame.

Parameters:

Name Type Description Default
data bytes

Optional ping payload (max 125 bytes).

b''
Source code in websocket/websocket.py
def ping(self, data: bytes = b"") -> None:
    """Emit a masked ping control frame carrying ``data``.

    Args:
        data: Optional ping payload (max 125 bytes).
    """
    self._ensure_connected()
    sock = self._sock
    assert sock is not None
    sock.sendall(_make_frame(_OPCODE_PING, data, mask=True))

close(code=1000, reason='')

Close the WebSocket connection.

Sends a close frame, waits for the server's close frame response, then closes the underlying socket.

Parameters:

Name Type Description Default
code int

Close status code (default 1000 for normal closure).

1000
reason str

Human-readable close reason.

''
Source code in websocket/websocket.py
def close(self, code: int = 1000, reason: str = "") -> None:
    """Perform the closing handshake and release the socket.

    A close frame is sent (unless one was already sent), frames are read
    until the server's close frame arrives or a read fails, and the socket
    is then shut down. Errors during the handshake are ignored. Calling
    this on a client that is not connected does nothing.

    Args:
        code: Close status code (default 1000 for normal closure).
        reason: Human-readable close reason.
    """
    if not (self._connected and self._sock is not None):
        return

    try:
        if not self._closing:
            self._closing = True
            close_frame = _make_frame(
                _OPCODE_CLOSE, _make_close_payload(code, reason), mask=True
            )
            self._sock.sendall(close_frame)

            # Wait for the server's close frame, bounding each frame read
            # to 2 seconds; anything received before it is discarded.
            saved_timeout = self._sock.gettimeout()
            self._sock.settimeout(2.0)
            try:
                seen_opcode = None
                while seen_opcode != _OPCODE_CLOSE:
                    seen_opcode, _payload, _fin = self._read_frame()
            except (socket.timeout, WebSocketError, OSError):
                pass
            finally:
                try:
                    self._sock.settimeout(saved_timeout)
                except OSError:
                    pass
    except OSError:
        pass
    finally:
        # The socket is always torn down, whatever happened above.
        self._shutdown()

AsyncWebSocketClient

Asynchronous WebSocket client.

Parameters:

Name Type Description Default
url str

WebSocket URL (ws:// or wss://).

required
headers dict[str, str] | None

Optional extra headers for the upgrade request.

None
subprotocols list[str] | None

Optional list of subprotocols to negotiate.

None

Example:

async with AsyncWebSocketClient("wss://example.com/ws") as ws:
    await ws.send("hello")
    print(await ws.recv())
Source code in websocket/websocket.py
class AsyncWebSocketClient:
    """Asynchronous WebSocket client.

    Only unfragmented text messages are delivered by ``recv``; binary and
    fragmented frames raise ``WebSocketProtocolError``. Ping frames are
    answered automatically while receiving.

    Args:
        url: WebSocket URL (``ws://`` or ``wss://``).
        headers: Optional extra headers for the upgrade request.
        subprotocols: Optional list of subprotocols to negotiate.

    Example::

        async with AsyncWebSocketClient("wss://example.com/ws") as ws:
            await ws.send("hello")
            print(await ws.recv())
    """

    def __init__(
        self,
        url: str,
        *,
        headers: dict[str, str] | None = None,
        subprotocols: list[str] | None = None,
    ) -> None:
        self._url = url
        self._headers = headers
        self._subprotocols = subprotocols
        self._host, self._port, self._path, self._is_secure = _parse_ws_url(url)
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._connected = False
        # True once a close frame has been sent (by us) or answered (by us).
        self._closing = False
        self._accepted_subprotocol: str | None = None

    @property
    def connected(self) -> bool:
        """Whether the WebSocket connection is active."""
        return self._connected

    @property
    def accepted_subprotocol(self) -> str | None:
        """The subprotocol accepted by the server, if any."""
        return self._accepted_subprotocol

    async def connect(
        self,
        *,
        timeout: float = DEFAULT_TIMEOUT,
        verify: bool = True,
    ) -> None:
        """Open the WebSocket connection.

        Does nothing if the client is already connected. On any failure
        (including cancellation) the partially opened transport is closed
        before the exception propagates.

        Args:
            timeout: Timeout in seconds, applied separately to opening the
                connection and to reading the handshake response.
            verify: Whether to verify TLS certificates (for ``wss://``).

        Raises:
            WebSocketConnectionError: If the TCP connection or handshake fails.
            WebSocketTimeoutError: If the connection times out.
        """
        if self._connected:
            return

        writer: asyncio.StreamWriter | None = None
        try:
            ssl_ctx = _make_ssl_context(verify) if self._is_secure else None
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self._host, self._port, ssl=ssl_ctx),
                timeout=timeout,
            )

            # Send upgrade request
            key = base64.b64encode(os.urandom(16)).decode("ascii")
            request = _build_handshake_request(
                self._host,
                self._port,
                self._path,
                key,
                headers=self._headers,
                subprotocols=self._subprotocols,
                is_secure=self._is_secure,
            )
            writer.write(request)
            await writer.drain()

            # Read response until \r\n\r\n
            response = await asyncio.wait_for(
                self._async_read_handshake_response(reader),
                timeout=timeout,
            )
            resp_headers = _validate_handshake_response(response, key)

            # Check subprotocol
            if self._subprotocols:
                accepted = resp_headers.get("sec-websocket-protocol", "")
                if accepted:
                    self._accepted_subprotocol = accepted

            self._reader = reader
            self._writer = writer
            self._connected = True
            self._closing = False

        except asyncio.TimeoutError as exc:
            raise WebSocketTimeoutError(
                f"connection to {self._url} timed out after {timeout}s",
                url=self._url,
                timeout=timeout,
            ) from exc
        except OSError as exc:
            raise WebSocketConnectionError(
                f"failed to connect to {self._host}:{self._port}: {exc}",
                host=self._host,
                port=self._port,
            ) from exc
        finally:
            # Any failure path leaves _connected False; close the transport
            # instead of leaking it. close() is not awaited so that this
            # cleanup cannot block while an exception is propagating.
            if writer is not None and not self._connected:
                writer.close()

    async def send(self, data: str) -> None:
        """Send a text message.

        Args:
            data: Text message to send. It is UTF-8 encoded and sent as a
                single masked text frame.

        Raises:
            WebSocketError: If not connected.
        """
        self._ensure_connected()
        assert self._writer is not None
        payload = data.encode("utf-8")
        frame = _make_frame(_OPCODE_TEXT, payload, mask=True)
        self._writer.write(frame)
        await self._writer.drain()

    async def recv(self, *, timeout: float | None = None) -> str:
        """Receive a text message.

        Automatically handles ping frames by sending pong responses.
        Raises on close frames.

        Args:
            timeout: Receive timeout in seconds. ``None`` waits indefinitely.

        Returns:
            The received text message.

        Raises:
            WebSocketTimeoutError: If the receive times out.
            WebSocketProtocolError: On protocol errors.
            WebSocketConnectionError: If the connection is closed.
        """
        self._ensure_connected()
        try:
            if timeout is not None:
                # NOTE(review): wait_for cancels the read on timeout; if that
                # happens after part of a frame was consumed, later reads may
                # start mid-frame. Confirm whether callers reuse the
                # connection after a timeout.
                return await asyncio.wait_for(self._recv_message(), timeout=timeout)
            return await self._recv_message()
        except asyncio.TimeoutError as exc:
            raise WebSocketTimeoutError(
                f"recv timed out after {timeout}s",
                url=self._url,
                timeout=timeout or 0.0,
            ) from exc

    async def ping(self, data: bytes = b"") -> None:
        """Send a ping frame.

        Args:
            data: Optional ping payload (max 125 bytes).

        Raises:
            WebSocketError: If not connected.
        """
        self._ensure_connected()
        assert self._writer is not None
        frame = _make_frame(_OPCODE_PING, data, mask=True)
        self._writer.write(frame)
        await self._writer.drain()

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """Close the WebSocket connection.

        Sends a close frame, waits for the server's close frame response,
        then closes the underlying transport. Errors during the close
        handshake are ignored; the transport is always shut down. Does
        nothing if the client is not connected.

        Args:
            code: Close status code (default 1000 for normal closure).
            reason: Human-readable close reason.
        """
        if not self._connected or self._writer is None:
            return

        try:
            if not self._closing:
                self._closing = True
                payload = _make_close_payload(code, reason)
                frame = _make_frame(_OPCODE_CLOSE, payload, mask=True)
                self._writer.write(frame)
                await self._writer.drain()

                # Try to receive server's close frame; each frame read is
                # bounded by a 2 second timeout.
                try:
                    while True:
                        opcode, _, _ = await asyncio.wait_for(
                            self._read_frame(), timeout=2.0
                        )
                        if opcode == _OPCODE_CLOSE:
                            break
                except (
                    asyncio.TimeoutError,
                    # readexactly raises IncompleteReadError (an EOFError, not
                    # an OSError) when the peer drops the connection without
                    # sending a close frame; closing must stay best-effort.
                    asyncio.IncompleteReadError,
                    WebSocketError,
                    OSError,
                ):
                    pass
        except OSError:
            pass
        finally:
            await self._shutdown()

    async def __aenter__(self) -> AsyncWebSocketClient:
        await self.connect()
        return self

    async def __aexit__(self, *args: object) -> None:
        await self.close()

    # ── Async internal helpers ──

    def _ensure_connected(self) -> None:
        """Raise ``WebSocketError`` unless reader and writer are attached."""
        if not self._connected or self._reader is None or self._writer is None:
            raise WebSocketError("not connected")

    async def _shutdown(self) -> None:
        """Close the transport and reset state."""
        if self._writer is not None:
            try:
                self._writer.close()
                await self._writer.wait_closed()
            except OSError:
                pass
            self._writer = None
        self._reader = None
        self._connected = False
        self._closing = False

    async def _recv_message(self) -> str:
        """Read frames until a complete text message is received.

        Pings are answered with pongs, pongs are skipped, and a close frame
        is acknowledged (unless this side already sent one) before the
        transport is shut down.

        Returns:
            The decoded payload of the next unfragmented text frame.

        Raises:
            WebSocketProtocolError: On fragmented, binary or unknown frames.
            WebSocketConnectionError: When the server sends a close frame.
        """
        assert self._writer is not None
        while True:
            opcode, payload, fin = await self._read_frame()

            if opcode == _OPCODE_TEXT:
                if not fin:
                    raise WebSocketProtocolError(
                        "fragmented messages are not supported"
                    )
                return payload.decode("utf-8")

            if opcode == _OPCODE_PING:
                pong = _make_frame(_OPCODE_PONG, payload, mask=True)
                self._writer.write(pong)
                await self._writer.drain()
                continue

            if opcode == _OPCODE_PONG:
                continue

            if opcode == _OPCODE_CLOSE:
                # 1005 is the local placeholder for "no status code present".
                close_code = 1005
                close_reason = ""
                has_status = len(payload) >= 2
                if has_status:
                    close_code = struct.unpack(">H", payload[:2])[0]
                    close_reason = payload[2:].decode("utf-8", errors="replace")
                # Send close response if we didn't initiate
                if not self._closing:
                    self._closing = True
                    # RFC 6455 section 7.4.1 forbids sending 1005 on the wire,
                    # so a close frame without a status is echoed with an
                    # empty body instead of the placeholder code.
                    resp_payload = (
                        _make_close_payload(close_code, "") if has_status else b""
                    )
                    resp_frame = _make_frame(_OPCODE_CLOSE, resp_payload, mask=True)
                    try:
                        self._writer.write(resp_frame)
                        await self._writer.drain()
                    except OSError:
                        pass
                await self._shutdown()
                raise WebSocketConnectionError(
                    f"connection closed by server: {close_code} {close_reason}",
                    host=self._host,
                    port=self._port,
                )

            if opcode == _OPCODE_BINARY:
                raise WebSocketProtocolError("binary frames are not supported")

            raise WebSocketProtocolError(f"unexpected opcode: {opcode:#x}")

    async def _read_frame(self) -> tuple[int, bytes, bool]:
        """Read a single WebSocket frame from the stream.

        Returns:
            Tuple of (opcode, payload, fin).

        Raises:
            WebSocketProtocolError: If the declared payload length exceeds
                ``_MAX_PAYLOAD_SIZE``.
            asyncio.IncompleteReadError: If the stream ends before a whole
                frame has been read (raised by ``readexactly``).
        """
        assert self._reader is not None
        header = await self._reader.readexactly(2)
        fin, opcode, is_masked, length = _parse_frame_header(header)

        # Extended payload length: 126 means a 16-bit length follows,
        # 127 means a 64-bit length follows (both big-endian).
        if length == 126:
            ext = await self._reader.readexactly(2)
            length = struct.unpack(">H", ext)[0]
        elif length == 127:
            ext = await self._reader.readexactly(8)
            length = struct.unpack(">Q", ext)[0]

        # Reject oversized frames before reading the payload.
        if length > _MAX_PAYLOAD_SIZE:
            raise WebSocketProtocolError(
                f"payload too large: {length} bytes (max {_MAX_PAYLOAD_SIZE})"
            )

        # Mask key (server should not mask, but handle it)
        mask_key = None
        if is_masked:
            mask_key = await self._reader.readexactly(4)

        # Payload
        payload = await self._reader.readexactly(length) if length > 0 else b""
        if mask_key:
            payload = _mask_payload(mask_key, payload)

        return opcode, payload, fin

    @staticmethod
    async def _async_read_handshake_response(
        reader: asyncio.StreamReader,
    ) -> bytes:
        """Read the HTTP handshake response up to and including \\r\\n\\r\\n.

        ``readuntil`` stops exactly at the terminator, so frame bytes the
        server sends right after the upgrade stay buffered in ``reader``
        for the frame parser instead of being consumed and dropped here.

        Args:
            reader: Stream on which the upgrade response is expected.

        Returns:
            The raw response bytes, ending with the blank-line terminator.

        Raises:
            WebSocketConnectionError: If the stream ends before the
                terminator, or the response exceeds the reader's buffer
                limit (64 KiB by default).
        """
        try:
            return await reader.readuntil(b"\r\n\r\n")
        except asyncio.IncompleteReadError as exc:
            raise WebSocketConnectionError(
                "connection closed during handshake"
            ) from exc
        except asyncio.LimitOverrunError as exc:
            raise WebSocketConnectionError("handshake response too large") from exc

connected property

Whether the WebSocket connection is active.

accepted_subprotocol property

The subprotocol accepted by the server, if any.

connect(*, timeout=DEFAULT_TIMEOUT, verify=True) async

Open the WebSocket connection.

Parameters:

Name Type Description Default
timeout float

Connection timeout in seconds.

DEFAULT_TIMEOUT
verify bool

Whether to verify TLS certificates (for wss://).

True

Raises:

Type Description
WebSocketConnectionError

If the TCP connection or handshake fails.

WebSocketTimeoutError

If the connection times out.

Source code in websocket/websocket.py
async def connect(
    self,
    *,
    timeout: float = DEFAULT_TIMEOUT,
    verify: bool = True,
) -> None:
    """Open the WebSocket connection.

    Opens the TCP (or TLS) stream, sends the HTTP upgrade request, validates
    the server's response and marks the client as connected. Calling this on
    an already connected client is a no-op. If any step fails after the
    socket was opened, the socket is closed again before the error
    propagates.

    Args:
        timeout: Connection timeout in seconds. It is applied separately to
            opening the stream and to reading the handshake response.
        verify: Whether to verify TLS certificates (for ``wss://``).

    Raises:
        WebSocketConnectionError: If the TCP connection or handshake fails.
        WebSocketTimeoutError: If the connection times out.
    """
    if self._connected:
        return

    # Stays None until the stream is open, so the cleanup below knows
    # whether there is anything to close.
    writer: asyncio.StreamWriter | None = None
    try:
        ssl_ctx = _make_ssl_context(verify) if self._is_secure else None
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(self._host, self._port, ssl=ssl_ctx),
            timeout=timeout,
        )

        # Send upgrade request
        key = base64.b64encode(os.urandom(16)).decode("ascii")
        request = _build_handshake_request(
            self._host,
            self._port,
            self._path,
            key,
            headers=self._headers,
            subprotocols=self._subprotocols,
            is_secure=self._is_secure,
        )
        writer.write(request)
        await writer.drain()

        # Read response until \r\n\r\n
        response = await asyncio.wait_for(
            self._async_read_handshake_response(reader),
            timeout=timeout,
        )
        resp_headers = _validate_handshake_response(response, key)

        # Check subprotocol (only recorded when the client offered any)
        if self._subprotocols:
            accepted = resp_headers.get("sec-websocket-protocol", "")
            if accepted:
                self._accepted_subprotocol = accepted

        self._reader = reader
        self._writer = writer
        self._connected = True
        self._closing = False

    except asyncio.TimeoutError as exc:
        raise WebSocketTimeoutError(
            f"connection to {self._url} timed out after {timeout}s",
            url=self._url,
            timeout=timeout,
        ) from exc
    except OSError as exc:
        raise WebSocketConnectionError(
            f"failed to connect to {self._host}:{self._port}: {exc}",
            host=self._host,
            port=self._port,
        ) from exc
    finally:
        # _connected is only set once the handshake fully succeeded, so an
        # open writer without it means we are leaving on an error (or a
        # cancellation) and must not leak the socket.
        if writer is not None and not self._connected:
            writer.close()

send(data) async

Send a text message.

Parameters:

Name Type Description Default
data str

Text message to send.

required

Raises:

Type Description
WebSocketError

If not connected.

Source code in websocket/websocket.py
async def send(self, data: str) -> None:
    """Send ``data`` to the server as a single masked text frame.

    Args:
        data: Text message to send; it is encoded as UTF-8.

    Raises:
        WebSocketError: If not connected.
    """
    self._ensure_connected()
    assert self._writer is not None
    writer = self._writer
    encoded = data.encode("utf-8")
    # Frames sent by this client are always built with mask=True.
    writer.write(_make_frame(_OPCODE_TEXT, encoded, mask=True))
    await writer.drain()

recv(*, timeout=None) async

Receive a text message.

Automatically handles ping frames by sending pong responses. Raises on close frames.

Parameters:

Name Type Description Default
timeout float | None

Receive timeout in seconds. None waits indefinitely.

None

Returns:

Type Description
str

The received text message.

Raises:

Type Description
WebSocketTimeoutError

If the receive times out.

WebSocketProtocolError

On protocol errors.

WebSocketConnectionError

If the connection is closed.

Source code in websocket/websocket.py
async def recv(self, *, timeout: float | None = None) -> str:
    """Receive a text message.

    Automatically handles ping frames by sending pong responses.
    Raises on close frames.

    Args:
        timeout: Receive timeout in seconds. ``None`` waits indefinitely.

    Returns:
        The received text message.

    Raises:
        WebSocketTimeoutError: If the receive times out.
        WebSocketProtocolError: On protocol errors.
        WebSocketConnectionError: If the connection is closed.
    """
    self._ensure_connected()
    try:
        if timeout is not None:
            return await asyncio.wait_for(self._recv_message(), timeout=timeout)
        return await self._recv_message()
    except asyncio.TimeoutError as exc:
        raise WebSocketTimeoutError(
            f"recv timed out after {timeout}s",
            url=self._url,
            timeout=timeout or 0.0,
        ) from exc

ping(data=b'') async

Send a ping frame.

Parameters:

Name Type Description Default
data bytes

Optional ping payload (max 125 bytes).

b''
Source code in websocket/websocket.py
async def ping(self, data: bytes = b"") -> None:
    """Send a ping frame.

    Args:
        data: Optional ping payload (max 125 bytes).

    Raises:
        WebSocketProtocolError: If ``data`` is longer than 125 bytes, the
            maximum payload a control frame may carry (RFC 6455, section
            5.5). Nothing is written to the connection in that case.
    """
    self._ensure_connected()
    assert self._writer is not None
    # Enforce the documented limit up front instead of emitting a frame the
    # peer has to reject as a protocol violation.
    if len(data) > 125:
        raise WebSocketProtocolError(
            f"ping payload too large: {len(data)} bytes (max 125)"
        )
    frame = _make_frame(_OPCODE_PING, data, mask=True)
    self._writer.write(frame)
    await self._writer.drain()

close(code=1000, reason='') async

Close the WebSocket connection.

Sends a close frame, waits for the server's close frame response, then closes the underlying transport.

Parameters:

Name Type Description Default
code int

Close status code (default 1000 for normal closure).

1000
reason str

Human-readable close reason.

''
Source code in websocket/websocket.py
async def close(self, code: int = 1000, reason: str = "") -> None:
    """Close the WebSocket connection.

    Sends a close frame, waits up to two seconds in total for the server's
    close frame response, then closes the underlying transport. Does nothing
    when the client is not connected. Failures while sending or waiting are
    swallowed: closing is best effort and the transport is shut down in any
    case.

    Args:
        code: Close status code (default 1000 for normal closure).
        reason: Human-readable close reason.
    """
    if not self._connected or self._writer is None:
        return

    try:
        if not self._closing:
            self._closing = True
            payload = _make_close_payload(code, reason)
            frame = _make_frame(_OPCODE_CLOSE, payload, mask=True)
            self._writer.write(frame)
            await self._writer.drain()

            # Try to receive server's close frame, discarding other frames.
            # A single deadline bounds the whole wait; a per-frame timeout
            # would let a peer that keeps sending data stall close() forever.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + 2.0
            try:
                while True:
                    remaining = deadline - loop.time()
                    if remaining <= 0:
                        break
                    opcode, _, _ = await asyncio.wait_for(
                        self._read_frame(), timeout=remaining
                    )
                    if opcode == _OPCODE_CLOSE:
                        break
            except (
                asyncio.TimeoutError,
                # Raised by readexactly() when the server drops the
                # connection without a close frame; it is an EOFError,
                # not an OSError.
                asyncio.IncompleteReadError,
                WebSocketError,
                OSError,
            ):
                pass
    except OSError:
        pass
    finally:
        await self._shutdown()