import inspect
from collections.abc import Iterable, Iterator, Mapping, Sequence
from collections.abc import Set as AbstractSet
from enum import Enum
from functools import cached_property
from inspect import BoundArguments
from itertools import chain
from keyword import iskeyword
from typing import Any, Generic, TypeVar, cast
from ethereum_rpc import LogEntry, LogTopic, keccak
from . import abi
from ._abi_types import ABI_JSON, Type, decode_args, dispatch_parameter_types, encode_args
# Anonymous events can have at most 4 indexed fields
ANONYMOUS_EVENT_INDEXED_FIELDS = 4
# Anonymous events can have at most 4 indexed fields
EVENT_INDEXED_FIELDS = 3
# The number of bytes in a function selector.
SELECTOR_LENGTH = 4
[docs]
class FieldValues:
"""
A container for field values of an event, error, or a method return.
Since Solidity allows fields at arbitrary positions to be anonymous,
a dictionary cannot handle all the possibilities.
"""
def __init__(self, values: Sequence[tuple[str | None, Any]]):
names = [name for name, _value in values if name is not None]
if len(names) != len(set(names)):
raise ValueError("The values cannot have repeating names")
self._values_seq = values
self._values_dict = {name: value for name, value in values if name is not None}
self._representable_as_dict = len(names) == len(self._values_seq)
@property
def as_dict(self) -> dict[str, Any]:
"""
Returns the equivalent dictionary representation.
Raises ``ValueError`` if there are anonymous fields present.
"""
if not self._representable_as_dict:
raise ValueError(
"This structure has some anonymous fields "
"and therefore is not representable as a `dict`"
)
return self._values_dict
@cached_property
def as_tuple(self) -> tuple[Any, ...]:
"""
Returns the equivalent tuple representation
(a tuple of the values with the field names omitted).
"""
return tuple(item for _name, item in self._values_seq)
[docs]
def __getitem__(self, name: str) -> Any:
"""Returns the value with the given name."""
return self._values_dict[name]
[docs]
def __getattr__(self, name: str) -> Any:
"""Returns the value with the given name."""
return self._values_dict[name]
def __repr__(self) -> str:
return f"FieldValues({self._values_seq!r})"
[docs]
class Fields:
"""
Describes a sequence of optionally named typed values.
These can be method parameters, method outputs, error fields,
or event fields.
"""
names: tuple[str | None, ...]
"""Field names."""
types: tuple[Type, ...]
"""Field types."""
def __init__(
self, fields: Mapping[str, Type] | Sequence[Type] | Sequence[tuple[str | None, Type]]
):
names: tuple[str | None, ...]
if isinstance(fields, Mapping):
names = tuple(fields)
types = tuple(fields.values())
elif all(isinstance(elem, Type) for elem in fields):
fields = cast("Sequence[Type]", fields)
names = tuple(None for tp in fields)
types = tuple(fields)
else:
fields = cast("Sequence[tuple[str | None, Type]]", fields)
names = tuple(name for name, _tp in fields)
types = tuple(tp for _name, tp in fields)
self.names = names
self.types = types
@cached_property
def named_fields(self) -> set[str]:
return {name for name in self.names if name is not None}
@cached_property
def as_signature(self) -> inspect.Signature:
"""
Returns the fields represented as a signature.
.. note::
In Solidity, it is possible to have named and anonymous method parameters
or event/error fields in arbitrary order.
This cannot be mapped to Python function signatures.
Also it is possible that some parameter names are Python keywords,
so they will be rejected by the Signature constructor.
So the keyword names will be postfixed with a `_`,
and anonymous fields will be given auto-generated names.
"""
# Keep as many original names as possible
existing_names = {name for name in self.names if name is not None and not iskeyword(name)}
safe_names = []
disambiguation_counter = 1
for arg_num, name in enumerate(self.names):
if name is None:
base_name = "_" + str(arg_num + 1)
elif iskeyword(name):
base_name = name + "_"
else:
safe_names.append(name)
continue
# Since we renamed an existing name, there can potentially be
# an existing one equal to it.
safe_name = base_name
while safe_name in existing_names:
safe_name = base_name + "_" + str(disambiguation_counter)
disambiguation_counter += 1
safe_names.append(safe_name)
return inspect.Signature(
parameters=[
inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD)
for name in safe_names
]
)
@cached_property
def canonical_form(self) -> str:
"""Returns the field types serialized in the canonical form as a string."""
return "(" + ",".join(tp.canonical_form for tp in self.types) + ")"
[docs]
def encode(self, values: Iterable[Any]) -> bytes:
"""Encodes the given position values into bytes according to field types."""
return encode_args(*zip(self.types, values, strict=True))
[docs]
def decode(self, value_bytes: bytes) -> FieldValues:
"""
Decodes the packed bytestring into a list of pairs
of the original parameter/field name and the value.
"""
return FieldValues(list(zip(self.names, decode_args(self.types, value_bytes), strict=True)))
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
args = []
for name, tp in zip(self.names, self.types, strict=True):
args.append(
{
"name": name if name is not None else "",
"type": tp.canonical_form,
}
)
return args
def __str__(self) -> str:
fields = ", ".join(
tp.canonical_form + ((" " + name) if name is not None else "")
for name, tp in zip(self.names, self.types, strict=True)
)
return f"({fields})"
class Either:
"""Denotes an `OR` operation when filtering events."""
def __init__(self, *items: Any):
self.items = items
[docs]
class EventFields(Fields):
"""Fields of an event structure."""
indexed: tuple[bool, ...]
"""A sequence indicating whether the field at the given position is indexed."""
def __init__(
self,
fields: Mapping[str, Type] | Sequence[Type] | Sequence[tuple[str | None, Type]],
indexed: AbstractSet[str] | Sequence[bool],
):
super().__init__(fields)
self._signature = self.as_signature
# Unique names for each field, will be used for internal field identification.
self._safe_names = tuple(self._signature.parameters)
if isinstance(indexed, AbstractSet):
if not set(indexed).issubset(self.named_fields):
raise ValueError("All the names in `indexed` must be present in the fields list")
indexed_seq = tuple(name in indexed for name in self._signature.parameters)
else:
indexed_seq = tuple(indexed)
if len(indexed_seq) != len(self.names):
raise ValueError(
"If `indexed` is a sequence of booleans, "
"its length must match the number of fields"
)
self.indexed = indexed_seq
# Need to preserve the order of the names that was declared when creating the signature.
self._indexed_names = [
name for name, indexed in zip(self._safe_names, indexed_seq, strict=True) if indexed
]
self._indexed_types = [
tp for tp, indexed in zip(self.types, indexed_seq, strict=True) if indexed
]
self._nonindexed_names = [
name for name, indexed in zip(self._safe_names, indexed_seq, strict=True) if not indexed
]
self._nonindexed_types = [
tp for tp, indexed in zip(self.types, indexed_seq, strict=True) if not indexed
]
[docs]
def encode_to_topics(self, *args: Any, **kwargs: Any) -> tuple[None | tuple[bytes, ...], ...]:
"""
Binds given arguments to event's indexed parameters
and encodes them as log topics.
.. note::
If keyword arguments are used, any field names that matched Python keywords
need to be postfixed by a `_`.
"""
bound_args = self._signature.bind_partial(*args, **kwargs)
encoded_topics: list[None | tuple[bytes, ...]] = []
for safe_name, tp in zip(self._safe_names, self.types, strict=True):
if safe_name not in bound_args.arguments:
encoded_topics.append(None)
continue
bound_val = bound_args.arguments[safe_name]
if isinstance(bound_val, Either):
encoded_val = tuple(tp.encode_to_topic(elem) for elem in bound_val.items)
else:
# Make it a one-element tuple to simplify type signatures.
encoded_val = (tp.encode_to_topic(bound_val),)
encoded_topics.append(encoded_val)
# remove trailing `None`s - they are redundant
while encoded_topics and encoded_topics[-1] is None:
encoded_topics.pop()
return tuple(encoded_topics)
[docs]
def decode_log_entry(self, topics: Sequence[bytes], data: bytes) -> FieldValues:
"""Decodes the event fields from the given log entry data."""
if len(topics) != len(self._indexed_names):
raise ValueError(
f"The number of topics in the log entry ({len(topics)}) does not match "
f"the number of indexed fields in the event ({len(self._indexed_names)})"
)
decoded_topics: dict[str, Any] = {
name: tp.decode_from_topic(topic)
for name, tp, topic in zip(
self._indexed_names, self._indexed_types, topics, strict=True
)
}
decoded_data_tuple = decode_args(self._nonindexed_types, data)
decoded_nonindexed = dict(zip(self._nonindexed_names, decoded_data_tuple, strict=True))
# Assemble preserving the field order
decoded_data = []
for safe_name, name in zip(self._safe_names, self.names, strict=True):
if safe_name in decoded_topics:
decoded_data.append((name, decoded_topics[safe_name]))
else:
decoded_data.append((name, decoded_nonindexed[safe_name]))
return FieldValues(decoded_data)
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
args: list[ABI_JSON] = []
for name, tp, indexed in zip(self.names, self.types, self.indexed, strict=True):
args.append(
{
"indexed": indexed,
"name": name if name is not None else "",
"type": tp.canonical_form,
}
)
return args
def __str__(self) -> str:
params = []
for name, tp, indexed in zip(self.names, self.types, self.indexed, strict=True):
indexed_str = " indexed" if indexed else ""
name_str = (" " + name) if name is not None else ""
params.append(f"{tp.canonical_form}{indexed_str}{name_str}")
return "(" + ", ".join(params) + ")"
[docs]
class Constructor:
"""
Contract constructor.
.. note::
If the name of a parameter given to the constructor matches a Python keyword,
``_`` will be appended to it.
"""
inputs: Fields
"""Input signature."""
payable: bool
"""Whether this method is marked as payable"""
[docs]
@classmethod
def from_json(cls, method_entry: ABI_JSON) -> "Constructor":
# TODO (#83): use proper validation
method_entry_typed = cast("Mapping[str, ABI_JSON]", method_entry)
"""Creates this object from a JSON ABI method entry."""
if method_entry_typed["type"] != "constructor":
raise ValueError(
"Constructor object must be created from a JSON entry with type='constructor'"
)
if "name" in method_entry_typed:
raise ValueError("Constructor's JSON entry cannot have a `name`")
if method_entry_typed.get("outputs"):
raise ValueError("Constructor's JSON entry cannot have non-empty `outputs`")
if method_entry_typed["stateMutability"] not in ("nonpayable", "payable"):
raise ValueError(
"Constructor's JSON entry state mutability must be `nonpayable` or `payable`"
)
inputs = dispatch_parameter_types(method_entry_typed.get("inputs", []))
payable = method_entry_typed["stateMutability"] == "payable"
return cls(inputs, payable=payable)
def __init__(
self,
inputs: Mapping[str, Type] | Sequence[tuple[str | None, Type]],
*,
payable: bool = False,
):
self.inputs = Fields(inputs)
self._inputs_signature = self.inputs.as_signature
self.payable = payable
[docs]
def __call__(self, *args: Any, **kwargs: Any) -> "ConstructorCall":
"""Returns an encoded call with given arguments."""
input_bytes = self.inputs.encode(self._inputs_signature.bind(*args, **kwargs).args)
return ConstructorCall(input_bytes)
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
return {
"type": "constructor",
"stateMutability": "payable" if self.payable else "nonpayable",
"inputs": self.inputs.to_json(),
}
def __str__(self) -> str:
return f"constructor{self.inputs} " + ("payable" if self.payable else "nonpayable")
[docs]
class Mutability(Enum):
"""Possible states of a contract's method mutability."""
PURE = "pure"
"""Solidity's ``pure`` (does not read or write the contract state)."""
VIEW = "view"
"""Solidity's ``view`` (may read the contract state)."""
NONPAYABLE = "nonpayable"
"""Solidity's ``nonpayable`` (may write the contract state)."""
PAYABLE = "payable"
"""
Solidity's ``payable`` (may write the contract state
and accept associated funds with transactions).
"""
[docs]
@classmethod
def from_json(cls, entry: ABI_JSON) -> "Mutability":
# TODO (#83): use proper validation
entry_typed = cast("str", entry)
values = dict(
pure=Mutability.PURE,
view=Mutability.VIEW,
nonpayable=Mutability.NONPAYABLE,
payable=Mutability.PAYABLE,
)
if entry_typed not in values:
raise ValueError(f"Unknown mutability identifier: {entry}")
return values[entry_typed]
@property
def payable(self) -> bool:
return self == Mutability.PAYABLE
@property
def mutating(self) -> bool:
return self in {Mutability.PAYABLE, Mutability.NONPAYABLE}
[docs]
class Method:
"""
A contract method.
.. note::
If the name of a parameter (input or output) given to the constructor
matches a Python keyword, ``_`` will be appended to it.
"""
name: str
"""The name of this method."""
inputs: Fields
"""The input signature of this method."""
outputs: Fields
"""The output signature of this method."""
payable: bool
"""Whether this method is marked as payable."""
mutating: bool
"""Whether this method may mutate the contract state."""
[docs]
@classmethod
def from_json(cls, method_entry: ABI_JSON) -> "Method":
"""Creates this object from a JSON ABI method entry."""
# TODO (#83): use proper validation
method_entry_typed = cast("Mapping[str, Any]", method_entry)
if method_entry_typed["type"] != "function":
raise ValueError("Method object must be created from a JSON entry with type='function'")
name = method_entry_typed["name"]
inputs = dispatch_parameter_types(method_entry_typed["inputs"])
mutability = Mutability.from_json(method_entry_typed["stateMutability"])
outputs: None | dict[str, Type] | list[tuple[str | None, Type]]
if "outputs" not in method_entry_typed:
outputs = None
else:
outputs = dispatch_parameter_types(method_entry_typed["outputs"])
return cls(name=name, inputs=inputs, outputs=outputs, mutability=mutability)
def __init__(
self,
name: str,
mutability: Mutability,
inputs: Mapping[str, Type] | Sequence[Type] | Sequence[tuple[str | None, Type]],
outputs: None
| Mapping[str, Type]
| Sequence[Type]
| Sequence[tuple[str | None, Type]]
| Type = None,
):
self.name = name
self.inputs = Fields(inputs)
self._inputs_signature = self.inputs.as_signature
self._mutability = mutability
self.payable = mutability.payable
self.mutating = mutability.mutating
if outputs is None:
outputs = []
if isinstance(outputs, Type):
outputs = [(None, outputs)]
self.outputs = Fields(outputs)
[docs]
def bind(self, *args: Any, **kwargs: Any) -> BoundArguments:
"""Binds the given arguments to the method's signature."""
return self._inputs_signature.bind(*args, **kwargs)
[docs]
def __call__(self, *args: Any, **kwargs: Any) -> "MethodCall":
"""Returns an encoded call with given arguments."""
bound_args = self.bind(*args, **kwargs)
return self.call_bound(bound_args)
[docs]
def call_bound(self, bound_args: BoundArguments) -> "MethodCall":
"""Creates a method call object using previouosly bound arguments."""
input_bytes = self.inputs.encode(bound_args.args)
encoded = self.selector + input_bytes
return MethodCall(self, encoded)
@cached_property
def selector(self) -> bytes:
"""Method's selector."""
return keccak(self.name.encode() + self.inputs.canonical_form.encode())[:SELECTOR_LENGTH]
[docs]
def decode_output(self, output_bytes: bytes) -> Any:
"""
Decodes the output from ABI-packed bytes.
If there is only a single output, its value is returned.
If all the fields in the output are unnamed, it is returned as a tuple of values.
Otherwise it is returned as a :py:class:`FieldValues` object.
"""
results = self.outputs.decode(output_bytes)
if len(self.outputs.names) == 1:
return results.as_tuple[0]
if all(name is None for name in self.outputs.names):
return results.as_tuple
return results
[docs]
def with_method(self, method: "Method") -> "MultiMethod":
"""Returns a multimethod resulting from joining this method with `method`."""
return MultiMethod(self, method)
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
return {
"type": "function",
"name": self.name,
"stateMutability": self._mutability.value,
"inputs": self.inputs.to_json(),
"outputs": self.outputs.to_json(),
}
def __str__(self) -> str:
returns = "" if not self.outputs.names else f" returns {self.outputs}"
return f"function {self.name}{self.inputs} {self._mutability.value}{returns}"
[docs]
class MultiMethod:
"""
An overloaded contract method, containing several :py:class:`Method` objects with the same name
but different input signatures.
"""
def __init__(self, *methods: Method):
if len(methods) == 0:
raise ValueError("`methods` cannot be empty")
first_method = methods[0]
self._methods = {first_method.inputs.canonical_form: first_method}
self._name = first_method.name
for method in methods[1:]:
self._add_method(method)
def __getitem__(self, args: str) -> Method:
"""
Returns the :py:class:`Method` with the given canonical form of an input signature
(corresponding to :py:attr:`Fields.canonical_form`).
"""
return self._methods[args]
@property
def name(self) -> str:
"""The name of this method."""
return self._name
@property
def methods(self) -> dict[str, Method]:
"""All the overloaded methods, indexed by the canonical form of their input signatures."""
return self._methods
def _add_method(self, method: Method) -> None:
if method.name != self.name:
raise ValueError("All overloaded methods must have the same name")
if method.inputs.canonical_form in self._methods:
raise ValueError(
f"A method {self.name}{method.inputs.canonical_form} "
"is already registered in this MultiMethod"
)
self._methods[method.inputs.canonical_form] = method
[docs]
def with_method(self, method: Method) -> "MultiMethod":
"""Returns a new ``MultiMethod`` with the given method included."""
new_mm = MultiMethod(*self._methods.values())
new_mm._add_method(method)
return new_mm
[docs]
def __call__(self, *args: Any, **kwds: Any) -> "MethodCall":
"""Returns an encoded call with given arguments."""
for method in self._methods.values():
try:
bound_args = method.bind(*args, **kwds)
except TypeError:
# If it's a non-overloaded method, we do not want to complicate things
if len(self._methods) == 1:
raise
continue
return method.call_bound(bound_args)
raise TypeError("Could not find a suitable overloaded method for the given arguments")
[docs]
def to_json(self) -> list[ABI_JSON]:
"""Returns this object's JSON ABI."""
return [method.to_json() for method in self._methods.values()]
def __str__(self) -> str:
return "; ".join(str(method) for method in self._methods.values())
[docs]
class Event:
"""A contract event."""
name: str
"""The name of this event."""
fields: EventFields
"""The event fields."""
anonymous: bool
"""Whether the event is anonymous."""
[docs]
@classmethod
def from_json(cls, event_entry: ABI_JSON) -> "Event":
"""Creates this object from a JSON ABI method entry."""
# TODO (#83): use proper validation
event_entry_typed = cast("Mapping[str, Any]", event_entry)
if event_entry_typed["type"] != "event":
raise ValueError("Event object must be created from a JSON entry with type='event'")
name = event_entry_typed["name"]
fields = dispatch_parameter_types(event_entry_typed["inputs"])
indexed = [input_["indexed"] for input_ in event_entry_typed["inputs"]]
return cls(
name=name, fields=fields, indexed=indexed, anonymous=event_entry_typed["anonymous"]
)
def __init__(
self,
name: str,
fields: Mapping[str, Type] | Sequence[tuple[str | None, Type]],
indexed: AbstractSet[str] | Sequence[bool],
*,
anonymous: bool = False,
):
self.name = name
self.fields = EventFields(fields, indexed)
self.anonymous = anonymous
indexed_num = sum(self.fields.indexed)
if anonymous and indexed_num > ANONYMOUS_EVENT_INDEXED_FIELDS:
raise ValueError(
f"Anonymous events can have at most {ANONYMOUS_EVENT_INDEXED_FIELDS} indexed fields"
)
if not anonymous and indexed_num > EVENT_INDEXED_FIELDS:
raise ValueError(
f"Non-anonymous events can have at most {EVENT_INDEXED_FIELDS} indexed fields"
)
@cached_property
def _topic(self) -> LogTopic:
"""The topic representing this event's signature."""
return LogTopic(keccak(self.name.encode() + self.fields.canonical_form.encode()))
def __call__(self, *args: Any, **kwargs: Any) -> "EventFilter":
"""
Creates an event filter from provided values for indexed parameters.
Some arguments can be omitted, which will mean that the filter
will match events with any value of that parameter.
:py:class:`Either` can be used to denote an OR operation and match
either of several values of a parameter.
"""
encoded_topics = self.fields.encode_to_topics(*args, **kwargs)
log_topics: list[None | tuple[LogTopic, ...]] = []
if not self.anonymous:
log_topics.append((self._topic,))
for topic in encoded_topics:
if topic is None:
log_topics.append(None)
else:
log_topics.append(tuple(LogTopic(elem) for elem in topic))
return EventFilter(tuple(log_topics))
[docs]
def decode_log_entry(self, log_entry: LogEntry) -> FieldValues:
"""
Decodes the event fields from the given log entry.
Fields that cannot be decoded (indexed reference types,
which are hashed before saving them to the log) are set to ``None``.
"""
topics = log_entry.topics
if not self.anonymous:
if topics[0] != self._topic:
raise ValueError("This log entry belongs to a different event")
topics = topics[1:]
return self.fields.decode_log_entry([bytes(topic) for topic in topics], log_entry.data)
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
return {
"type": "event",
"name": self.name,
"inputs": self.fields.to_json(),
"anonymous": self.anonymous,
}
def __str__(self) -> str:
return f"event {self.name}{self.fields}" + (" anonymous" if self.anonymous else "")
[docs]
class EventFilter:
"""A filter for events coming from any contract address."""
topics: tuple[None | tuple[LogTopic, ...], ...]
def __init__(self, topics: tuple[None | tuple[LogTopic, ...], ...]):
self.topics = topics
[docs]
class Error:
"""A custom contract error."""
name: str
"""The name of the error structure."""
fields: Fields
"""The fields of the structure."""
[docs]
@classmethod
def from_json(cls, error_entry: ABI_JSON) -> "Error":
"""Creates this object from a JSON ABI method entry."""
# TODO (#83): use proper validation
error_entry_typed = cast("Mapping[str, Any]", error_entry)
if error_entry_typed["type"] != "error":
raise ValueError("Error object must be created from a JSON entry with type='error'")
name = error_entry_typed["name"]
fields = dispatch_parameter_types(error_entry_typed["inputs"])
return cls(name=name, fields=fields)
def __init__(
self,
name: str,
fields: Mapping[str, Type] | Sequence[Type] | Sequence[tuple[str | None, Type]],
):
self.name = name
self._named_fields = isinstance(fields, Mapping)
self.fields = Fields(fields)
@cached_property
def selector(self) -> bytes:
"""Error's selector."""
return keccak(self.name.encode() + self.fields.canonical_form.encode())[:SELECTOR_LENGTH]
[docs]
def decode_fields(self, data_bytes: bytes) -> FieldValues:
"""Decodes the error fields from the given packed data."""
return self.fields.decode(data_bytes)
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
return {
"type": "error",
"name": self.name,
"inputs": self.fields.to_json(),
}
def __str__(self) -> str:
return f"error {self.name}{self.fields}"
[docs]
class Fallback:
"""A fallback method."""
payable: bool
"""Whether this method is marked as payable"""
[docs]
@classmethod
def from_json(cls, method_entry: ABI_JSON) -> "Fallback":
"""Creates this object from a JSON ABI method entry."""
# TODO (#83): use proper validation
method_entry_typed = cast("Mapping[str, ABI_JSON]", method_entry)
if method_entry_typed["type"] != "fallback":
raise ValueError(
"Fallback object must be created from a JSON entry with type='fallback'"
)
if method_entry_typed["stateMutability"] not in ("nonpayable", "payable"):
raise ValueError(
"Fallback method's JSON entry state mutability must be `nonpayable` or `payable`"
)
payable = method_entry_typed["stateMutability"] == "payable"
return cls(payable=payable)
def __init__(self, *, payable: bool = False):
self.payable = payable
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
return {
"type": "fallback",
"stateMutability": "payable" if self.payable else "nonpayable",
}
def __str__(self) -> str:
return "fallback() " + ("payable" if self.payable else "nonpayable")
[docs]
class Receive:
"""A receive method."""
payable: bool
"""Whether this method is marked as payable"""
[docs]
@classmethod
def from_json(cls, method_entry: ABI_JSON) -> "Receive":
"""Creates this object from a JSON ABI method entry."""
# TODO (#83): use proper validation
method_entry_typed = cast("Mapping[str, ABI_JSON]", method_entry)
if method_entry_typed["type"] != "receive":
raise ValueError("Receive object must be created from a JSON entry with type='receive'")
if method_entry_typed["stateMutability"] not in ("nonpayable", "payable"):
raise ValueError(
"Receive method's JSON entry state mutability must be `nonpayable` or `payable`"
)
payable = method_entry_typed["stateMutability"] == "payable"
return cls(payable=payable)
def __init__(self, *, payable: bool = False):
self.payable = payable
[docs]
def to_json(self) -> ABI_JSON:
"""Returns this object's JSON ABI."""
return {
"type": "receive",
"stateMutability": "payable" if self.payable else "nonpayable",
}
def __str__(self) -> str:
return "receive() " + ("payable" if self.payable else "nonpayable")
[docs]
class ConstructorCall:
"""A call to the contract's constructor."""
input_bytes: bytes
"""Encoded call arguments."""
def __init__(self, input_bytes: bytes):
self.input_bytes = input_bytes
[docs]
class MethodCall:
"""A call to a contract's regular method."""
data_bytes: bytes
"""Encoded call arguments with the selector."""
method: Method
"""The method object that encoded this call."""
def __init__(self, method: Method, data_bytes: bytes):
self.method = method
self.data_bytes = data_bytes
# This is force-documented as :py:class in ``api.rst``
# because Sphinx cannot resolve typevars correctly.
# See https://github.com/sphinx-doc/sphinx/issues/9705
MethodType = TypeVar("MethodType")
[docs]
class Methods(Generic[MethodType]):
"""
Bases: ``Generic`` [``MethodType``].
A holder for named methods which can be accessed as attributes,
or iterated over.
"""
# :show-inheritance: is turned off in ``api.rst``, and we are documenting the base manually
# (although without hyperlinking which I cannot get to work).
# See https://github.com/sphinx-doc/sphinx/issues/9705
def __init__(self, methods_dict: Mapping[str, MethodType]):
self._methods_dict = methods_dict
[docs]
def __getattr__(self, method_name: str) -> MethodType:
"""Returns the method by name."""
return self._methods_dict[method_name]
[docs]
def __iter__(self) -> Iterator[MethodType]:
"""Returns the iterator over all methods."""
return iter(self._methods_dict.values())
PANIC_ERROR = Error("Panic", dict(code=abi.uint(256)))
LEGACY_ERROR = Error("Error", dict(message=abi.string))
class UnknownError(Exception):
pass
[docs]
class ContractABI:
"""
A wrapper for contract ABI.
Contract methods are grouped by type and are accessible via the attributes below.
"""
constructor: Constructor
"""Contract's constructor."""
fallback: None | Fallback
"""Contract's fallback method."""
receive: None | Receive
"""Contract's receive method."""
method: Methods[Method | MultiMethod]
"""Contract's regular methods."""
event: Methods[Event]
"""Contract's events."""
error: Methods[Error]
"""Contract's errors."""
[docs]
@classmethod
def from_json(cls, json_abi: ABI_JSON) -> "ContractABI": # noqa: C901, PLR0912
"""Creates this object from a JSON ABI (e.g. generated by a Solidity compiler)."""
# TODO (#83): use proper validation
json_abi_typed = cast("Sequence[Mapping[str, ABI_JSON]]", json_abi)
constructor = None
fallback = None
receive = None
methods: dict[Any, Method | MultiMethod] = {}
events = {}
errors = {}
for entry in json_abi_typed:
if entry["type"] == "constructor":
if constructor:
raise ValueError("JSON ABI contains more than one constructor declarations")
constructor = Constructor.from_json(entry)
elif entry["type"] == "function":
method = Method.from_json(entry)
if entry["name"] in methods:
methods[entry["name"]] = methods[entry["name"]].with_method(method)
else:
methods[entry["name"]] = method
elif entry["type"] == "fallback":
if fallback:
raise ValueError("JSON ABI contains more than one fallback declarations")
fallback = Fallback.from_json(entry)
elif entry["type"] == "receive":
if receive:
raise ValueError("JSON ABI contains more than one receive method declarations")
receive = Receive.from_json(entry)
elif entry["type"] == "event":
if entry["name"] in events:
raise ValueError(
f"JSON ABI contains more than one declarations of `{entry['name']}`"
)
events[entry["name"]] = Event.from_json(entry)
elif entry["type"] == "error":
if entry["name"] in errors:
raise ValueError(
f"JSON ABI contains more than one declarations of `{entry['name']}`"
)
errors[entry["name"]] = Error.from_json(entry)
else:
raise ValueError(f"Unknown ABI entry type: {entry['type']}")
return cls(
constructor=constructor,
fallback=fallback,
receive=receive,
methods=methods.values(),
events=events.values(),
errors=errors.values(),
)
def __init__(
self,
constructor: None | Constructor = None,
fallback: None | Fallback = None,
receive: None | Receive = None,
methods: None | Iterable[Method | MultiMethod] = None,
events: None | Iterable[Event] = None,
errors: None | Iterable[Error] = None,
):
if constructor is None:
constructor = Constructor(inputs=[])
self.fallback = fallback
self.receive = receive
self.constructor = constructor
self.method = Methods({method.name: method for method in (methods or [])})
self.event = Methods({event.name: event for event in (events or [])})
self.error = Methods({error.name: error for error in (errors or [])})
self._error_by_selector = {
error.selector: error for error in chain([PANIC_ERROR, LEGACY_ERROR], self.error)
}
[docs]
def resolve_error(self, error_data: bytes) -> tuple[Error, FieldValues]:
"""
Given the packed error data, attempts to find the error in the ABI
and decode the data into its fields.
"""
if len(error_data) < SELECTOR_LENGTH:
raise ValueError("Error data too short to contain a selector")
selector, data = error_data[:SELECTOR_LENGTH], error_data[SELECTOR_LENGTH:]
if selector in self._error_by_selector:
error = self._error_by_selector[selector]
decoded = error.decode_fields(data)
return error, decoded
raise UnknownError(f"Could not find an error with selector {selector.hex()} in the ABI")
[docs]
def to_json(self) -> ABI_JSON:
"""Returns the serialized list of contract items (methods, errors, events)."""
all_items: Iterable[
Constructor | Fallback | Receive | Method | MultiMethod | Event | Error
] = chain(
[self.constructor] if self.constructor else [],
[self.fallback] if self.fallback else [],
[self.receive] if self.receive else [],
self.method,
self.event,
self.error,
)
entries: list[ABI_JSON] = []
for item in all_items:
if isinstance(item, MultiMethod):
entries.extend(item.to_json())
else:
entries.append(item.to_json())
return entries
def __str__(self) -> str:
all_items: Iterable[
Constructor | Fallback | Receive | Method | MultiMethod | Event | Error
] = chain(
[self.constructor] if self.constructor else [],
[self.fallback] if self.fallback else [],
[self.receive] if self.receive else [],
self.method,
self.event,
self.error,
)
indent = " "
def to_str(item: Any) -> str:
if isinstance(item, MultiMethod):
return ("\n" + indent).join(str(method) for method in item.methods.values())
return str(item)
method_list = [indent + to_str(method) for method in all_items]
return "{\n" + "\n".join(method_list) + "\n}"