Bases: Request, Transaction, Generic[Network]
model_config class-attribute
instance-attribute
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
from_attributes=True,
)
access_list class-attribute
instance-attribute
chain_id class-attribute
instance-attribute
from_ class-attribute
instance-attribute
from_ = Field(alias='from')
gas_price instance-attribute
max_fee_per_gas class-attribute
instance-attribute
max_priority_fee_per_gas class-attribute
instance-attribute
max_priority_fee_per_gas = None
type class-attribute
instance-attribute
y_parity class-attribute
instance-attribute
block_hash instance-attribute
block_number instance-attribute
transaction_index instance-attribute
get_block
get_block(with_tx_data=False)
Source code in eth_rpc/models/transaction.py
def get_block(self, with_tx_data: bool = False):
    """Load the block that contains this transaction.

    The block is looked up by this transaction's ``block_hash``.

    Args:
        with_tx_data: Forwarded unchanged to ``Block.load_by_hash``.
            Presumably selects full transaction objects instead of only
            their hashes -- confirm against ``Block.load_by_hash``.

    Returns:
        Whatever ``Block.load_by_hash`` returns for ``self.block_hash``.
    """
    # Kept as a function-scope import, as in the original; presumably this
    # avoids a circular import between the block and transaction modules.
    from eth_rpc.block import Block

    return Block.load_by_hash(self.block_hash, with_tx_data=with_tx_data)
|
model_post_init
model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """Copy the class-level network and RPC accessor onto the new instance.

    Both writes go through ``object.__setattr__`` directly, so any
    ``__setattr__`` defined on the class is not invoked.

    Args:
        __context: Unused by this hook.
    """
    assign = object.__setattr__
    assign(self, "_network", self.__class__._network)
    # shadow the .rpc() classmethod with the instance-level ``_rpc``
    assign(self, "rpc", self._rpc)
