Skip to content

Block

eth_rpc.Block #

Bases: Block, Request, Generic[NetworkT]

model_config class-attribute instance-attribute #

# Model-wide configuration passed to ConfigDict.
model_config = ConfigDict(
    # Aliases are generated from the field names by `to_camel`.
    alias_generator=to_camel,
    # NOTE(review): per pydantic's ConfigDict docs this lets fields be populated
    # by their Python name as well as by alias -- confirm for the pinned version.
    populate_by_name=True,
    # NOTE(review): per pydantic's ConfigDict docs this allows building the
    # model from an object's attributes -- confirm for the pinned version.
    from_attributes=True,
)

number instance-attribute #

number

hash class-attribute instance-attribute #

hash = None

transactions class-attribute instance-attribute #

transactions = Field(default_factory=list)

base_fee_per_gas class-attribute instance-attribute #

base_fee_per_gas = None

difficulty instance-attribute #

difficulty

extra_data instance-attribute #

extra_data

gas_limit instance-attribute #

gas_limit

gas_used instance-attribute #

gas_used

logs_bloom instance-attribute #

logs_bloom

miner class-attribute instance-attribute #

miner = None

mix_hash instance-attribute #

mix_hash

nonce class-attribute instance-attribute #

nonce = None

parent_hash instance-attribute #

parent_hash

receipts_root instance-attribute #

receipts_root

sha3_uncles instance-attribute #

sha3_uncles

size instance-attribute #

size

state_root instance-attribute #

state_root

timestamp instance-attribute #

timestamp

total_difficulty class-attribute instance-attribute #

total_difficulty = None

transactions_root instance-attribute #

transactions_root

uncles class-attribute instance-attribute #

uncles = Field(default_factory=list)

normalize_timestamp class-attribute instance-attribute #

# Registers `load_datetime_string` as a "before"-mode validator on the
# `timestamp` field.
normalize_timestamp = field_validator(
    "timestamp", mode="before"
)(load_datetime_string)

model_post_init #

model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """Copy the class-level network onto the instance and rebind ``rpc``.

    Both writes go through ``object.__setattr__`` instead of regular
    attribute assignment. ``__context`` is accepted but not used.
    """
    assign = object.__setattr__
    # Pin the network configured on the class to this particular instance.
    assign(self, "_network", self.__class__._network)
    # overwrite the .rpc() classmethod with the instance-level ``_rpc``
    assign(self, "rpc", self._rpc)

rpc classmethod #

rpc()

