Skip to content

HTTP Client API Reference

Auto-generated API documentation for the HTTP client module.

httpclient

Zero-dependency sync + async HTTP REST client.

Part of zerodep: https://github.com/Oaklight/zerodep Copyright (c) 2026 Peng Ding. MIT License.

Sync (http.client) and async (asyncio streams) HTTP/1.1 client for REST API consumption. Thread-safe by design.

Sync usage::

response = get("https://httpbin.org/get")
response.json()

Async usage::

response = await async_get("https://httpbin.org/get")
response.json()

Session usage::

with Client() as client:
    r = client.get("https://httpbin.org/get")

async with AsyncClient() as client:
    r = await client.get("https://httpbin.org/get")

HttpClientError

Bases: Exception

Base exception for all httpclient operations.

Source code in httpclient/httpclient.py
class HttpClientError(Exception):
    """Base exception for all httpclient operations."""

HTTPError

Bases: HttpClientError

Raised on non-2xx status when raise_for_status() is called.

Source code in httpclient/httpclient.py
class HTTPError(HttpClientError):
    """Raised on non-2xx status when raise_for_status() is called."""

    def __init__(self, status_code: int, body: str, url: str) -> None:
        self.status_code = status_code
        self.body = body
        self.url = url
        super().__init__(f"HTTP {status_code} for {url}")

TooManyRedirects

Bases: HTTPError

Raised when redirect limit is exceeded.

Source code in httpclient/httpclient.py
class TooManyRedirects(HTTPError):
    """Raised when redirect limit is exceeded."""

    def __init__(self, url: str, max_redirects: int) -> None:
        super().__init__(0, "", url)
        self.max_redirects = max_redirects
        Exception.__init__(self, f"Too many redirects (>{max_redirects}) for {url}")

HttpConnectionError

Bases: HttpClientError

Raised on connection failures.

Attributes:

Name Type Description
host

Remote hostname that the connection targeted.

port

Remote port number.

message

Human-readable error description.

Source code in httpclient/httpclient.py
class HttpConnectionError(HttpClientError):
    """Raised on connection failures.

    Attributes:
        host: Remote hostname that the connection targeted.
        port: Remote port number.
        message: Human-readable error description.
    """

    def __init__(self, message: str, *, host: str = "", port: int = 0) -> None:
        self.host = host
        self.port = port
        self.message = message
        super().__init__(message)

HttpTimeoutError

Bases: HttpClientError

Raised on request timeout.

Attributes:

Name Type Description
url

The URL that timed out.

timeout

The timeout value in seconds that was exceeded.

message

Human-readable error description.

Source code in httpclient/httpclient.py
class HttpTimeoutError(HttpClientError):
    """Raised on request timeout.

    Attributes:
        url: The URL that timed out.
        timeout: The timeout value in seconds that was exceeded.
        message: Human-readable error description.
    """

    def __init__(self, message: str, *, url: str = "", timeout: float = 0.0) -> None:
        self.url = url
        self.timeout = timeout
        self.message = message
        super().__init__(message)

Socks5Error

Bases: HttpConnectionError

Raised on SOCKS5 proxy handshake failures.

Source code in httpclient/httpclient.py
class Socks5Error(HttpConnectionError):
    """Raised on SOCKS5 proxy handshake failures."""

Response

HTTP response object.

Attributes:

Name Type Description
status_code

HTTP status code.

headers

Response headers as dict (last value wins for duplicates).

content

Raw response body as bytes.

url

Final URL after redirects.

Source code in httpclient/httpclient.py
class Response:
    """HTTP response object.

    Attributes:
        status_code: HTTP status code.
        headers: Response headers as dict (last value wins for duplicates).
        content: Raw response body as bytes.
        url: Final URL after redirects.
    """

    __slots__ = ("status_code", "headers", "content", "url", "_text", "_json")

    def __init__(
        self,
        status_code: int,
        headers: dict[str, str],
        content: bytes,
        url: str,
    ) -> None:
        self.status_code = status_code
        self.headers = headers
        self.content = content
        self.url = url
        self._text: str | None = None
        self._json: Any = None

    @property
    def text(self) -> str:
        """Decode response body as text."""
        if self._text is None:
            encoding = self._guess_encoding()
            self._text = self.content.decode(encoding, errors="replace")
        return self._text

    def json(self) -> Any:
        """Parse response body as JSON."""
        if self._json is None:
            self._json = _json.loads(self.content)
        return self._json

    @property
    def ok(self) -> bool:
        """True if status_code is 2xx."""
        return 200 <= self.status_code < 300

    def raise_for_status(self) -> None:
        """Raise HTTPError if status is not 2xx."""
        if not self.ok:
            raise HTTPError(self.status_code, self.text, self.url)

    def _guess_encoding(self) -> str:
        return _guess_encoding_from_headers(self.headers)

    # ── Context managers (no-op, body is already fully read) ──

    def __enter__(self) -> Response:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

    async def __aenter__(self) -> Response:
        return self

    async def __aexit__(self, *args: Any) -> None:
        self.close()

    def close(self) -> None:
        """No-op close for a fully-read response."""

    async def aclose(self) -> None:
        """No-op async close for a fully-read response."""

    def __repr__(self) -> str:
        return f"<Response [{self.status_code}]>"

