Skip to content

Event

eth_rpc.Event #

Bases: Request, Generic[T]

name instance-attribute #

name

anonymous class-attribute instance-attribute #

anonymous = False

topic1_filter class-attribute instance-attribute #

topic1_filter = IGNORE_VAL

topic2_filter class-attribute instance-attribute #

topic2_filter = IGNORE_VAL

topic3_filter class-attribute instance-attribute #

topic3_filter = IGNORE_VAL

addresses_filter class-attribute instance-attribute #

addresses_filter = []

get_topic0 cached property #

get_topic0

subscribe property #

subscribe

This returns a callable for the async subscriber, allowing you to select the network for the subscription, e.g.:

my_event = Event[EventType](name="MyEvent")
async for event in my_event.subscribe[Ethereum]():
    ...

If no network is provided, it will use the default network.

rpc classmethod #

rpc()

This uses the default network, unless a network has been provided

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """Return the RPC handle for this class's bound network.

    Falls back to the globally configured default network when no
    network has been bound to the class.
    """
    from ._transport import _force_get_global_rpc

    if cls._network is not None:
        return _force_get_global_rpc(cls._network)
    return _force_get_global_rpc()

model_post_init #

model_post_init(__context)
Source code in eth_rpc/event.py
def model_post_init(self, __context) -> None:
    """Record the generic event payload type as this instance's output type."""
    (event_type, *_rest) = self.__pydantic_generic_metadata__["args"]
    self._output_type = event_type
    return super().model_post_init(__context)

match_topics #

match_topics(log)
Source code in eth_rpc/event.py
def match_topics(self, log: Log) -> bool:
    """Check whether a log's topics satisfy this event's topic filters.

    topic0 must equal this event's signature hash.  Topics 1-3 are
    checked against their filters only when the filter is set AND the
    log actually carries that topic position — a set filter is silently
    skipped when the log has fewer topics.
    """
    # TODO: addresses_filter
    topics = log.topics
    if not topics:
        return False
    if topics[0] != self.get_topic0:
        return False
    filters = (self.topic1_filter, self.topic2_filter, self.topic3_filter)
    for position, topic_filter in enumerate(filters, start=1):
        if topic_filter == IGNORE_VAL:
            continue
        if len(topics) < position + 1:
            continue
        if not self._matches(topics[position], topic_filter):
            return False
    return True

match_address #

match_address(log)
Source code in eth_rpc/event.py
def match_address(self, log: Log) -> bool:
    """Return True when the log's address passes the address filter.

    An empty filter list matches every address.
    """
    if not self.addresses_filter:
        return True
    return log.address in self.addresses_filter

match #

match(log)
Source code in eth_rpc/event.py
def match(self, log: Log) -> bool:
    """True when the log passes both the address and the topic filters."""
    if not self.match_address(log):
        return False
    return self.match_topics(log)

add_address #

add_address(address)
Source code in eth_rpc/event.py
def add_address(self, address: HexAddress | list[HexAddress]):
    """Append one address (or each address in a list) to the filter.

    Duplicates already present in the filter are skipped.
    """
    if isinstance(address, list):
        for item in address:
            self.add_address(item)
        return
    if address not in self.addresses_filter:
        self.addresses_filter.append(address)

remove_address #

remove_address(address)
Source code in eth_rpc/event.py
def remove_address(self, address: HexAddress):
    """Drop every occurrence of the given address from the filter."""
    kept = [existing for existing in self.addresses_filter if existing != address]
    self.addresses_filter = kept

process_value staticmethod #

process_value(type_name, v)
Source code in eth_rpc/event.py
@staticmethod
def process_value(type_name, v: str):
    # strip prefix if necessary
    if "0x" in v:
        v = v[2:]

    if type_name == "address":
        # last 20 bytes of value
        return "0x{}".format(v[-40:])
    if "bytes" in type_name:
        return bytes.fromhex(v)
    if "uint" in type_name:
        return int.from_bytes(bytes.fromhex(v), "big", signed=False)
    elif "int" in type_name:
        return int.from_bytes(bytes.fromhex(v), "big", signed=True)
    if type_name == "bool":
        return v[-1] == "1"

from_dict #

