Skip to content

Transaction

eth_rpc.Transaction #

Bases: Request, Transaction, Generic[Network]

model_config class-attribute instance-attribute #

# Pydantic configuration shared by every instance of this model.
model_config = ConfigDict(
    # Field aliases are generated by `to_camel` (presumably snake_case ->
    # camelCase, matching JSON-RPC payload keys -- confirm against its import).
    alias_generator=to_camel,
    # Allow populating fields by their Python name as well as by alias.
    populate_by_name=True,
    # Allow building the model from an arbitrary object's attributes.
    from_attributes=True,
)

hash instance-attribute #

hash

access_list class-attribute instance-attribute #

access_list = None

chain_id class-attribute instance-attribute #

chain_id = None

from_ class-attribute instance-attribute #

from_ = Field(alias='from')

gas instance-attribute #

gas

gas_price instance-attribute #

gas_price

max_fee_per_gas class-attribute instance-attribute #

max_fee_per_gas = None

max_priority_fee_per_gas class-attribute instance-attribute #

max_priority_fee_per_gas = None

input instance-attribute #

input

nonce instance-attribute #

nonce

r instance-attribute #

r

s instance-attribute #

s

v instance-attribute #

v

to instance-attribute #

to

type class-attribute instance-attribute #

type = None

value instance-attribute #

value

y_parity class-attribute instance-attribute #

y_parity = None

block_hash instance-attribute #

block_hash

block_number instance-attribute #

block_number

transaction_index instance-attribute #

transaction_index

get_block #

get_block(with_tx_data=False)
Source code in eth_rpc/models/transaction.py
def get_block(self, with_tx_data: bool = False):
    """Load the block that contains this transaction.

    The block is looked up by this transaction's ``block_hash``.

    Args:
        with_tx_data: Forwarded unchanged to ``Block.load_by_hash``
            (presumably selects full transaction objects rather than
            hashes -- confirm against ``Block.load_by_hash``).

    Returns:
        Whatever ``Block.load_by_hash`` returns for ``self.block_hash``.
    """
    # Imported inside the function rather than at module level, presumably
    # to avoid a circular import between the transaction and block modules.
    from eth_rpc.block import Block

    return Block.load_by_hash(self.block_hash, with_tx_data=with_tx_data)

model_post_init #

model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """Bind the class-level network and the instance RPC accessor.

    Copies ``_network`` from the instance's class onto the instance, then
    stores the instance's ``_rpc`` under the name ``rpc`` so that it shadows
    the ``rpc`` classmethod on this instance.

    Args:
        __context: Unused by this implementation.
    """
    # object.__setattr__ is called directly, presumably to bypass the
    # model's own attribute handling -- confirm against the base class.
    assign = object.__setattr__
    assign(self, "_network", self.__class__._network)
    # Shadow the .rpc() classmethod with the instance-level accessor.
    assign(self, "rpc", self._rpc)

rpc classmethod #

rpc()