text property

Decode response body as text.

ok property

True if status_code is 2xx.

json()

Parse response body as JSON.

Source code in httpclient/httpclient.py
def json(self) -> Any:
    """Parse response body as JSON."""
    if self._json is None:
        self._json = _json.loads(self.content)
    return self._json

raise_for_status()

Raise HTTPError if status is not 2xx.

Source code in httpclient/httpclient.py
def raise_for_status(self) -> None:
    """Raise HTTPError if status is not 2xx."""
    if not self.ok:
        raise HTTPError(self.status_code, self.text, self.url)

close()

No-op close for a fully-read response.

Source code in httpclient/httpclient.py
def close(self) -> None:
    """No-op close for a fully-read response."""

aclose() async

No-op async close for a fully-read response.

Source code in httpclient/httpclient.py
async def aclose(self) -> None:
    """No-op async close for a fully-read response."""

Auth

Base class for HTTP authentication.

Source code in httpclient/httpclient.py
class Auth:
    """Base class for HTTP authentication."""

    def auth_headers(self, method: str, url: str) -> dict[str, str]:
        """Return authorization headers.

        Args:
            method: HTTP method.
            url: Request URL.

        Returns:
            Dict of headers to add to the request.
        """
        raise NotImplementedError

auth_headers(method, url)

Return authorization headers.

Parameters:

Name Type Description Default
method str

HTTP method.

required
url str

Request URL.

required

Returns:

Type Description
dict[str, str]

Dict of headers to add to the request.

Source code in httpclient/httpclient.py
def auth_headers(self, method: str, url: str) -> dict[str, str]:
    """Return authorization headers.

    Args:
        method: HTTP method.
        url: Request URL.

    Returns:
        Dict of headers to add to the request.
    """
    raise NotImplementedError

BasicAuth

Bases: Auth

HTTP Basic authentication.

Source code in httpclient/httpclient.py
class BasicAuth(Auth):
    """HTTP Basic authentication."""

    def __init__(self, username: str, password: str) -> None:
        self._username = username
        self._password = password

    def auth_headers(self, method: str, url: str) -> dict[str, str]:
        """Return Basic Authorization header."""
        credentials = f"{self._username}:{self._password}".encode()
        return {"Authorization": "Basic " + base64.b64encode(credentials).decode()}

auth_headers(method, url)

Return Basic Authorization header.

Source code in httpclient/httpclient.py
def auth_headers(self, method: str, url: str) -> dict[str, str]:
    """Return Basic Authorization header."""
    credentials = f"{self._username}:{self._password}".encode()
    return {"Authorization": "Basic " + base64.b64encode(credentials).decode()}

DigestAuth

Bases: Auth

HTTP Digest authentication.

Source code in httpclient/httpclient.py
class DigestAuth(Auth):
    """HTTP Digest authentication."""

    def __init__(self, username: str, password: str) -> None:
        self._username = username
        self._password = password
        self._nc = 0

    def auth_headers(self, method: str, url: str) -> dict[str, str]:
        """Not usable without a server challenge."""
        raise NotImplementedError("DigestAuth requires a server challenge")

    def auth_headers_from_challenge(
        self, method: str, path: str, challenge: str
    ) -> dict[str, str]:
        """Compute Digest auth headers from a WWW-Authenticate challenge.

        Args:
            method: HTTP method.
            path: Request path (URI).
            challenge: The WWW-Authenticate header value.

        Returns:
            Dict with the Authorization header.
        """
        params = _parse_digest_challenge(challenge)
        realm = params.get("realm", "")
        nonce = params.get("nonce", "")
        qop = params.get("qop", "")
        opaque = params.get("opaque", "")
        algorithm = params.get("algorithm", "MD5").upper()

        self._nc += 1
        nc_hex = f"{self._nc:08x}"
        cnonce = os.urandom(16).hex()

        if algorithm == "SHA-256":
            hash_fn = hashlib.sha256
        else:
            hash_fn = hashlib.md5

        ha1 = hash_fn(f"{self._username}:{realm}:{self._password}".encode()).hexdigest()
        ha2 = hash_fn(f"{method}:{path}".encode()).hexdigest()

        if qop == "auth":
            response = hash_fn(
                f"{ha1}:{nonce}:{nc_hex}:{cnonce}:{qop}:{ha2}".encode()
            ).hexdigest()
        else:
            response = hash_fn(f"{ha1}:{nonce}:{ha2}".encode()).hexdigest()

        header = (
            f'Digest username="{self._username}", realm="{realm}", '
            f'nonce="{nonce}", uri="{path}", response="{response}"'
        )
        if qop:
            header += f', qop={qop}, nc={nc_hex}, cnonce="{cnonce}"'
        if opaque:
            header += f', opaque="{opaque}"'
        header += f", algorithm={algorithm}"

        return {"Authorization": header}