This uses the default network, unless a network has been provided

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    Return the RPC object for this class.

    This uses the default network, unless a network has been provided
    (i.e. ``cls._network`` is not ``None``).
    """
    # Imported lazily inside the function, as in the original implementation.
    from ._transport import _force_get_global_rpc

    network = cls._network
    if network is not None:
        return _force_get_global_rpc(network)
    return _force_get_global_rpc()

set_network #

set_network(network)
Source code in eth_rpc/utils/model.py
def set_network(self, network: type[Network] | None):
    """Set this instance's ``_network`` attribute to ``network`` (may be ``None``).

    The write goes through ``object.__setattr__`` rather than plain
    attribute assignment.
    """
    object.__setattr__(self, "_network", network)

has_log #

has_log(topic)
Source code in eth_rpc/models/block.py
def has_log(self, topic: HexStr):
    """Test whether ``topic`` is contained in this block's logs bloom.

    Args:
        topic: Hex string; every ``"0x"`` occurrence is stripped before the
            remainder is decoded to bytes.

    Returns:
        The result of the ``in`` test against ``BloomFilter(self.logs_bloom)``.
    """
    hex_digits = topic.replace("0x", "")
    topic_bytes = bytes.fromhex(hex_digits)
    bloom = BloomFilter(self.logs_bloom)
    return topic_bytes in bloom

compress #

compress()
Source code in eth_rpc/models/block.py
def compress(self) -> bytes:
    """Serialize the model to JSON and return it zlib-compressed.

    Returns:
        The zlib-compressed UTF-8 bytes of ``self.model_dump_json()``.
    """
    json_text = self.model_dump_json()
    json_bytes = json_text.encode("utf-8")
    return zlib.compress(json_bytes)

priority_fee classmethod #

priority_fee()
Source code in eth_rpc/block.py
@classmethod
def priority_fee(cls) -> RPCResponseModel[NoArgs, HexInteger]:
    """Wrap the RPC's ``max_priority_fee_per_gas`` method in an RPCResponseModel.

    No arguments are attached to the request. The RPC object comes from
    ``cls.rpc()``.
    """
    return RPCResponseModel(
        cls.rpc().max_priority_fee_per_gas,
    )

fee_history classmethod #

fee_history(
    block_count=4,
    lower_percentile=25,
    upper_percentile=75,
    block_number="latest",
)
Source code in eth_rpc/block.py
@classmethod
def fee_history(
    cls,
    block_count: int = 4,
    lower_percentile: int = 25,
    upper_percentile: int = 75,
    block_number: BlockReference = "latest",
) -> RPCResponseModel[FeeHistoryArgs, FeeHistory]:
    """Wrap the RPC's ``fee_history`` method together with its arguments.

    Args:
        block_count: Forwarded as ``FeeHistoryArgs.block_count``.
        lower_percentile: First entry of the ``percentiles`` list.
        upper_percentile: Second entry of the ``percentiles`` list.
        block_number: Forwarded as ``FeeHistoryArgs.block_number``.

    Returns:
        An RPCResponseModel built from the RPC method and a FeeHistoryArgs.
    """
    # Resolve the RPC method first, then build the arguments (same
    # evaluation order as passing both inline).
    rpc_method = cls.rpc().fee_history
    percentiles = [lower_percentile, upper_percentile]
    request_args = FeeHistoryArgs(
        block_count=block_count,
        block_number=block_number,
        percentiles=percentiles,
    )
    return RPCResponseModel(rpc_method, request_args)

get_block_transaction_count classmethod #

get_block_transaction_count(block_number)
Source code in eth_rpc/block.py
@classmethod
def get_block_transaction_count(
    cls, block_number: HexInteger
) -> RPCResponseModel[BlockNumberArg, int]:
    """Wrap the RPC's ``get_block_tx_count_by_number`` method for one block.

    Args:
        block_number: Block number, forwarded inside a BlockNumberArg.

    Returns:
        An RPCResponseModel built from the RPC method and the argument object.
    """
    return RPCResponseModel(
        cls.rpc().get_block_tx_count_by_number,
        BlockNumberArg(
            block_number=block_number,
        ),
    )

load_by_number classmethod #

load_by_number(block_number, with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def load_by_number(
    cls,
    block_number: int | HexInteger | BLOCK_STRINGS,
    with_tx_data: bool = False,
) -> RPCResponseModel[GetBlockByNumberArgs, "Block[Network]"]:
    """Wrap the RPC's ``get_block_by_number`` method for the given block.

    Args:
        block_number: Any ``int`` instance is wrapped in ``HexInteger``;
            every other value is forwarded unchanged.
        with_tx_data: Forwarded as ``GetBlockByNumberArgs.with_tx_data``.

    Returns:
        An RPCResponseModel built from the RPC method and the arguments.
    """
    # Resolve the RPC method before touching the arguments (same evaluation
    # order as passing both inline).
    rpc_method = cls.rpc().get_block_by_number
    if isinstance(block_number, int):
        normalized = HexInteger(block_number)
    else:
        normalized = block_number
    request_args = GetBlockByNumberArgs(
        block_number=normalized,
        with_tx_data=with_tx_data,
    )
    return RPCResponseModel(rpc_method, request_args)

get_number classmethod #

get_number()
Source code in eth_rpc/block.py
@classmethod
def get_number(cls) -> RPCResponseModel[NoArgs, HexInteger]:
    """Wrap the RPC's ``block_number`` method in an RPCResponseModel.

    No arguments are attached to the request. The RPC object comes from
    ``cls.rpc()``.
    """
    return RPCResponseModel(
        cls.rpc().block_number,
    )

load_by_datetime async classmethod #

load_by_datetime(
    when, low=None, high=None, apprx_block_time=12
)

Searches for a block, finding the first block before a datetime. Recursively searches, using low and high as the boundaries for the binary search.

Source code in eth_rpc/block.py
@classmethod
async def load_by_datetime(
    cls,
    when: datetime,
    low: int | None = None,
    high: int | None = None,
    apprx_block_time=12,
) -> "Block[Network]":
    """
    Searches for a block, finding the first block before a datetime.
    Recursively searches, using low and high as the boundaries for the binary search.

    Args:
        when: Target time. A naive datetime is treated as UTC.
        low: Lower block-number bound. When ``None`` it is estimated from the
            current block number and the age of ``when``. ``0`` is a valid
            explicit bound.
        high: Upper block-number bound. When ``None`` it is estimated the same
            way and capped at the current block number.
        apprx_block_time: Approximate seconds per block. Only used to
            estimate missing bounds, bracketed at 0.8x and 1.2x of this value.

    Returns:
        The block whose timestamp equals ``when`` if the search lands on one,
        otherwise the block at the final ``high`` bound. That final block's
        timestamp is not re-checked against ``when``.

    Raises:
        ValueError: If ``when`` is earlier than 2015-07-30 03:26:13 UTC.
    """
    Network = cls._network
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)

    # Reject impossible targets up front, before any RPC round trip.
    # NOTE(review): the constant looks like the Ethereum mainnet genesis
    # time -- confirm this is intended for other networks too.
    if when < datetime(
        year=2015,
        month=7,
        day=30,
        hour=3,
        minute=26,
        second=13,
        tzinfo=timezone.utc,
    ):
        raise ValueError("Block before genesis")

    # Compare against None explicitly: 0 is a legitimate bound and must not
    # trigger a fresh estimate (the recursion can pass low == 0).
    if low is None or high is None:
        now = datetime.now(timezone.utc)
        # Use the full elapsed time; counting whole days only would collapse
        # both bounds onto the chain head for any target under 24h old.
        seconds_diff = (now - when).total_seconds()
        block_number = await cls[Network].get_number()  # type: ignore
        if low is None:
            # Faster-than-expected blocks => more blocks back => lower bound.
            low = int(
                max(block_number - (seconds_diff / (apprx_block_time * 0.8)), 0)
            )
        if high is None:
            # Slower-than-expected blocks => fewer blocks back; never past head.
            high = int(
                min(
                    block_number - (seconds_diff / (apprx_block_time * 1.2)),
                    block_number,
                )
            )

    if high > low:
        mid = (high + low) // 2
        mid_block = await cls[Network].load_by_number(mid)  # type: ignore

        if mid_block.timestamp == when:
            return mid_block
        elif mid_block.timestamp > when:
            return await cls[Network].load_by_datetime(  # type: ignore
                when, low, mid - 1, apprx_block_time=apprx_block_time
            )
        else:
            return await cls[Network].load_by_datetime(  # type: ignore
                when, mid + 1, high, apprx_block_time=apprx_block_time
            )
    return await cls[Network].load_by_number(high)  # type: ignore

latest classmethod #

latest(with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def latest(
    cls, with_tx_data: bool = False
) -> RPCResponseModel[GetBlockByNumberArgs, "Block[Network]"]:
    """Shortcut for ``load_by_number("latest", with_tx_data=with_tx_data)``."""
    return cls.load_by_number("latest", with_tx_data=with_tx_data)

pending classmethod #

pending(with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def pending(
    cls, with_tx_data: bool = False
) -> RPCResponseModel[GetBlockByNumberArgs, "Block[Network]"]:
    """Shortcut for ``load_by_number("pending", with_tx_data=with_tx_data)``."""
    return cls.load_by_number("pending", with_tx_data=with_tx_data)

load_by_hash classmethod #

load_by_hash(block_hash, with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def load_by_hash(
    cls, block_hash: HexStr, with_tx_data: bool = False
) -> RPCResponseModel[GetBlockByHashArgs, "Block[Network]"]:
    """Wrap the RPC's ``get_block_by_hash`` method for the given block hash.

    Args:
        block_hash: Forwarded as ``GetBlockByHashArgs.block_hash``.
        with_tx_data: Forwarded as ``GetBlockByHashArgs.with_tx_data``.

    Returns:
        An RPCResponseModel built from the RPC method and the arguments.
    """
    return RPCResponseModel(
        cls.rpc().get_block_by_hash,
        GetBlockByHashArgs(
            block_hash=block_hash,
            with_tx_data=with_tx_data,
        ),
    )

subscribe_from async classmethod #

subscribe_from(start_block=None, with_tx_data=True)
Source code in eth_rpc/block.py
@classmethod
async def subscribe_from(
    cls,
    start_block: int | None = None,
    with_tx_data: bool = True,
) -> AsyncIterator["Block[Network]"]:
    """Yield blocks from ``start_block`` onwards, then follow new blocks.

    A background task running ``cls.listen`` is started first. Blocks from
    ``start_block`` up to the latest block at call time are then loaded one
    by one, after which blocks delivered by the listener are yielded.

    Args:
        start_block: First block number to yield. ``None`` means start at
            the latest block; ``0`` is honoured as block zero.
        with_tx_data: Forwarded to ``listen`` and ``load_by_number``.

    Yields:
        Backfilled blocks in ascending order, followed by listener blocks
        whose number is greater than the latest block seen at call time.
    """
    queue = asyncio.Queue[Block[Network]]()
    should_publish_blocks = asyncio.Event()
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected mid-run.
    listener = asyncio.create_task(
        cls.listen(
            queue=queue,
            publish_blocks=should_publish_blocks,
            with_tx_data=with_tx_data,
        )
    )
    try:
        latest = await cls.latest()
        # Explicit None check so that start_block=0 is not mistaken for "unset".
        if start_block is None:
            start_block = latest.number

        # NOTE: the listener is already running while this backfill happens;
        # it stages incoming blocks until `should_publish_blocks` is set.
        for num in range(start_block, latest.number + 1):
            yield await cls.load_by_number(num, with_tx_data=with_tx_data)

        should_publish_blocks.set()
        while True:
            block = await queue.get()
            # Anything at or below `latest` was already covered by the backfill.
            if block.number > latest.number:
                yield block
    finally:
        # Stop the listener when the consumer closes or abandons the
        # generator, instead of leaking a task that feeds an unread queue.
        listener.cancel()

listen async classmethod #

listen(
    *,
    queue,
    publish_blocks=DEFAULT_EVENT,
    with_tx_data=True,
    subscription_type="newHeads"
)
Source code in eth_rpc/block.py
@classmethod
async def listen(
    cls,
    *,
    # TODO: typehinting this is tricky because the type of the Queue is conditional based on the subscription type
    queue: asyncio.Queue,
    publish_blocks: asyncio.Event = DEFAULT_EVENT,
    with_tx_data: bool = True,
    subscription_type: SUBSCRIPTION_TYPE = "newHeads",
):
    """Forward items from ``cls._listen`` into ``queue``, gated by an event.

    While ``publish_blocks`` is not set, incoming items are staged in an
    internal queue. On the first item received after the event is set, the
    staged items are moved to ``queue`` in arrival order, followed by the new
    item. The staged queue is drained only once: items staged after that
    (if the event is cleared again) are not forwarded.

    Args:
        queue: Destination queue for published items.
        publish_blocks: Event that switches from staging to publishing.
        with_tx_data: Forwarded to ``cls._listen``.
        subscription_type: Forwarded to ``cls._listen``.
    """
    staged: asyncio.Queue = asyncio.Queue()
    pending_flush: bool = True
    stream = cls._listen(
        with_tx_data=with_tx_data, subscription_type=subscription_type
    )
    async for block in stream:
        if not publish_blocks.is_set():
            # Not publishing yet: hold the item back.
            await staged.put(block)
            continue
        if pending_flush:
            # First publish: release everything held back, oldest first.
            while not staged.empty():
                held = await staged.get()
                await queue.put(held)
            pending_flush = False
        await queue.put(block)

convert async classmethod #

convert(block_value)
Source code in eth_rpc/block.py
@classmethod
async def convert(cls, block_value: BLOCK_STRINGS | int) -> int:
    """Resolve a block reference to a block number.

    Args:
        block_value: An ``int`` is returned unchanged; any other value is
            passed to ``load_by_number`` and the loaded block's ``number``
            is returned.
    """
    if not isinstance(block_value, int):
        loaded = await cls.load_by_number(block_value)
        return loaded.number
    return block_value

decompress classmethod #

decompress(raw_bytes)

Convert a zlib-compressed block (as produced by compress()) back into a Block

Source code in eth_rpc/block.py
@classmethod
def decompress(cls, raw_bytes: bytes) -> "Block":
    """Convert a zlib-compressed block back into a Block.

    Args:
        raw_bytes: zlib-compressed JSON, i.e. the format ``compress()``
            produces with ``zlib.compress``.

    Returns:
        An instance of ``cls`` validated from the decompressed JSON.
    """
    # Validate through `cls` rather than the hard-coded `Block`, so calling
    # this on a network-bound or derived class yields an instance of that class.
    return cls.model_validate_json(zlib.decompress(raw_bytes))

parent_block #

parent_block()
Source code in eth_rpc/block.py
def parent_block(self) -> RPCResponseModel[GetBlockByHashArgs, "Block[Network]"]:
    """Return ``load_by_hash(self.parent_hash)``, i.e. a request for this block's parent."""
    return self.load_by_hash(self.parent_hash)