Source code for req2.adapters

"""Adapter implementations for :mod:`req2` sessions."""

from __future__ import annotations

import io
from tempfile import SpooledTemporaryFile
from typing import Any

import pycurl

from .exceptions import map_pycurl_error
from .models import PreparedRequest, Response
from .utils import CaseInsensitiveDict


class _CurlHandlePool:
    """Maintain a small pool of reusable :class:`pycurl.Curl` handles."""

    def __init__(self, maxsize: int = 10) -> None:
        self._available: list[pycurl.Curl] = []
        self._maxsize = maxsize

    def acquire(self) -> pycurl.Curl:
        if self._available:
            curl = self._available.pop()
        else:
            curl = pycurl.Curl()
        curl.setopt(pycurl.NOSIGNAL, 1)
        return curl

    def release(self, curl: pycurl.Curl, *, reusable: bool = True) -> None:
        if not reusable:
            curl.close()
            return

        try:
            curl.reset()
        except AttributeError:
            # Older versions of pycurl may not provide ``reset``; fall back to
            # closing the handle in those rare cases.
            curl.close()
            return

        curl.setopt(pycurl.NOSIGNAL, 1)
        if len(self._available) >= self._maxsize:
            curl.close()
        else:
            self._available.append(curl)

    def close(self) -> None:
        while self._available:
            curl = self._available.pop()
            curl.close()