from_dict(fields)
Source code in eth_rpc/event.py
def from_dict(self, fields: dict[str, Any]):
    """Build the generic event payload type from a mapping of field values."""
    event_type = self.__pydantic_generic_metadata__["args"][0]
    return event_type(**fields)

process_log #

process_log(log)
Source code in eth_rpc/event.py
def process_log(self, log: Log) -> EventData[T]:
    """Decode a raw log and wrap it in an EventData envelope."""
    decoded_event = self.process(log.topics, log.data)
    return EventData(
        name=self.name,
        log=log,
        event=decoded_event,
        network=self._network or get_current_network(),
    )

process #

process(topics, data)
Source code in eth_rpc/event.py
def process(self, topics: list[HexStr], data: HexStr) -> T:
    """Decode a log's topics and data into the generic event payload type.

    Args:
        topics: the log's topic list; topics[0] is the event signature
            hash, topics[1:] carry the indexed field values.
        data: hex-encoded ("0x"-prefixed) blob holding the unindexed values.

    Returns:
        An instance of the event payload type T.

    Raises:
        LogDecodeError: when the log's topics or data do not match the
            ABI derived from the payload type's fields.
    """
    EventType, *_ = self.__pydantic_generic_metadata__["args"]
    indexed = self.get_indexed()
    try:
        # topics[i + 1]: skip topic0, the event signature hash
        indexed_dict = {
            name: self.process_value(type_, topics[i + 1])
            for i, (name, type_) in enumerate(indexed)
        }
    except IndexError:
        raise LogDecodeError("Mismatched Indexed values")

    unindexed = self.get_unindexed()
    try:
        # data[2:] strips the "0x" prefix before hex-decoding
        unindexed_values = decode(
            [type_ for (_, type_) in unindexed], bytes.fromhex(data[2:])
        )
    except InsufficientDataBytes:
        raise LogDecodeError("Mismatched Unindexed values")

    # merge indexed and unindexed fields into one kwargs dict
    return EventType(
        **indexed_dict
        | {name: val for (name, _), val in zip(unindexed, unindexed_values)}
    )

get_indexed #

get_indexed()
Source code in eth_rpc/event.py
def get_indexed(self):
    inputs, *_ = self.__pydantic_generic_metadata__["args"]
    input_types = inputs.model_fields
    results = []
    for name, field in input_types.items():
        indexed = False
        _type = field.annotation
        annotations = field.metadata
        for annotation in annotations:
            if annotation == Indexed:
                indexed = True
            elif isinstance(annotation, Name):
                name = annotation.value
        if indexed:
            results.append((name, map_type(_type)))
    return results

get_unindexed #

get_unindexed()
Source code in eth_rpc/event.py
def get_unindexed(self):
    inputs, *_ = self.__pydantic_generic_metadata__["args"]
    input_types = inputs.model_fields
    results = []
    for name, field in input_types.items():
        indexed = False
        _type = field.annotation
        annotations = field.metadata
        for annotation in annotations:
            if annotation == Indexed:
                indexed = True
            elif isinstance(annotation, Name):
                name = annotation.value
        if not indexed:
            results.append((name, convert(_type)))
    return results

set_filter #

set_filter(
    addresses=[],
    topic1=IGNORE_VAL,
    topic2=IGNORE_VAL,
    topic3=IGNORE_VAL,
)
Source code in eth_rpc/event.py
def set_filter(
    self,
    addresses: Optional[list[HexAddress]] = None,
    topic1: Optional[HexStr | list[HexStr]] | IGNORE = IGNORE_VAL,
    topic2: Optional[HexStr | list[HexStr]] | IGNORE = IGNORE_VAL,
    topic3: Optional[HexStr | list[HexStr]] | IGNORE = IGNORE_VAL,
):
    """Return a deep copy of this event with the given filters applied.

    Args:
        addresses: contract addresses to match; None/empty matches all.
        topic1/topic2/topic3: indexed-topic filters; leaving a topic at
            IGNORE_VAL keeps it unfiltered.

    Returns:
        A new event instance; the original is left untouched.
    """
    model = deepcopy(self)
    # Copy the list so later add_address/remove_address on the returned
    # model cannot mutate the caller's list.  (The previous `= []`
    # mutable default was also shared across calls and could be
    # contaminated by a subsequent add_address.)
    model.addresses_filter = list(addresses) if addresses else []
    if topic1 != IGNORE_VAL:
        model.topic1_filter = topic1
    if topic2 != IGNORE_VAL:
        # NOTE(review): an unset earlier topic becomes `... or None` —
        # presumably an explicit wildcard so topic positions line up in
        # the eth_getLogs topics array; confirm IGNORE_VAL is falsy here
        model.topic1_filter = model.topic1_filter or None
        model.topic2_filter = topic2
    if topic3 != IGNORE_VAL:
        model.topic1_filter = model.topic1_filter or None
        model.topic2_filter = model.topic2_filter or None
        model.topic3_filter = topic3
    return model