|
rpc classmethod
This uses the default network, unless a network has been provided
Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    Resolve the RPC client for this class.

    When the class carries a network it is passed to the lookup; otherwise
    the lookup is called without arguments and the default network is used.
    """
    from ._transport import _force_get_global_rpc

    if cls._network is not None:
        return _force_get_global_rpc(cls._network)
    return _force_get_global_rpc()
|
get_by_hash classmethod
Source code in eth_rpc/transaction.py
@classmethod
def get_by_hash(
    cls, tx_hash: HexStr
) -> RPCResponseModel[TransactionRequest, Optional["Transaction[Network]"]]:
    """Build the request that looks up a transaction by its hash.

    Args:
        tx_hash: Hash of the transaction to look up.

    Returns:
        An ``RPCResponseModel`` pairing ``rpc().get_tx_by_hash`` with a
        ``TransactionRequest`` for ``tx_hash``. Per the annotation, the
        result may be ``None``.
    """
    return RPCResponseModel(
        cls.rpc().get_tx_by_hash,
        TransactionRequest(
            tx_hash=tx_hash,
        ),
    )
|
get_pending_by_hash classmethod
get_pending_by_hash(tx_hash)
Source code in eth_rpc/transaction.py
@classmethod
def get_pending_by_hash(
    cls, tx_hash: HexStr
) -> RPCResponseModel[TransactionRequest, PendingTransaction]:
    """Build the request that looks up a pending transaction by its hash.

    Args:
        tx_hash: Hash of the pending transaction to look up.

    Returns:
        An ``RPCResponseModel`` pairing ``rpc().get_pending_tx_by_hash``
        with a ``TransactionRequest`` for ``tx_hash``.
    """
    return RPCResponseModel(
        cls.rpc().get_pending_tx_by_hash,
        TransactionRequest(
            tx_hash=tx_hash,
        ),
    )
|
get_receipt_by_hash classmethod
get_receipt_by_hash(tx_hash)
Source code in eth_rpc/transaction.py
@classmethod
def get_receipt_by_hash(
    cls, tx_hash: HexStr
) -> RPCResponseModel[TransactionRequest, Optional[TransactionReceipt]]:
    """Build the request that fetches a transaction receipt by hash.

    Args:
        tx_hash: Hash of the transaction whose receipt is requested.

    Returns:
        An ``RPCResponseModel`` pairing ``rpc().get_tx_receipt`` with a
        ``TransactionRequest`` for ``tx_hash``.
    """
    # NOTE(review): the result type is annotated to match ``receipt()``,
    # which wraps the same ``get_tx_receipt`` call; confirm against the RPC
    # method's declared return type.
    return RPCResponseModel(
        cls.rpc().get_tx_receipt,
        TransactionRequest(
            tx_hash=tx_hash,
        ),
    )
|
send_raw_transaction classmethod
Source code in eth_rpc/transaction.py
@classmethod
def send_raw_transaction(
    cls, tx: HexStr
) -> RPCResponseModel[RawTransaction, HexStr]:
    """Build the request that submits an already-signed transaction.

    Args:
        tx: The signed transaction, passed on as ``signed_tx``.

    Returns:
        An ``RPCResponseModel`` pairing ``rpc().send_raw_tx`` with a
        ``RawTransaction`` wrapping ``tx``.
    """
    submit = cls.rpc().send_raw_tx
    payload = RawTransaction(signed_tx=tx)
    return RPCResponseModel(submit, payload)
|
get_by_index classmethod
get_by_index(
transaction_index, block_hash=None, block_number=None
)
Source code in eth_rpc/transaction.py
| @classmethod
def get_by_index(
    cls,
    transaction_index: int,
    block_hash: HexStr | None = None,
    block_number: int | BLOCK_STRINGS | None = None,
) -> RPCResponseModel[
    GetTransactionByBlockHash | GetTransactionByBlockNumber, "Transaction[Network]"
]:
    """Build the request that fetches a transaction by its index in a block.

    The block is identified by ``block_hash`` when one is given, otherwise
    by ``block_number``.

    Args:
        transaction_index: Index of the transaction within the block.
        block_hash: Hash of the block; takes precedence when non-empty.
        block_number: Block number or block string, used when no
            ``block_hash`` is given. Integers are wrapped in ``HexInteger``;
            other values are passed through unchanged.

    Returns:
        An ``RPCResponseModel`` targeting ``get_tx_by_block_hash`` or
        ``get_tx_by_block_number`` depending on which identifier is used.

    Raises:
        ValueError: If neither a usable ``block_hash`` nor a
            ``block_number`` is provided.
    """
    # The guard uses the same truthiness test as the branch below, so an
    # empty ``block_hash`` without a ``block_number`` is rejected here
    # instead of producing a request with ``block_number=None``.
    if not block_hash and block_number is None:
        raise ValueError("Must provide either block_hash or block_number")

    if block_hash:
        return RPCResponseModel(
            cls.rpc().get_tx_by_block_hash,
            GetTransactionByBlockHash(
                block_hash=block_hash,
                index=HexInteger(transaction_index),
            ),
        )

    # The guard above guarantees block_number is not None at this point.
    block_number = cast(int | BLOCK_STRINGS, block_number)
    return RPCResponseModel(
        cls.rpc().get_tx_by_block_number,
        GetTransactionByBlockNumber(
            block_number=(
                HexInteger(block_number)
                if isinstance(block_number, int)
                else block_number
            ),
            index=HexInteger(transaction_index),
        ),
    )
|
subscribe_pending async
classmethod
Source code in eth_rpc/transaction.py
@classmethod
async def subscribe_pending(  # noqa: C901
    cls,
) -> AsyncIterator[PendingTransaction]:
    """Yield pending transactions as their hashes arrive over a websocket.

    For each connection produced by ``connect(rpc.wss, ...)`` a subscription
    request is sent and its response checked. Incoming messages are then
    read in a loop: the transaction hash is taken from
    ``message["params"]["result"]``, the pending transaction is fetched with
    ``get_pending_by_hash``, and it is yielded when truthy.

    A receive timeout (32 s) is ignored and the loop keeps listening. Any
    other error inside the loop is logged, followed by a one second pause,
    and the inner loop is left so the outer ``async for`` can supply a new
    connection.

    Yields:
        Pending transactions resolved from the subscribed hashes.

    Raises:
        ValueError: If the subscription response has no truthy ``result``.
    """
    # Resolve the RPC through the class so that a class bound to a network
    # subscribes on the same network that ``get_pending_by_hash`` queries;
    # without a network this is the same default lookup as before.
    rpc = cls.rpc()
    async for w3_connection in connect(
        rpc.wss,
        ping_interval=60,
        ping_timeout=60,
        max_queue=10000,
        open_timeout=30,
    ):
        # Errors while subscribing propagate to the caller unchanged.
        await cls._send_subscription_request(
            w3_connection,
        )
        subscription_response: SubscriptionResponse = json.loads(
            await w3_connection.recv()
        )
        if not subscription_response.get("result"):
            raise ValueError(subscription_response)

        while True:
            try:
                message = await asyncio.wait_for(w3_connection.recv(), timeout=32.0)
                message_json: JsonPendingWssResponse = json.loads(message)
                if "params" not in message_json:
                    raise ValueError(message_json)
                transaction_hash: HexStr = message_json["params"]["result"]
                transaction = await cls.get_pending_by_hash(transaction_hash)
                if transaction:
                    yield transaction
            except asyncio.exceptions.TimeoutError:
                # No message within the timeout window; keep listening.
                continue
            except (
                ConnectionClosedError,
                ConnectionResetError,
                OSError,  # No route to host
                asyncio.exceptions.IncompleteReadError,  # TODO: should this be handled differently?
            ) as err:
                logger.error("connection terminated unexpectedly: %s", err)
                await asyncio.sleep(1)
                # we're in an iterator, so make a new connection and continue listening
                break
            except Exception as err:
                # Also reached by the ValueError raised above for messages
                # without "params".
                logger.error("unknown connection error: %s", err)
                await asyncio.sleep(1)
                # we're in an iterator, so make a new connection and continue listening
                break
|
receipt
Source code in eth_rpc/transaction.py
def receipt(
    self,
) -> RPCResponseModel[TransactionRequest, Optional[TransactionReceipt]]:
    """Build the request that fetches the receipt for this transaction.

    Returns:
        An ``RPCResponseModel`` pairing ``rpc().get_tx_receipt`` with a
        ``TransactionRequest`` for this transaction's ``hash``. Per the
        annotation, the result may be ``None``.
    """
    fetch_receipt = self.rpc().get_tx_receipt
    request = TransactionRequest(tx_hash=self.hash)
    return RPCResponseModel(fetch_receipt, request)
|