auth_headers(method, url)

Not usable without a server challenge.

Source code in httpclient/httpclient.py
def auth_headers(self, method: str, url: str) -> dict[str, str]:
    """Not usable without a server challenge."""
    raise NotImplementedError("DigestAuth requires a server challenge")

auth_headers_from_challenge(method, path, challenge)

Compute Digest auth headers from a WWW-Authenticate challenge.

Parameters:

Name Type Description Default
method str

HTTP method.

required
path str

Request path (URI).

required
challenge str

The WWW-Authenticate header value.

required

Returns:

Type Description
dict[str, str]

Dict with the Authorization header.

Source code in httpclient/httpclient.py
def auth_headers_from_challenge(
    self, method: str, path: str, challenge: str
) -> dict[str, str]:
    """Compute Digest auth headers from a WWW-Authenticate challenge.

    Args:
        method: HTTP method.
        path: Request path (URI).
        challenge: The WWW-Authenticate header value.

    Returns:
        Dict with the Authorization header.
    """
    params = _parse_digest_challenge(challenge)
    realm = params.get("realm", "")
    nonce = params.get("nonce", "")
    qop = params.get("qop", "")
    opaque = params.get("opaque", "")
    algorithm = params.get("algorithm", "MD5").upper()

    self._nc += 1
    nc_hex = f"{self._nc:08x}"
    cnonce = os.urandom(16).hex()

    if algorithm == "SHA-256":
        hash_fn = hashlib.sha256
    else:
        hash_fn = hashlib.md5

    ha1 = hash_fn(f"{self._username}:{realm}:{self._password}".encode()).hexdigest()
    ha2 = hash_fn(f"{method}:{path}".encode()).hexdigest()

    if qop == "auth":
        response = hash_fn(
            f"{ha1}:{nonce}:{nc_hex}:{cnonce}:{qop}:{ha2}".encode()
        ).hexdigest()
    else:
        response = hash_fn(f"{ha1}:{nonce}:{ha2}".encode()).hexdigest()

    header = (
        f'Digest username="{self._username}", realm="{realm}", '
        f'nonce="{nonce}", uri="{path}", response="{response}"'
    )
    if qop:
        header += f', qop={qop}, nc={nc_hex}, cnonce="{cnonce}"'
    if opaque:
        header += f', opaque="{opaque}"'
    header += f", algorithm={algorithm}"

    return {"Authorization": header}

StreamingResponse

HTTP streaming response -- holds the connection open.

Use as a context manager to ensure cleanup::

with get(url, stream=True) as r:
    for chunk in r.iter_bytes():
        process(chunk)

async with await async_get(url, stream=True) as r:
    async for line in r.aiter_lines():
        handle(line)
