Source code for pons.http_provider

"""HTTP provider based on `httpx`."""

from collections.abc import AsyncIterator, Mapping
from contextlib import asynccontextmanager
from http import HTTPStatus
from json import JSONDecodeError
from typing import cast

import httpx
from compages import StructuringError
from ethereum_rpc import RPCError, structure

from ._provider import (
    RPC_JSON,
    InvalidResponse,
    ProtocolError,
    Provider,
    ProviderError,
    ProviderSession,
    Unreachable,
)

__all__ = ["HTTPError", "HTTPProvider"]


[docs] class HTTPError(ProtocolError): """ Raised when the provider returns a response with a status code other than 200, and no ``"error"`` field in the associated JSON data. """ status: HTTPStatus """The HTTP status of the response.""" message: str """The response body.""" def __init__(self, status_code: int, message: str): try: status = HTTPStatus(status_code) except ValueError: # pragma: no cover # How to handle it better? Ideally, `httpx` should have returned a parsed status # in the first place, but, alas, it just gives us an integer. status = HTTPStatus.INTERNAL_SERVER_ERROR self.status = status self.message = message def __str__(self) -> str: # noqa: D105 return f"HTTP status {self.status}: {self.message}"
[docs] class HTTPProvider(Provider): """A provider for RPC via HTTP(S).""" def __init__(self, url: str): self._url = url @asynccontextmanager async def session(self) -> AsyncIterator["HTTPProviderSession"]: # noqa: D102 async with httpx.AsyncClient() as client: yield HTTPProviderSession(self._url, client)
class HTTPProviderSession(ProviderSession): def __init__(self, url: str, http_client: httpx.AsyncClient): self._url = url self._client = http_client def _prepare_request(self, method: str, *args: RPC_JSON) -> RPC_JSON: return {"jsonrpc": "2.0", "method": method, "params": args, "id": 0} async def rpc(self, method: str, *args: RPC_JSON) -> RPC_JSON: json = self._prepare_request(method, *args) try: response = await self._client.post(self._url, json=json) except httpx.ConnectError as exc: raise ProviderError(Unreachable(str(exc))) from exc status = response.status_code try: response_json = response.json() except JSONDecodeError as exc: content = response.content.decode() raise ProviderError( InvalidResponse(f"Expected a JSON response, got HTTP status {status}: {content}") ) from exc if not isinstance(response_json, Mapping): raise ProviderError( InvalidResponse(f"RPC response must be a dictionary, got: {response_json}") ) response_json = cast("Mapping[str, RPC_JSON]", response_json) # Note that the Eth-side errors (e.g. transaction having been reverted) # will have the HTTP status 200, so we are checking for the "error" field first. if "error" in response_json: try: error = structure(RPCError, response_json["error"]) except StructuringError as exc: raise ProviderError( InvalidResponse(f"Failed to parse an error response: {response_json}") ) from exc raise ProviderError(error) if status == HTTPStatus.OK: if "result" in response_json: return response_json["result"] raise ProviderError( InvalidResponse(f"`result` is not present in the response: {response_json}") ) raise ProviderError(HTTPError(status, response.content.decode()))