Subscription#
Subscribing to Events in Realtime#
The subscribe method on the Event instance is an async generator that will yield EventData[T], where T
is a Generic Argument of type BaseModel
for the event's schema. For example:
# Create an event filter for the V2SwapEvent
# Set up a filter for the V2SwapEvent, specifying the contract address
# You can also filter on the different topics, ie
event_filter = V2SwapEvent.set_filter(
addresses=["0x0d4a11d5EEaaC28EC3F61d100daF4d40471f1852"],
# topic1="0x...",
# topic2="0x...",
)
# Use an asynchronous for loop to iterate over the event subscriber
async for event_data in event_filter.subscribe():
# event_data contains two main components:
# 1. The raw log data
log: Log = event_data.log
# 2. The decoded event data as an instance of V2SwapEventType
event: V2SwapEventType = event_data.event
# You can access other fields similarly:
print(f"Sender: {event.sender}")
print(f"To: {event.to}")
print(f"Amount 0 In: {event.amount0_in}")
print(f"Amount 1 In: {event.amount1_in}")
print(f"Amount 0 Out: {event.amount0_out}")
print(f"Amount 1 Out: {event.amount1_out}")
You can also access raw log data if needed:
print(f"Block Number: {log.block_number}")
print(f"Transaction Hash: {log.transaction_hash}")
Backfill#
Backfilling Historical Event Data#
The Event class also provides functionality for retrieving historical event data within a specified block range. This is particularly useful for backfilling data. Here's an example of how to use this feature:
from eth_rpc.utils import address_to_topic
class TransferEventType(BaseModel):
sender: Annotated[primitives.address, Indexed]
recipient: Annotated[primitives.address, Indexed]
amount: primitives.uint256
# Create an event filter for the TransferEvent
TransferEvent = Event[TransferEventType](name="Transfer")
# Set up a filter for the TransferEvent, specifying the topic1 filter.
# This makes the subscription only return events with the sender specified.
event_filter = TransferEvent.set_filter(
topic1=address_to_topic("0xBE0eB53F46cd790Cd13851d5EFf43D12404d33E8"),
)
# Use an asynchronous for loop to iterate over the event subscriber
# This will yield EventData[TransferEvent]
async for event_data in event_filter.backfill(start_block=16_000_000, end_block=18_000_000):
# event_data contains two main components:
# 1. The raw log data
log: Log = event_data.log
# 2. The decoded event data as an instance of V2SwapEventType
event: V2SwapEventType = event_data.event