Source code in httpclient/httpclient.py
class StreamingResponse:
    """HTTP streaming response -- holds the connection open.

    Use as a context manager to ensure cleanup::

        with get(url, stream=True) as r:
            for chunk in r.iter_bytes():
                process(chunk)

        async with await async_get(url, stream=True) as r:
            async for line in r.aiter_lines():
                handle(line)
    """

    __slots__ = (
        "status_code",
        "headers",
        "url",
        "_encoding",
        "_decompressor",
        "_sync_resp",
        "_sync_conn",
        "_async_reader",
        "_async_writer",
        "_async_timeout",
        "_is_chunked",
        "_content_length",
        "_bytes_remaining",
        "_closed",
    )

    status_code: int
    headers: dict[str, str]
    url: str
    _encoding: str
    _decompressor: zlib._Decompress | None
    _sync_resp: http.client.HTTPResponse | None
    _sync_conn: http.client.HTTPConnection | None
    _async_reader: asyncio.StreamReader | None
    _async_writer: asyncio.StreamWriter | None
    _async_timeout: float | None
    _is_chunked: bool
    _content_length: int | None
    _bytes_remaining: int | None
    _closed: bool

    def __init__(self) -> None:
        raise TypeError("Use _from_sync() or _from_async()")

    @classmethod
    def _from_sync(
        cls,
        status_code: int,
        headers: dict[str, str],
        url: str,
        resp: http.client.HTTPResponse,
        conn: http.client.HTTPConnection,
        content_encoding: str = "",
    ) -> "StreamingResponse":
        obj = object.__new__(cls)
        obj.status_code = status_code
        obj.headers = headers
        obj.url = url
        obj._encoding = _guess_encoding_from_headers(headers)
        obj._decompressor = (
            _make_decompressor(content_encoding) if content_encoding else None
        )
        obj._sync_resp = resp
        obj._sync_conn = conn
        obj._async_reader = None
        obj._async_writer = None
        obj._async_timeout = None
        obj._is_chunked = False
        obj._content_length = None
        obj._bytes_remaining = None
        obj._closed = False
        return obj

    @classmethod
    def _from_async(
        cls,
        status_code: int,
        headers: dict[str, str],
        url: str,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        is_chunked: bool,
        content_length: int | None,
        timeout: float,
        content_encoding: str = "",
    ) -> "StreamingResponse":
        obj = object.__new__(cls)
        obj.status_code = status_code
        obj.headers = headers
        obj.url = url
        obj._encoding = _guess_encoding_from_headers(headers)
        obj._decompressor = (
            _make_decompressor(content_encoding) if content_encoding else None
        )
        obj._sync_resp = None
        obj._sync_conn = None
        obj._async_reader = reader
        obj._async_writer = writer
        obj._async_timeout = timeout
        obj._is_chunked = is_chunked
        obj._content_length = content_length
        obj._bytes_remaining = content_length
        obj._closed = False
        return obj

    @property
    def ok(self) -> bool:
        """True if status_code is 2xx."""
        return 200 <= self.status_code < 300

    def raise_for_status(self) -> None:
        """Raise HTTPError if status is not 2xx."""
        if not self.ok:
            raise HTTPError(self.status_code, "", self.url)

    # ── Sync iteration ──

    def iter_bytes(self, chunk_size: int = 4096) -> Iterator[bytes]:
        """Yield response body in chunks."""
        if self._sync_resp is None:
            raise RuntimeError("iter_bytes() on async response")
        try:
            while True:
                chunk = self._sync_resp.read(chunk_size)
                if not chunk:
                    break
                if self._decompressor:
                    chunk = self._decompressor.decompress(chunk)
                yield chunk
            if self._decompressor:
                remaining = self._decompressor.flush()
                if remaining:
                    yield remaining
        except (OSError, http.client.HTTPException) as exc:
            raise HttpConnectionError(str(exc)) from exc

    def iter_lines(self) -> Iterator[str]:
        """Yield response body line by line (decoded)."""
        if self._sync_resp is None:
            raise RuntimeError("iter_lines() on async response")
        try:
            while True:
                line = self._sync_resp.readline()
                if not line:
                    break
                yield line.decode(self._encoding, errors="replace").rstrip("\r\n")
        except (OSError, http.client.HTTPException) as exc:
            raise HttpConnectionError(str(exc)) from exc

    def read(self) -> bytes:
        """Consume entire stream into bytes."""
        return b"".join(self.iter_bytes())

    # ── Async iteration ──

    async def aiter_bytes(self, chunk_size: int = 4096) -> AsyncIterator[bytes]:
        """Async yield response body in chunks."""
        if self._async_reader is None:
            raise RuntimeError("aiter_bytes() on sync response")
        try:
            raw_iter = self._select_raw_iterator(chunk_size)
            async for chunk in raw_iter:
                if self._decompressor:
                    chunk = self._decompressor.decompress(chunk)
                yield chunk
            if self._decompressor:
                remaining = self._decompressor.flush()
                if remaining:
                    yield remaining
        except asyncio.TimeoutError:
            raise HttpTimeoutError(
                f"Streaming read timed out for {self.url}",
                url=self.url,
                timeout=self._async_timeout or 0.0,
            )
        except OSError as exc:
            raise HttpConnectionError(str(exc)) from exc

    async def _select_raw_iterator(self, chunk_size: int) -> AsyncIterator[bytes]:
        """Select and yield from the appropriate raw byte iterator."""
        if self._is_chunked:
            async for chunk in self._aiter_chunked():
                yield chunk
        elif self._bytes_remaining is not None:
            async for chunk in self._aiter_fixed_length(chunk_size):
                yield chunk
        else:
            async for chunk in self._aiter_until_eof(chunk_size):
                yield chunk

    async def _aiter_fixed_length(self, chunk_size: int) -> AsyncIterator[bytes]:
        """Read a known-length response body in chunks."""
        assert self._async_reader is not None
        while self._bytes_remaining is not None and self._bytes_remaining > 0:
            to_read = min(chunk_size, self._bytes_remaining)
            data = await asyncio.wait_for(
                self._async_reader.read(to_read),
                timeout=self._async_timeout,
            )
            if not data:
                break
            self._bytes_remaining -= len(data)
            yield data

    async def _aiter_until_eof(self, chunk_size: int) -> AsyncIterator[bytes]:
        """Read response body until EOF in chunks."""
        assert self._async_reader is not None
        while True:
            data = await asyncio.wait_for(
                self._async_reader.read(chunk_size),
                timeout=self._async_timeout,
            )
            if not data:
                break
            yield data

    async def _aiter_chunked(self) -> AsyncIterator[bytes]:
        """Decode chunked transfer encoding from async reader."""
        assert self._async_reader is not None  # guaranteed by aiter_bytes guard
        reader = self._async_reader
        timeout = self._async_timeout
        while True:
            size_line = await asyncio.wait_for(reader.readline(), timeout=timeout)
            size_str = size_line.decode("latin-1").split(";")[0].strip()
            if not size_str:
                break
            chunk_size = int(size_str, 16)
            if chunk_size == 0:
                await asyncio.wait_for(
                    reader.readline(), timeout=timeout
                )  # trailing \r\n
                break
            data = await asyncio.wait_for(
                reader.readexactly(chunk_size), timeout=timeout
            )
            await asyncio.wait_for(reader.readline(), timeout=timeout)  # trailing \r\n
            yield data

    async def aiter_lines(self) -> AsyncIterator[str]:
        """Async yield response body line by line (decoded)."""
        buf = ""
        async for chunk in self.aiter_bytes():
            buf += chunk.decode(self._encoding, errors="replace")
            while "\n" in buf:
                line, buf = buf.split("\n", 1)
                yield line.rstrip("\r")
        if buf:
            yield buf.rstrip("\r")

    async def aread(self) -> bytes:
        """Async consume entire stream into bytes."""
        parts = []
        async for chunk in self.aiter_bytes():
            parts.append(chunk)
        return b"".join(parts)

    # ── Context managers ──

    def __enter__(self) -> "StreamingResponse":
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

    async def __aenter__(self) -> "StreamingResponse":
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self.aclose()

    def close(self) -> None:
        """Close the underlying sync connection."""
        if self._closed:
            return
        self._closed = True
        # Tier 2: best-effort observable -- active streaming resource
        if self._sync_resp is not None:
            try:
                self._sync_resp.close()
            except Exception:
                logger.debug(
                    "failed to close sync response for %s",
                    self.url,
                    exc_info=True,
                )
        if self._sync_conn is not None:
            try:
                self._sync_conn.close()
            except Exception:
                logger.debug(
                    "failed to close sync connection for %s",
                    self.url,
                    exc_info=True,
                )

    async def aclose(self) -> None:
        """Close the underlying async connection."""
        if self._closed:
            return
        self._closed = True
        # Tier 2: best-effort observable -- active streaming resource
        if self._async_writer is not None:
            try:
                self._async_writer.close()
                await self._async_writer.wait_closed()
            except Exception:
                logger.debug(
                    "failed to close async writer for %s",
                    self.url,
                    exc_info=True,
                )

    def __del__(self) -> None:
        if not self._closed:
            warnings.warn(
                f"Unclosed StreamingResponse for {self.url}",
                ResourceWarning,
                stacklevel=2,
            )
            self.close()

    def __repr__(self) -> str:
        return f"<StreamingResponse [{self.status_code}]>"

