Skip to content

Log

eth_rpc.Log #

Bases: Request, Log

model_config class-attribute instance-attribute #

# Pydantic model configuration: JSON-RPC payloads use camelCase field names,
# so aliases are generated with to_camel while the model itself stays
# snake_case; populate_by_name allows constructing with either form, and
# from_attributes allows building from arbitrary attribute-bearing objects.
model_config = ConfigDict(
    alias_generator=to_camel,
    populate_by_name=True,
    from_attributes=True,
)

transaction_hash instance-attribute #

transaction_hash

address instance-attribute #

address

block_hash instance-attribute #

block_hash

block_number instance-attribute #

block_number

data instance-attribute #

data

log_index instance-attribute #

log_index

removed instance-attribute #

removed

topics instance-attribute #

topics

transaction_index instance-attribute #

transaction_index

set_network #

set_network(network)
Source code in eth_rpc/utils/model.py
def set_network(self, network: type[Network] | None):
    """Bind this instance to *network*, or clear the binding with None."""
    # object.__setattr__ bypasses pydantic's own __setattr__, so the
    # private attribute can be set even on validated/immutable models.
    object.__setattr__(self, "_network", network)

model_post_init #

model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """Copy the class-level network binding onto this instance and shadow
    the ``rpc`` classmethod with the instance-bound ``_rpc`` variant.

    ``object.__setattr__`` is used to bypass pydantic's attribute handling.
    """
    object.__setattr__(self, "_network", self.__class__._network)
    # Shadow the .rpc() classmethod with the per-instance implementation.
    object.__setattr__(self, "rpc", self._rpc)

rpc classmethod #

rpc()

This uses the default network, unless a network has been provided

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    This uses the default network, unless a network has been provided
    """
    from ._transport import _force_get_global_rpc

    # A pinned network takes precedence; otherwise fall back to the
    # process-wide default RPC.
    if cls._network is not None:
        return _force_get_global_rpc(cls._network)
    return _force_get_global_rpc()

load_by_number classmethod #

load_by_number(
    from_block, to_block=None, address=None, topics=None
)

Get all logs in a given block range

Source code in eth_rpc/log.py
@classmethod
def load_by_number(
    cls,
    from_block: int | HexInt,
    to_block: int | HexInt | None = None,
    address: HexAddress | list[HexAddress] | None = None,
    topics: list[list[HexStr] | HexStr | None] | None = None,
) -> RPCResponseModel[LogsArgs, list["LogModel"]]:
    """
    Get all logs in a given block range via ``eth_getLogs``.

    Args:
        from_block: first block of the range.
        to_block: last block of the range; defaults to ``from_block + 1``.
        address: contract address(es) to restrict the query to, if any.
        topics: positional topic filters, per the eth_getLogs spec.

    Returns:
        A lazy ``RPCResponseModel`` wrapping the eth_getLogs request.
    """
    # Explicit None check: a bare ``to_block or ...`` would treat a
    # legitimate to_block of 0 (genesis) as "not provided".
    if to_block is None:
        to_block = from_block + 1
    return RPCResponseModel(
        cls.rpc().get_logs,
        LogsArgs(
            params=LogsParams(
                address=address,
                from_block=HexInt(from_block),
                to_block=HexInt(to_block),
                topics=topics,
            )
        ),
    )

listen async classmethod #

listen(*, queue, publish_logs=DEFAULT_EVENT)

Subscribe to logs

Source code in eth_rpc/log.py
@classmethod
async def listen(
    cls,
    *,
    queue: asyncio.Queue["LogModel"],
    publish_logs: asyncio.Event = DEFAULT_EVENT,
):
    """
    Subscribe to logs, forwarding each one to ``queue``.

    While ``publish_logs`` is unset, logs are buffered internally; once it
    is set, the buffer is drained into ``queue`` (oldest first) and later
    logs are forwarded directly.
    """
    buffer: asyncio.Queue = asyncio.Queue()
    drained: bool = False
    async for log in cls._listen():
        if not publish_logs.is_set():
            # Not publishing yet — stash the log for later.
            await buffer.put(log)
            continue
        if not drained:
            # First published log: flush everything buffered so far,
            # preserving arrival order.
            while not buffer.empty():
                await queue.put(await buffer.get())
            drained = True
        await queue.put(log)

subscribe_from async classmethod #

subscribe_from(start_block=None, batch_size=50)

Subscribe to logs, backfilling from a specific block number and then listening for new logs

Source code in eth_rpc/log.py
@classmethod
async def subscribe_from(
    cls,
    start_block: int | None = None,
    batch_size: int = 50,
) -> AsyncIterator[LogModel]:
    """
    Subscribe to logs, backfilling from ``start_block`` in batched
    ``eth_getLogs`` calls and then switching to the live subscription.

    Args:
        start_block: block to backfill from; defaults to the latest block
            (i.e. no backfill).
        batch_size: number of blocks per backfill request.

    Yields:
        LogModel instances, historical first, then live.
    """
    queue = asyncio.Queue[LogModel]()
    should_publish_logs = asyncio.Event()
    # Keep a reference: asyncio only holds weak references to tasks, so a
    # bare create_task() result can be garbage-collected mid-flight.
    listener_task = asyncio.create_task(  # noqa: F841
        cls.listen(
            queue=queue,
            publish_logs=should_publish_logs,
        )
    )
    latest = await Block.get_number()
    # Explicit None check so start_block=0 (genesis) is honored.
    if start_block is None:
        start_block = latest
    assert start_block is not None

    num = start_block
    while num <= latest:
        # eth_getLogs ranges are inclusive at both ends, so the next batch
        # must start at batch_end + 1 or boundary blocks are fetched twice.
        batch_end = min(num + batch_size, latest)
        for log in await cls.load_by_number(num, batch_end):
            yield log
        num = batch_end + 1

    should_publish_logs.set()
    # Live phase: skip buffered logs at or below ``latest`` — those blocks
    # were already covered by the backfill above.
    while True:
        log = await queue.get()
        if log.block_number > latest:
            yield log