get_logs async #

get_logs(start_block, end_block)
Source code in eth_rpc/event.py
async def get_logs(
    self,
    start_block: BlockReference | int,
    end_block: BlockReference | int,
) -> AsyncIterator[EventData[T]]:
    """Fetch and decode all logs matching this event's filters in a range.

    Issues a single eth_getLogs request over [start_block, end_block]
    and yields an EventData for every log whose topic count matches the
    expected ABI.

    Raises:
        LogResponseExceededError: the node reported too many logs for
            the range; carries block boundaries parsed from the message.
        RateLimitingError: the provider reported a compute-unit limit.
    """
    cur_end = end_block
    try:
        response = await self._get_logs(
            start_block,
            cur_end,
            self.addresses_filter,
            topic1=self.topic1_filter,
            topic2=self.topic2_filter,
            topic3=self.topic3_filter,
        )
    except ValueError as err:
        # Provider errors surface as ValueError; classify by message text.
        message = err.args[0]
        if "Log response size exceeded." in message:
            # NOTE(review): assumes the provider embeds the suggested
            # block range as two hex numbers in the message
            # (Alchemy-style) — confirm for other providers
            boundaries = re.findall("0x[0-9a-f]+", message)
            raise LogResponseExceededError(
                err.args[0], int(boundaries[0], 16), int(boundaries[1], 16)
            )
        elif (
            "Your app has exceeded its compute units per second capacity" in message
        ):
            raise RateLimitingError(message)
        raise err

    for result in response:
        # TODO: this is just a placeholder
        if len(result.topics) != (len(self.get_indexed()) + 1):
            # this happens when an event has the same topic0, but different indexed events so it doesn't match up to the expected ABI
            continue

        event_data = EventData[T](
            name=self.name,
            log=result,
            event=self.process(
                result.topics,
                result.data,
            ),
            network=self._network or get_current_network(),
        )
        yield event_data

backfill async #

backfill(start_block=None, end_block=None, step_size=None)

This backfills events, handling LogResponseExceededError to provide all logs in a range too large for a single request

Source code in eth_rpc/event.py
async def backfill(
    self,
    start_block: int | None = None,
    end_block: int | None = None,
    step_size: Optional[int] = None,
) -> AsyncIterator[EventData[T]]:
    """
    This backfills events, handling LogResponseExceededError to provide all logs in a range too large for a single request

    Args:
        start_block: first block to scan; defaults to 1.
        end_block: last block to scan; defaults to the current head
            minus three confirmation blocks.
        step_size: optional fixed chunk size per request; when omitted,
            the full remaining range is requested and shrunk on error.
    """
    # NOTE(review): `or` also replaces a start_block of 0 with 1 —
    # confirm block 0 is never required here
    start_block = start_block or 1
    current_number = await Block.get_number()
    end_block = end_block or (current_number - 3)  # set 3 default confirmations

    # NOTE(review): "earliest" is handled although the annotation says
    # int | None — callers appear to pass block tags too; verify
    if start_block == "earliest":
        cur_start = 0
    else:
        cur_start = start_block

    if step_size:
        cur_end = cur_start + step_size
    else:
        cur_end = end_block
    while cur_start <= end_block:
        try:
            async for log in self.get_logs(
                start_block=cur_start,
                end_block=min(cur_end, end_block),
            ):
                yield log
        except LogResponseExceededError as err:
            # too many logs: retry the same start with the node-suggested end
            cur_end = err.recommended_end
            continue
        except RateLimitingError:
            # back off briefly, then retry the same window
            await asyncio.sleep(3)
            continue
        # window succeeded: advance to the next chunk
        cur_start = cur_end + 1
        if step_size:
            cur_end += step_size
        else:
            cur_end = end_block