ok property

True if status_code is 2xx.

raise_for_status()

Raise HTTPError if status is not 2xx.

Source code in httpclient/httpclient.py
def raise_for_status(self) -> None:
    """Raise HTTPError if status is not 2xx."""
    if not self.ok:
        raise HTTPError(self.status_code, "", self.url)

iter_bytes(chunk_size=4096)

Yield response body in chunks.

Source code in httpclient/httpclient.py
def iter_bytes(self, chunk_size: int = 4096) -> Iterator[bytes]:
    """Yield response body in chunks."""
    if self._sync_resp is None:
        raise RuntimeError("iter_bytes() on async response")
    try:
        while True:
            chunk = self._sync_resp.read(chunk_size)
            if not chunk:
                break
            if self._decompressor:
                chunk = self._decompressor.decompress(chunk)
            yield chunk
        if self._decompressor:
            remaining = self._decompressor.flush()
            if remaining:
                yield remaining
    except (OSError, http.client.HTTPException) as exc:
        raise HttpConnectionError(str(exc)) from exc

iter_lines()

Yield response body line by line (decoded).

Source code in httpclient/httpclient.py
def iter_lines(self) -> Iterator[str]:
    """Yield response body line by line (decoded)."""
    if self._sync_resp is None:
        raise RuntimeError("iter_lines() on async response")
    try:
        while True:
            line = self._sync_resp.readline()
            if not line:
                break
            yield line.decode(self._encoding, errors="replace").rstrip("\r\n")
    except (OSError, http.client.HTTPException) as exc:
        raise HttpConnectionError(str(exc)) from exc

