Bases: Block, Request, Generic[NetworkT]
model_config class-attribute
instance-attribute
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
from_attributes=True,
)
number instance-attribute
hash class-attribute
instance-attribute
transactions class-attribute
instance-attribute
transactions = Field(default_factory=list)
base_fee_per_gas class-attribute
instance-attribute
difficulty instance-attribute
gas_limit instance-attribute
gas_used instance-attribute
logs_bloom instance-attribute
miner class-attribute
instance-attribute
mix_hash instance-attribute
nonce class-attribute
instance-attribute
parent_hash instance-attribute
receipts_root instance-attribute
sha3_uncles instance-attribute
state_root instance-attribute
timestamp instance-attribute
total_difficulty class-attribute
instance-attribute
transactions_root instance-attribute
uncles class-attribute
instance-attribute
uncles = Field(default_factory=list)
normalize_timestamp class-attribute
instance-attribute
normalize_timestamp = field_validator(
"timestamp", mode="before"
)(load_datetime_string)
model_post_init
model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """Copy the class-level network onto the instance and rebind ``rpc``.

    The ``__context`` argument is accepted but not used.  Both attributes are
    written through ``object.__setattr__`` rather than plain assignment.
    """
    # Pin whatever network is configured on the class to this instance.
    object.__setattr__(self, "_network", self.__class__._network)
    # Shadow the ``rpc()`` classmethod with the instance-level ``_rpc``.
    object.__setattr__(self, "rpc", self._rpc)
|
rpc classmethod
This uses the default network, unless a network has been provided
Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    Return the RPC object for this class.

    This uses the default network, unless a network has been provided
    on the class (``cls._network``).
    """
    # NOTE(review): imported at call time rather than at module top,
    # presumably to avoid a circular import - confirm.
    from ._transport import _force_get_global_rpc

    network = cls._network
    if network is not None:
        return _force_get_global_rpc(network)
    return _force_get_global_rpc()
|
set_network
Source code in eth_rpc/utils/model.py
def set_network(self, network: type[Network] | None):
    """Store ``network`` (a ``Network`` class, or ``None``) on this instance as ``_network``."""
    # NOTE(review): written via ``object.__setattr__`` instead of plain
    # assignment, presumably to bypass the model's own ``__setattr__`` - confirm.
    object.__setattr__(self, "_network", network)
|
has_log
Source code in eth_rpc/models/block.py
def has_log(self, topic: HexStr):
    """Return whether ``topic`` tests as present in this block's ``logs_bloom`` filter."""
    # Remove every "0x" marker so the remainder parses as plain hex digits.
    bare_hex = topic.replace("0x", "")
    topic_bytes = bytes.fromhex(bare_hex)
    bloom = BloomFilter(self.logs_bloom)
    return topic_bytes in bloom
|
compress
Source code in eth_rpc/models/block.py
def compress(self) -> bytes:
    """Dump this model to JSON and return the zlib-compressed UTF-8 bytes."""
    json_text = self.model_dump_json()
    encoded = json_text.encode("utf-8")
    return zlib.compress(encoded)
|
priority_fee classmethod
Source code in eth_rpc/block.py
@classmethod
def priority_fee(cls) -> RPCResponseModel[NoArgs, HexInteger]:
    """Wrap the RPC's ``max_priority_fee_per_gas`` method in an ``RPCResponseModel``."""
    rpc_method = cls.rpc().max_priority_fee_per_gas
    return RPCResponseModel(rpc_method)
|
fee_history classmethod
fee_history(
block_count=4,
lower_percentile=25,
upper_percentile=75,
block_number="latest",
)
Source code in eth_rpc/block.py
@classmethod
def fee_history(
    cls,
    block_count: int = 4,
    lower_percentile: int = 25,
    upper_percentile: int = 75,
    block_number: BlockReference = "latest",
) -> RPCResponseModel[FeeHistoryArgs, FeeHistory]:
    """Wrap the RPC's ``fee_history`` method together with its arguments.

    The two percentile parameters are sent as a two-element list,
    ``[lower_percentile, upper_percentile]``.
    """
    rpc_method = cls.rpc().fee_history
    percentiles = [lower_percentile, upper_percentile]
    args = FeeHistoryArgs(
        block_count=block_count,
        block_number=block_number,
        percentiles=percentiles,
    )
    return RPCResponseModel(rpc_method, args)