Returns the RPC for the network bound to this class, falling back to the default network when no network has been provided.

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    Return the ``RPC`` object for this class's network.

    When the class has no network (``cls._network`` is ``None``) the
    default network is used instead.
    """
    from ._transport import _force_get_global_rpc

    network = cls._network
    if network is not None:
        return _force_get_global_rpc(network)
    return _force_get_global_rpc()

get_by_hash classmethod #

get_by_hash(tx_hash)
Source code in eth_rpc/transaction.py
@classmethod
def get_by_hash(
    cls, tx_hash: HexStr
) -> RPCResponseModel[TransactionRequest, Optional["Transaction[Network]"]]:
    """Build the request that looks up a transaction by its hash.

    Args:
        tx_hash: Hash of the transaction to look up.

    Returns:
        An ``RPCResponseModel`` pairing this class's ``get_tx_by_hash`` RPC
        method with a ``TransactionRequest`` for ``tx_hash``. Per the return
        annotation the response is a transaction or ``None``.
    """
    return RPCResponseModel(
        cls.rpc().get_tx_by_hash,
        TransactionRequest(
            tx_hash=tx_hash,
        ),
    )

get_pending_by_hash classmethod #

get_pending_by_hash(tx_hash)
Source code in eth_rpc/transaction.py
@classmethod
def get_pending_by_hash(
    cls, tx_hash: HexStr
) -> RPCResponseModel[TransactionRequest, PendingTransaction]:
    """Build the request that looks up a pending transaction by its hash.

    Args:
        tx_hash: Hash of the pending transaction to look up.

    Returns:
        An ``RPCResponseModel`` pairing this class's
        ``get_pending_tx_by_hash`` RPC method with a ``TransactionRequest``
        for ``tx_hash``.

    NOTE(review): the response may be empty when the node does not know the
    hash, even though the annotation is non-optional -- confirm against the
    RPC definition.
    """
    return RPCResponseModel(
        cls.rpc().get_pending_tx_by_hash,
        TransactionRequest(
            tx_hash=tx_hash,
        ),
    )

get_receipt_by_hash classmethod #

get_receipt_by_hash(tx_hash)
Source code in eth_rpc/transaction.py
@classmethod
def get_receipt_by_hash(
    cls, tx_hash: HexStr
) -> RPCResponseModel[TransactionRequest, Optional[TransactionReceipt]]:
    """Build the request that fetches the receipt for a transaction hash.

    This is the class-level counterpart of ``receipt()``: both use the
    ``get_tx_receipt`` RPC method, so the return annotation matches
    ``receipt()`` (a receipt or ``None``) rather than a transaction.

    Args:
        tx_hash: Hash of the transaction whose receipt is wanted.

    Returns:
        An ``RPCResponseModel`` pairing this class's ``get_tx_receipt`` RPC
        method with a ``TransactionRequest`` for ``tx_hash``.
    """
    return RPCResponseModel(
        cls.rpc().get_tx_receipt,
        TransactionRequest(
            tx_hash=tx_hash,
        ),
    )

send_raw_transaction classmethod #

send_raw_transaction(tx)
Source code in eth_rpc/transaction.py
@classmethod
def send_raw_transaction(
    cls, tx: HexStr
) -> RPCResponseModel[RawTransaction, HexStr]:
    """Build the request that submits an already-signed transaction.

    Args:
        tx: The signed transaction, passed on as ``RawTransaction.signed_tx``.

    Returns:
        An ``RPCResponseModel`` pairing this class's ``send_raw_tx`` RPC
        method with the ``RawTransaction`` payload.
    """
    submit = cls.rpc().send_raw_tx
    payload = RawTransaction(signed_tx=tx)
    return RPCResponseModel(submit, payload)

get_by_index classmethod #

get_by_index(
    transaction_index, block_hash=None, block_number=None
)
Source code in eth_rpc/transaction.py
@classmethod
def get_by_index(
    cls,
    transaction_index: int,
    block_hash: HexStr | None = None,
    block_number: int | BLOCK_STRINGS | None = None,
) -> RPCResponseModel[
    GetTransactionByBlockHash | GetTransactionByBlockNumber, "Transaction[Network]"
]:
    """Build the request that fetches a transaction by its index in a block.

    The block is identified by ``block_hash`` when one is given, otherwise
    by ``block_number``.

    Args:
        transaction_index: Position of the transaction inside the block.
        block_hash: Hash of the block. An empty value counts as not given.
        block_number: Block number or one of the ``BLOCK_STRINGS`` values;
            only used when ``block_hash`` is not given.

    Returns:
        An ``RPCResponseModel`` for either ``get_tx_by_block_hash`` or
        ``get_tx_by_block_number``, depending on which identifier is used.

    Raises:
        ValueError: If neither a block hash nor a block number is provided.
    """
    # The guard uses the same truthiness test as the branch below. Checking
    # `block_hash is None` here would let an empty hash slip past the guard
    # and then fall through to a by-number request with block_number=None.
    if not block_hash and block_number is None:
        raise ValueError("Must provide either block_hash or block_number")
    if block_hash:
        return RPCResponseModel(
            cls.rpc().get_tx_by_block_hash,
            GetTransactionByBlockHash(
                block_hash=block_hash,
                index=HexInteger(transaction_index),
            ),
        )
    # The guard above guarantees block_number is set; cast only narrows the
    # type for static checkers.
    block_number = cast(int | BLOCK_STRINGS, block_number)
    return RPCResponseModel(
        cls.rpc().get_tx_by_block_number,
        GetTransactionByBlockNumber(
            block_number=(
                # Integers are wrapped in HexInteger; other values (the
                # BLOCK_STRINGS case) are passed through unchanged.
                HexInteger(block_number)
                if isinstance(block_number, int)
                else block_number
            ),
            index=HexInteger(transaction_index),
        ),
    )

subscribe_pending async classmethod #

subscribe_pending()
Source code in eth_rpc/transaction.py
@classmethod
async def subscribe_pending(  # noqa: C901
    cls,
) -> AsyncIterator[PendingTransaction]:
    """Yield pending transactions as they are announced over a websocket.

    Opens a websocket to the ``wss`` endpoint of this class's RPC, sends the
    subscription request, and then, for every announced transaction hash,
    fetches the transaction with ``get_pending_by_hash`` and yields it when
    the lookup returns something truthy.

    The websocket is opened on ``cls.rpc()`` so the subscription and the
    follow-up lookups always target the same network.

    Yields:
        The result of ``get_pending_by_hash`` for each announced hash.

    Raises:
        ValueError: If the reply to the subscription request carries no
            ``result``.
    """
    rpc = cls.rpc()
    # NOTE(review): iterating over `connect(...)` is assumed to hand out a
    # fresh connection on every iteration, which is what the `break`
    # statements below rely on -- confirm against the `connect` in use.
    async for w3_connection in connect(
        rpc.wss,
        ping_interval=60,
        ping_timeout=60,
        max_queue=10000,
        open_timeout=30,
    ):
        # Errors while subscribing propagate to the caller unchanged.
        await cls._send_subscription_request(
            w3_connection,
        )
        subscription_response: SubscriptionResponse = json.loads(
            await w3_connection.recv()
        )
        if not subscription_response.get("result"):
            raise ValueError(subscription_response)

        while True:
            try:
                message = await asyncio.wait_for(w3_connection.recv(), timeout=32.0)
                message_json: JsonPendingWssResponse = json.loads(message)
                if "params" not in message_json:
                    # Caught by the generic handler below, which logs the
                    # message and moves on to a new connection.
                    raise ValueError(message_json)

                transaction_hash: HexStr = message_json["params"]["result"]
                transaction = await cls.get_pending_by_hash(transaction_hash)
                if transaction:
                    yield transaction
            except asyncio.exceptions.TimeoutError:
                # No message within the timeout: keep listening on the
                # same connection.
                pass
            except (
                ConnectionClosedError,
                ConnectionResetError,
                OSError,  # No route to host
                asyncio.exceptions.IncompleteReadError,  # TODO: should this be handled differently?
            ) as err:
                logger.error("connection terminated unexpectedly: %s", err)
                await asyncio.sleep(1)
                # we're in an iterator, so make a new connection and continue listening
                break
            except Exception as err:
                logger.error("unknown connection error: %s", err)
                await asyncio.sleep(1)
                # we're in an iterator, so make a new connection and continue listening
                break

receipt #

receipt()
Source code in eth_rpc/transaction.py
def receipt(
    self,
) -> RPCResponseModel[TransactionRequest, Optional[TransactionReceipt]]:
    """Build the request that fetches the receipt of this transaction.

    Returns:
        An ``RPCResponseModel`` pairing the ``get_tx_receipt`` RPC method
        with a ``TransactionRequest`` for ``self.hash``. Per the return
        annotation the response is a receipt or ``None``.
    """
    fetch_receipt = self.rpc().get_tx_receipt
    request = TransactionRequest(tx_hash=self.hash)
    return RPCResponseModel(fetch_receipt, request)