read()

Consume entire stream into bytes.

Source code in httpclient/httpclient.py
def read(self) -> bytes:
    """Consume entire stream into bytes."""
    return b"".join(self.iter_bytes())

aiter_bytes(chunk_size=4096) async

Async yield response body in chunks.

Source code in httpclient/httpclient.py
async def aiter_bytes(self, chunk_size: int = 4096) -> AsyncIterator[bytes]:
    """Async yield response body in chunks."""
    if self._async_reader is None:
        raise RuntimeError("aiter_bytes() on sync response")
    try:
        raw_iter = self._select_raw_iterator(chunk_size)
        async for chunk in raw_iter:
            if self._decompressor:
                chunk = self._decompressor.decompress(chunk)
            yield chunk
        if self._decompressor:
            remaining = self._decompressor.flush()
            if remaining:
                yield remaining
    except asyncio.TimeoutError:
        raise HttpTimeoutError(
            f"Streaming read timed out for {self.url}",
            url=self.url,
            timeout=self._async_timeout or 0.0,
        )
    except OSError as exc:
        raise HttpConnectionError(str(exc)) from exc

aiter_lines() async

Async yield response body line by line (decoded).

Source code in httpclient/httpclient.py
async def aiter_lines(self) -> AsyncIterator[str]:
    """Async yield response body line by line (decoded)."""
    buf = ""
    async for chunk in self.aiter_bytes():
        buf += chunk.decode(self._encoding, errors="replace")
        while "\n" in buf:
            line, buf = buf.split("\n", 1)
            yield line.rstrip("\r")
    if buf:
        yield buf.rstrip("\r")

aread() async

Async consume entire stream into bytes.

Source code in httpclient/httpclient.py
async def aread(self) -> bytes:
    """Async consume entire stream into bytes."""
    parts = []
    async for chunk in self.aiter_bytes():
        parts.append(chunk)
    return b"".join(parts)

close()

Close the underlying sync connection.

Source code in httpclient/httpclient.py
def close(self) -> None:
    """Close the underlying sync connection."""
    if self._closed:
        return
    self._closed = True
    # Tier 2: best-effort observable -- active streaming resource
    if self._sync_resp is not None:
        try:
            self._sync_resp.close()
        except Exception:
            logger.debug(
                "failed to close sync response for %s",
                self.url,
                exc_info=True,
            )
    if self._sync_conn is not None:
        try:
            self._sync_conn.close()
        except Exception:
            logger.debug(
                "failed to close sync connection for %s",
                self.url,
                exc_info=True,
            )

aclose() async

Close the underlying async connection.

Source code in httpclient/httpclient.py
async def aclose(self) -> None:
    """Close the underlying async connection."""
    if self._closed:
        return
    self._closed = True
    # Tier 2: best-effort observable -- active streaming resource
    if self._async_writer is not None:
        try:
            self._async_writer.close()
            await self._async_writer.wait_closed()
        except Exception:
            logger.debug(
                "failed to close async writer for %s",
                self.url,
                exc_info=True,
            )

Client

Synchronous HTTP client session with connection pooling.

Thread-safe: the underlying connection pool uses its own threading.Lock to protect shared state.

Usage::

with Client(headers={"Authorization": "Bearer token"}) as c:
    r = c.get("https://api.example.com/data")