|
get_block_transaction_count classmethod
get_block_transaction_count(block_number)
Source code in eth_rpc/block.py
@classmethod
def get_block_transaction_count(
    cls, block_number: HexInteger
) -> RPCResponseModel[BlockNumberArg, int]:
    """Wrap the RPC's ``get_block_tx_count_by_number`` method for ``block_number``."""
    rpc_method = cls.rpc().get_block_tx_count_by_number
    args = BlockNumberArg(block_number=block_number)
    return RPCResponseModel(rpc_method, args)
|
load_by_number classmethod
load_by_number(block_number, with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def load_by_number(
    cls,
    block_number: int | HexInteger | BLOCK_STRINGS,
    with_tx_data: bool = False,
) -> RPCResponseModel[GetBlockByNumberArgs, "Block[Network]"]:
    """Wrap the RPC's ``get_block_by_number`` method together with its arguments.

    An ``int`` block number is converted to ``HexInteger``; any other value
    (for example ``"latest"``) is passed through unchanged.
    """
    rpc_method = cls.rpc().get_block_by_number
    if isinstance(block_number, int):
        block_number = HexInteger(block_number)
    args = GetBlockByNumberArgs(
        block_number=block_number,
        with_tx_data=with_tx_data,
    )
    return RPCResponseModel(rpc_method, args)
|
get_number classmethod
Source code in eth_rpc/block.py
@classmethod
def get_number(cls) -> RPCResponseModel[NoArgs, HexInteger]:
    """Wrap the RPC's ``block_number`` method in an ``RPCResponseModel``."""
    rpc_method = cls.rpc().block_number
    return RPCResponseModel(rpc_method)
|
load_by_datetime async
classmethod
load_by_datetime(
when, low=None, high=None, apprx_block_time=12
)
Searches for a block, finding the first block before a datetime. Recursively searches, using low and high as the boundaries for the binary search.
Source code in eth_rpc/block.py
| @classmethod
async def load_by_datetime(
cls,
when: datetime,
low: int | None = None,
high: int | None = None,
apprx_block_time=12,
) -> "Block[Network]":
"""
Searches for a block, finding the first block before a datetime.
Recursively searches, using low and high as the boundaries for the binary search.
"""
Network = cls._network
if not when.tzinfo:
when = when.replace(tzinfo=timezone.utc)
if not (low and high):
now = datetime.now(timezone.utc)
diff = now - when
day_diff = diff.days
seconds_diff = day_diff * 24 * 3600
block_number = await cls[Network].get_number() # type: ignore
if not low:
low = int(
max(block_number - (seconds_diff / (apprx_block_time * 0.8)), 0)
)
if not high:
high = int(
min(
block_number - (seconds_diff / (apprx_block_time * 1.2)),
block_number,
)
)
if when < datetime(
year=2015,
month=7,
day=30,
hour=3,
minute=26,
second=13,
tzinfo=timezone.utc,
):
raise ValueError("Block before genesis")
if high > low:
mid = (high + low) // 2
mid_block = await cls[Network].load_by_number(mid) # type: ignore
if mid_block.timestamp == when:
return mid_block
elif mid_block.timestamp > when:
return await cls[Network].load_by_datetime(when, low, mid - 1) # type: ignore
else:
return await cls[Network].load_by_datetime(when, mid + 1, high) # type: ignore
return await cls[Network].load_by_number(high) # type: ignore
|
latest classmethod
latest(with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def latest(
    cls, with_tx_data: bool = False
) -> RPCResponseModel[GetBlockByNumberArgs, "Block[Network]"]:
    """Shortcut for ``load_by_number`` with the ``"latest"`` block tag."""
    loader = cls.load_by_number
    return loader("latest", with_tx_data=with_tx_data)
|
pending classmethod
pending(with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def pending(
    cls, with_tx_data: bool = False
) -> RPCResponseModel[GetBlockByNumberArgs, "Block[Network]"]:
    """Shortcut for ``load_by_number`` with the ``"pending"`` block tag."""
    loader = cls.load_by_number
    return loader("pending", with_tx_data=with_tx_data)
|
load_by_hash classmethod
load_by_hash(block_hash, with_tx_data=False)
Source code in eth_rpc/block.py
@classmethod
def load_by_hash(
    cls, block_hash: HexStr, with_tx_data: bool = False
) -> RPCResponseModel[GetBlockByHashArgs, "Block[Network]"]:
    """Wrap the RPC's ``get_block_by_hash`` method together with its arguments."""
    rpc_method = cls.rpc().get_block_by_hash
    args = GetBlockByHashArgs(
        block_hash=block_hash,
        with_tx_data=with_tx_data,
    )
    return RPCResponseModel(rpc_method, args)
|
subscribe_from async
classmethod
subscribe_from(start_block=None, with_tx_data=True)
Source code in eth_rpc/block.py
@classmethod
async def subscribe_from(
    cls,
    start_block: int | None = None,
    with_tx_data: bool = True,
) -> AsyncIterator["Block[Network]"]:
    """
    Yield blocks from ``start_block`` up to the current head, then keep
    yielding new blocks as the listener delivers them.

    Args:
        start_block: First block number to yield. ``None`` means start at the
            latest block; ``0`` is honoured as a real block number.
        with_tx_data: Forwarded to ``listen`` and ``load_by_number``.

    The background ``listen`` task is started before the backfill so blocks
    arriving meanwhile are staged by the listener, and it is cancelled when
    this generator is closed or exits with an error.
    """
    queue = asyncio.Queue[Block[Network]]()
    should_publish_blocks = asyncio.Event()
    # Hold a strong reference: the event loop keeps only weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    listener = asyncio.create_task(
        cls.listen(
            queue=queue,
            publish_blocks=should_publish_blocks,
            with_tx_data=with_tx_data,
        )
    )
    try:
        latest = await cls.latest()
        # Explicit None check so that start_block=0 is not replaced.
        if start_block is None:
            start_block = latest.number
        # Backfill up to the head observed above; anything newer is delivered
        # through the listener queue below.
        for num in range(start_block, latest.number + 1):
            yield await cls.load_by_number(num, with_tx_data=with_tx_data)
        should_publish_blocks.set()
        while True:
            block = await queue.get()
            # Skip blocks already covered by the backfill.
            if block.number > latest.number:
                yield block
    finally:
        # Do not leave the subscription running once the consumer is gone.
        listener.cancel()
|
listen async
classmethod
listen(
*,
queue,
publish_blocks=DEFAULT_EVENT,
with_tx_data=True,
subscription_type="newHeads"
)
Source code in eth_rpc/block.py
@classmethod
async def listen(
    cls,
    *,
    # TODO: typehinting this is tricky because the type of the Queue is conditional based on the subscription type
    queue: asyncio.Queue,
    publish_blocks: asyncio.Event = DEFAULT_EVENT,
    with_tx_data: bool = True,
    subscription_type: SUBSCRIPTION_TYPE = "newHeads",
):
    """Forward items from ``cls._listen`` into ``queue``.

    While ``publish_blocks`` is not set, incoming items are held in a private
    staging queue.  The first item received after the event is set triggers a
    one-time flush of the staged items (in arrival order) ahead of it; from
    then on items go straight to ``queue``.
    """
    staged: asyncio.Queue = asyncio.Queue()
    needs_flush: bool = True
    stream = cls._listen(
        with_tx_data=with_tx_data, subscription_type=subscription_type
    )
    async for block in stream:
        if not publish_blocks.is_set():
            # Not publishing yet: hold the item back.
            await staged.put(block)
            continue
        if needs_flush:
            # Drain everything held back before emitting the current item.
            while not staged.empty():
                held = await staged.get()
                await queue.put(held)
            needs_flush = False
        await queue.put(block)
|
convert async
classmethod
Source code in eth_rpc/block.py
@classmethod
async def convert(cls, block_value: BLOCK_STRINGS | int) -> int:
    """Resolve ``block_value`` to a block number.

    An ``int`` is returned unchanged; any other value is loaded through
    ``load_by_number`` and the resulting block's ``number`` is returned.
    """
    if not isinstance(block_value, int):
        loaded = await cls.load_by_number(block_value)
        return loaded.number
    return block_value
|
decompress classmethod
Convert a zlib-compressed block to a Block
Source code in eth_rpc/block.py
@classmethod
def decompress(cls, raw_bytes: bytes) -> "Block":
    """Convert a zlib-compressed JSON payload back into a block.

    Args:
        raw_bytes: zlib-compressed JSON bytes (the format ``zlib.compress``
            produces).

    Returns:
        The result of ``cls.model_validate_json`` on the decompressed bytes,
        so calling this on a subclass yields that subclass.

    Raises:
        zlib.error: If ``raw_bytes`` is not valid zlib data.
    """
    # Validate through ``cls`` rather than a hard-coded ``Block`` so the
    # classmethod respects the class it is invoked on.
    return cls.model_validate_json(zlib.decompress(raw_bytes))
|
parent_block
Source code in eth_rpc/block.py
def parent_block(self) -> RPCResponseModel[GetBlockByHashArgs, "Block[Network]"]:
    """Build the ``load_by_hash`` request for the block named by ``parent_hash``."""
    loader = self.load_by_hash
    return loader(self.parent_hash)
|