[docs] class HTTPAdapter: """Basic HTTP adapter that drives requests through :mod:`pycurl`.""" def __init__( self, *, pool_connections: int = 10, pool_maxsize: int | None = None, spool_max_size: int = 1024 * 1024, ) -> None: maxsize = pool_maxsize if pool_maxsize is not None else pool_connections self._pool = _CurlHandlePool(maxsize=maxsize) self._spool_max_size = spool_max_size # ------------------------------------------------------------------ # Public API expected by :class:`req2.sessions.Session` # ------------------------------------------------------------------
[docs] def send( self, request: PreparedRequest, *, timeout: float | tuple[float | None, float | None] | None = None, verify: bool | str = True, cert: str | tuple[str, str] | None = None, stream: bool = False, proxies: dict[str, str] | None = None, auth: tuple[str, str] | None = None, ) -> tuple[Response, list[tuple[str, str]]]: curl = self._pool.acquire() header_blocks: list[list[str]] = [] current_block: list[str] = [] if stream: body_buffer: io.BufferedRandom = SpooledTemporaryFile( max_size=self._spool_max_size, mode="w+b" ) else: body_buffer = io.BytesIO() def header_function(line: bytes) -> None: text = line.decode("iso-8859-1") if text.startswith("HTTP/"): if current_block: header_blocks.append(current_block.copy()) current_block.clear() current_block.append(text.strip()) elif text in ("\r\n", "\n"): if current_block: header_blocks.append(current_block.copy()) current_block.clear() else: current_block.append(text.strip()) try: self._configure_handle( curl, request, body_buffer, header_function, timeout=timeout, verify=verify, cert=cert, proxies=proxies, auth=auth, ) curl.perform() if current_block: header_blocks.append(current_block.copy()) response_code = curl.getinfo(pycurl.RESPONSE_CODE) effective_url = curl.getinfo(pycurl.EFFECTIVE_URL) timings = { "dns": curl.getinfo(pycurl.NAMELOOKUP_TIME), "tcp_connect": curl.getinfo(pycurl.CONNECT_TIME), "ssl_handshake": curl.getinfo(pycurl.APPCONNECT_TIME), "pretransfer": curl.getinfo(pycurl.PRETRANSFER_TIME), "ttfb": curl.getinfo(pycurl.STARTTRANSFER_TIME), "total": curl.getinfo(pycurl.TOTAL_TIME), } except pycurl.error as exc: self._pool.release(curl, reusable=False) body_buffer.close() _, code = exc.args raise map_pycurl_error(code) else: self._pool.release(curl, reusable=True) body_buffer.seek(0) header_map, header_pairs, reason = self._parse_headers(header_blocks) ttfb = timings["ttfb"] total = timings["total"] timings["content_transfer"] = max(total - ttfb, 0.0) if stream: raw_body = body_buffer content_bytes: bytes | None = None else: content_bytes = body_buffer.read() raw_body = io.BytesIO(content_bytes) body_buffer.close() raw_body.seek(0) response = Response( status_code=response_code, headers=header_map, raw=raw_body, content=content_bytes, url=effective_url, reason=reason, elapsed_seconds=total, timings=timings, request=request, stream_consumed=not stream, ) return response, header_pairs
[docs] def close(self) -> None: self._pool.close()
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _configure_handle( self, curl: pycurl.Curl, request: PreparedRequest, body_buffer: io.BufferedIOBase, header_function: Any, *, timeout: float | tuple[float | None, float | None] | None, verify: bool | str, cert: str | tuple[str, str] | None, proxies: dict[str, str] | None, auth: tuple[str, str] | None, ) -> None: curl.setopt(pycurl.URL, request.url) curl.setopt(pycurl.NOPROGRESS, True) curl.setopt(pycurl.WRITEDATA, body_buffer) curl.setopt(pycurl.HEADERFUNCTION, header_function) curl.setopt(pycurl.ACCEPT_ENCODING, "") self._apply_method(curl, request) if request.headers: header_list = [f"{k}: {v}" for k, v in request.headers.items()] curl.setopt(pycurl.HTTPHEADER, header_list) if timeout is not None: if isinstance(timeout, tuple): connect, read = timeout if connect is not None: curl.setopt(pycurl.CONNECTTIMEOUT, float(connect)) if read is not None: curl.setopt(pycurl.TIMEOUT, float(read)) else: curl.setopt(pycurl.TIMEOUT, float(timeout)) if isinstance(verify, str): curl.setopt(pycurl.CAINFO, verify) elif not verify: curl.setopt(pycurl.SSL_VERIFYPEER, 0) curl.setopt(pycurl.SSL_VERIFYHOST, 0) if cert: if isinstance(cert, tuple): cert_file, key_file = cert curl.setopt(pycurl.SSLCERT, cert_file) curl.setopt(pycurl.SSLKEY, key_file) else: curl.setopt(pycurl.SSLCERT, cert) if auth and len(auth) == 2: user, password = auth curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC | pycurl.HTTPAUTH_DIGEST) curl.setopt(pycurl.USERPWD, f"{user}:{password}") if proxies: scheme = request.url.split(":", 1)[0].lower() proxy = proxies.get(scheme) if proxy: curl.setopt(pycurl.PROXY, proxy) def _apply_method(self, curl: pycurl.Curl, request: PreparedRequest) -> None: method = request.method.upper() if method == "GET": curl.setopt(pycurl.HTTPGET, True) elif method == "POST": curl.setopt(pycurl.POST, True) if request.body is not None: curl.setopt(pycurl.POSTFIELDS, request.body) elif method == "HEAD": curl.setopt(pycurl.NOBODY, True) curl.setopt(pycurl.CUSTOMREQUEST, "HEAD") elif method in {"PUT", "PATCH", "DELETE"}: curl.setopt(pycurl.CUSTOMREQUEST, method) if request.body is not None: curl.setopt(pycurl.POSTFIELDS, request.body) else: curl.setopt(pycurl.CUSTOMREQUEST, method) if request.body is not None: curl.setopt(pycurl.POSTFIELDS, request.body) def _parse_headers( self, header_blocks: list[list[str]] ) -> tuple[CaseInsensitiveDict, list[tuple[str, str]], str]: headers = CaseInsensitiveDict() header_pairs: list[tuple[str, str]] = [] reason = "" for block in header_blocks: if not block: continue status_line = block[0] if status_line.startswith("HTTP/"): parts = status_line.split(" ", 2) if len(parts) >= 3: reason = parts[2].strip() elif len(parts) == 2: reason = parts[1].strip() else: reason = "" for header in block[1:]: if ":" not in header: continue name, value = header.split(":", 1) name = name.strip() value = value.strip() headers[name] = value header_pairs.append((name, value)) return headers, header_pairs, reason
AdapterType = HTTPAdapter