Source code in httpclient/httpclient.py
class Client:
    """Synchronous HTTP client session with connection pooling.

    Thread-safe: the underlying connection pool uses its own
    ``threading.Lock`` to protect shared state.

    Usage::

        with Client(headers={"Authorization": "Bearer token"}) as c:
            r = c.get("https://api.example.com/data")
    """

    def __init__(
        self,
        *,
        headers: dict[str, str] | None = None,
        timeout: float = DEFAULT_TIMEOUT,
        max_redirects: int = DEFAULT_MAX_REDIRECTS,
        verify: bool = True,
        auth: tuple[str, str] | Auth | None = None,
        proxy: str | None = None,
        pool_size: int = DEFAULT_POOL_SIZE,
    ) -> None:
        self._base_headers = headers or {}
        self._timeout = timeout
        self._max_redirects = max_redirects
        self._verify = verify
        self._auth = auth
        self._proxy = proxy
        self._pool = _SyncConnectionPool(pool_size)

    def request(
        self,
        method: str,
        url: str,
        **kwargs: Any,
    ) -> Response | StreamingResponse:
        """Send an HTTP request."""
        kwargs.setdefault("timeout", self._timeout)
        kwargs.setdefault("max_redirects", self._max_redirects)
        kwargs.setdefault("verify", self._verify)
        kwargs.setdefault("auth", self._auth)
        kwargs.setdefault("proxy", self._proxy)
        kwargs["_pool"] = self._pool
        kwargs["headers"] = _merge_headers(self._base_headers, kwargs.get("headers"))
        return _sync_request(method, url, **kwargs)

    def get(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("GET", url, **kwargs)

    def post(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("POST", url, **kwargs)

    def put(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("PUT", url, **kwargs)

    def patch(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("PATCH", url, **kwargs)

    def delete(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("DELETE", url, **kwargs)

    def head(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("HEAD", url, **kwargs)

    def options(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return self.request("OPTIONS", url, **kwargs)

    def close(self) -> None:
        """Close all pooled connections."""
        self._pool.close_all()

    def __enter__(self) -> Client:
        return self

    def __exit__(self, *args: Any) -> None:
        self._pool.close_all()

request(method, url, **kwargs)

Send an HTTP request.

Source code in httpclient/httpclient.py
def request(
    self,
    method: str,
    url: str,
    **kwargs: Any,
) -> Response | StreamingResponse:
    """Send an HTTP request."""
    kwargs.setdefault("timeout", self._timeout)
    kwargs.setdefault("max_redirects", self._max_redirects)
    kwargs.setdefault("verify", self._verify)
    kwargs.setdefault("auth", self._auth)
    kwargs.setdefault("proxy", self._proxy)
    kwargs["_pool"] = self._pool
    kwargs["headers"] = _merge_headers(self._base_headers, kwargs.get("headers"))
    return _sync_request(method, url, **kwargs)

close()

Close all pooled connections.

Source code in httpclient/httpclient.py
def close(self) -> None:
    """Close all pooled connections."""
    self._pool.close_all()

AsyncClient

Asynchronous HTTP client session with connection pooling.

Safe for concurrent use from multiple asyncio tasks. The underlying connection pool uses its own asyncio.Lock to protect shared state.

Usage::

async with AsyncClient(headers={"Authorization": "Bearer token"}) as c:
    r = await c.get("https://api.example.com/data")
Source code in httpclient/httpclient.py
class AsyncClient:
    """Asynchronous HTTP client session with connection pooling.

    Safe for concurrent use from multiple asyncio tasks.  The underlying
    connection pool uses its own ``asyncio.Lock`` to protect shared state.

    Usage::

        async with AsyncClient(headers={"Authorization": "Bearer token"}) as c:
            r = await c.get("https://api.example.com/data")
    """

    def __init__(
        self,
        *,
        headers: dict[str, str] | None = None,
        timeout: float = DEFAULT_TIMEOUT,
        max_redirects: int = DEFAULT_MAX_REDIRECTS,
        verify: bool = True,
        auth: tuple[str, str] | Auth | None = None,
        proxy: str | None = None,
        pool_size: int = DEFAULT_POOL_SIZE,
    ) -> None:
        self._base_headers = headers or {}
        self._timeout = timeout
        self._max_redirects = max_redirects
        self._verify = verify
        self._auth = auth
        self._proxy = proxy
        self._pool = _AsyncConnectionPool(pool_size)

    async def request(
        self,
        method: str,
        url: str,
        **kwargs: Any,
    ) -> Response | StreamingResponse:
        """Send an async HTTP request."""
        kwargs.setdefault("timeout", self._timeout)
        kwargs.setdefault("max_redirects", self._max_redirects)
        kwargs.setdefault("verify", self._verify)
        kwargs.setdefault("auth", self._auth)
        kwargs.setdefault("proxy", self._proxy)
        kwargs["_pool"] = self._pool
        kwargs["headers"] = _merge_headers(self._base_headers, kwargs.get("headers"))
        return await _async_request(method, url, **kwargs)

    async def get(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("POST", url, **kwargs)

    async def put(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("PUT", url, **kwargs)

    async def patch(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("PATCH", url, **kwargs)

    async def delete(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("DELETE", url, **kwargs)

    async def head(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("HEAD", url, **kwargs)

    async def options(self, url: str, **kwargs: Any) -> Response | StreamingResponse:
        return await self.request("OPTIONS", url, **kwargs)

    async def aclose(self) -> None:
        """Close all pooled connections."""
        await self._pool.close_all()

    async def __aenter__(self) -> AsyncClient:
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self._pool.close_all()

request(method, url, **kwargs) async

Send an async HTTP request.

Source code in httpclient/httpclient.py
async def request(
    self,
    method: str,
    url: str,
    **kwargs: Any,
) -> Response | StreamingResponse:
    """Send an async HTTP request."""
    kwargs.setdefault("timeout", self._timeout)
    kwargs.setdefault("max_redirects", self._max_redirects)
    kwargs.setdefault("verify", self._verify)
    kwargs.setdefault("auth", self._auth)
    kwargs.setdefault("proxy", self._proxy)
    kwargs["_pool"] = self._pool
    kwargs["headers"] = _merge_headers(self._base_headers, kwargs.get("headers"))
    return await _async_request(method, url, **kwargs)

aclose() async

Close all pooled connections.

Source code in httpclient/httpclient.py
async def aclose(self) -> None:
    """Close all pooled connections."""
    await self._pool.close_all()

get(url, **kwargs)

Send a GET request.

Source code in httpclient/httpclient.py
def get(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send a GET request."""
    return _sync_request("GET", url, **kwargs)

post(url, **kwargs)

Send a POST request.

Source code in httpclient/httpclient.py
def post(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send a POST request."""
    return _sync_request("POST", url, **kwargs)

put(url, **kwargs)

Send a PUT request.

Source code in httpclient/httpclient.py
def put(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send a PUT request."""
    return _sync_request("PUT", url, **kwargs)

patch(url, **kwargs)

Send a PATCH request.

Source code in httpclient/httpclient.py
def patch(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send a PATCH request."""
    return _sync_request("PATCH", url, **kwargs)

delete(url, **kwargs)

Send a DELETE request.

Source code in httpclient/httpclient.py
def delete(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send a DELETE request."""
    return _sync_request("DELETE", url, **kwargs)

head(url, **kwargs)

Send a HEAD request.

Source code in httpclient/httpclient.py
def head(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send a HEAD request."""
    return _sync_request("HEAD", url, **kwargs)

options(url, **kwargs)

Send an OPTIONS request.

Source code in httpclient/httpclient.py
def options(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an OPTIONS request."""
    return _sync_request("OPTIONS", url, **kwargs)

async_get(url, **kwargs) async

Send an async GET request.

Source code in httpclient/httpclient.py
async def async_get(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async GET request."""
    return await _async_request("GET", url, **kwargs)

async_post(url, **kwargs) async

Send an async POST request.

Source code in httpclient/httpclient.py
async def async_post(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async POST request."""
    return await _async_request("POST", url, **kwargs)

async_put(url, **kwargs) async

Send an async PUT request.

Source code in httpclient/httpclient.py
async def async_put(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async PUT request."""
    return await _async_request("PUT", url, **kwargs)

async_patch(url, **kwargs) async

Send an async PATCH request.

Source code in httpclient/httpclient.py
async def async_patch(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async PATCH request."""
    return await _async_request("PATCH", url, **kwargs)

async_delete(url, **kwargs) async

Send an async DELETE request.

Source code in httpclient/httpclient.py
async def async_delete(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async DELETE request."""
    return await _async_request("DELETE", url, **kwargs)

async_head(url, **kwargs) async

Send an async HEAD request.

Source code in httpclient/httpclient.py
async def async_head(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async HEAD request."""
    return await _async_request("HEAD", url, **kwargs)

async_options(url, **kwargs) async

Send an async OPTIONS request.

Source code in httpclient/httpclient.py
async def async_options(url: str, **kwargs: Any) -> Response | StreamingResponse:
    """Send an async OPTIONS request."""
    return await _async_request("OPTIONS", url, **kwargs)