Skip to content

UniswapV3

eth_typeshed.uniswap_v3 #

V3PoolCreatedEvent module-attribute #

V3PoolCreatedEvent = Event[V3PoolCreatedEventType](
    name="PoolCreated"
)

V3SwapEvent module-attribute #

V3SwapEvent = Event[V3SwapEventType](name='Swap')

NONFUNGIBLE_POSITION_MANAGER_ADDRESS module-attribute #

NONFUNGIBLE_POSITION_MANAGER_ADDRESS = HexAddress(
    HexStr("0xC36442b4a4522E871399CD717aBDD847Ab11FE88")
)

V3PoolCreatedEventType #

Bases: BaseModel

token0 instance-attribute #

token0

token1 instance-attribute #

token1

fee instance-attribute #

fee

tick_spacing instance-attribute #

tick_spacing

pool instance-attribute #

pool

V3SwapEventType #

Bases: BaseModel

sender instance-attribute #

sender

recipient instance-attribute #

recipient

amount0 instance-attribute #

amount0

amount1 instance-attribute #

amount1

sqrt_price_x96 instance-attribute #

sqrt_price_x96

liquidity instance-attribute #

liquidity

tick instance-attribute #

tick

GetPoolRequest #

Bases: BaseModel

token_a instance-attribute #

token_a

token_b instance-attribute #

token_b

fee instance-attribute #

fee

UniswapV3Factory #

UniswapV3Factory(**kwargs)

Bases: ProtocolBase

This is how we make the ProtocolBase inject the ContractFunc via the type signature. After a lot of research, this goes slightly outside of the bounds for type hinting, but I think the improved expressiveness makes it fully worthwhile.

Source code in eth_rpc/contract/base.py
def __init__(self, **kwargs):
    """
    Bind a ``ContractFunc`` onto the instance for every entry of ``self._func_sigs``.

    Each entry is either a generic carrying two type arguments or an annotated
    wrapper whose first argument is such a generic.  For annotated entries a
    ``Name`` marker among the annotation arguments overrides the function name;
    the attribute itself is always set under the alias.  This deliberately
    stretches what type hints are meant for in exchange for expressiveness.
    """
    super().__init__(**kwargs)

    for alias, declared in self._func_sigs.items():
        if is_annotation(declared):
            parts = get_args(declared)
            generic = parts[0]
            # The last ``Name`` marker wins; without one the alias is the name.
            overrides = [part.value for part in parts if isinstance(part, Name)]
            name = overrides[-1] if overrides else alias
        else:
            generic, name = declared, alias
        T, U = get_args(generic)

        signature = FuncSignature[T, U](name=name, alias=alias)  # type: ignore
        bound = ContractFunc[T, U](func=signature, contract=self)  # type: ignore
        setattr(self, alias, bound)

address instance-attribute #

address

code_override class-attribute instance-attribute #

code_override = Field(default=None)

functions class-attribute instance-attribute #

functions = Field(default_factory=list)

sync property #

sync

model_config class-attribute instance-attribute #

model_config = ConfigDict(extra='allow')

get_pool class-attribute instance-attribute #

get_pool = METHOD

model_post_init #

model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """
    Copy the class-level ``_network`` onto the instance and shadow ``rpc``.

    Both writes go through ``object.__setattr__`` rather than normal attribute
    assignment on the instance.
    """
    set_raw = object.__setattr__
    class_network = type(self)._network
    set_raw(self, "_network", class_network)
    # From here on ``instance.rpc`` resolves to ``_rpc`` instead of the classmethod.
    set_raw(self, "rpc", self._rpc)

rpc classmethod #

rpc()

This uses the default network, unless a network has been provided

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    This uses the default network, unless a network has been provided
    """
    # Imported lazily inside the method, as in the original.
    from ._transport import _force_get_global_rpc

    network = cls._network
    if network is not None:
        return _force_get_global_rpc(network)
    return _force_get_global_rpc()

add_func #

add_func(func)
Source code in eth_rpc/contract/contract.py
def add_func(self, func: "FuncSignature"):
    """
    Register ``func`` on this contract unless it is already registered.

    ``self.functions`` holds ``ContractFunc`` wrappers, so the duplicate check
    compares ``func`` with each wrapper's ``func`` attribute.  Comparing it
    with the wrapper itself never matched and appended a duplicate on every
    call.  Entries without a ``func`` attribute are compared directly.
    """
    already_registered = any(
        getattr(existing, "func", existing) == func for existing in self.functions
    )
    if not already_registered:
        self.functions.append(ContractFunc(func=func, contract=self))

get_storage_at #

get_storage_at(*, slot, block_number='latest', sync=False)
Source code in eth_rpc/contract/contract.py
def get_storage_at(
    self, *, slot: int | HexStr, block_number="latest", sync: bool = False
) -> MaybeAwaitable[HexStr]:
    """
    Read a storage slot by delegating to ``run`` with ``self._get_storage_at``.

    All three keyword arguments are forwarded unchanged; ``sync`` presumably
    selects between a blocking result and an awaitable -- confirm against ``run``.
    """
    forwarded = dict(slot=slot, block_number=block_number, sync=sync)
    return run(self._get_storage_at, **forwarded)

get_code #

get_code(*, block_number=None, block_hash=None, sync=False)
Source code in eth_rpc/contract/contract.py
def get_code(
    self,
    *,
    block_number: int | BLOCK_STRINGS | None = None,
    block_hash: HexStr | None = None,
    sync: bool = False,
) -> MaybeAwaitable[HexStr]:
    """
    Fetch the contract code by delegating to ``run`` with ``self._get_code``.

    The block selector (``block_number`` or ``block_hash``) and ``sync`` are
    forwarded unchanged; how they are interpreted is up to ``_get_code``/``run``.
    """
    forwarded = dict(block_number=block_number, block_hash=block_hash, sync=sync)
    return run(self._get_code, **forwarded)

create2 #

create2(salt, keccak_init_code)

EIP-1014: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1014.md

Source code in eth_rpc/contract/contract.py
def create2(self, salt: bytes, keccak_init_code: bytes) -> HexAddress:
    """
    Derive a CREATE2 address with this contract as the deployer (EIP-1014).
    https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1014.md

    The hashed preimage is ``0xff ++ self.address ++ salt ++ keccak_init_code``.
    The result is the hash with its first 12 bytes dropped, hex-encoded with a
    ``0x`` prefix.
    """
    create2_prefix = "0xff"
    preimage = (
        bytes.fromhex(create2_prefix[2:])
        + bytes.fromhex(self.address[2:])  # drop the leading "0x"
        + salt
        + keccak_init_code
    )
    digest = keccak_256(preimage)
    return HexAddress(HexStr("0x" + digest[12:].hex()))

NonfungiblePositionManager #

NonfungiblePositionManager(**kwargs)

Bases: ProtocolBase

This is how we make the ProtocolBase inject the ContractFunc via the type signature. After a lot of research, this goes slightly outside of the bounds for type hinting, but I think the improved expressiveness makes it fully worthwhile.

Source code in eth_rpc/contract/base.py
def __init__(self, **kwargs):
    """
    Bind a ``ContractFunc`` onto the instance for every entry of ``self._func_sigs``.

    Each entry is either a generic carrying two type arguments or an annotated
    wrapper whose first argument is such a generic.  For annotated entries a
    ``Name`` marker among the annotation arguments overrides the function name;
    the attribute itself is always set under the alias.  This deliberately
    stretches what type hints are meant for in exchange for expressiveness.
    """
    super().__init__(**kwargs)

    for alias, declared in self._func_sigs.items():
        if is_annotation(declared):
            parts = get_args(declared)
            generic = parts[0]
            # The last ``Name`` marker wins; without one the alias is the name.
            overrides = [part.value for part in parts if isinstance(part, Name)]
            name = overrides[-1] if overrides else alias
        else:
            generic, name = declared, alias
        T, U = get_args(generic)

        signature = FuncSignature[T, U](name=name, alias=alias)  # type: ignore
        bound = ContractFunc[T, U](func=signature, contract=self)  # type: ignore
        setattr(self, alias, bound)

address instance-attribute #

address

code_override class-attribute instance-attribute #

code_override = Field(default=None)

functions class-attribute instance-attribute #

functions = Field(default_factory=list)

sync property #

sync

model_config class-attribute instance-attribute #

model_config = ConfigDict(extra='allow')

balance_of class-attribute instance-attribute #

balance_of = METHOD

token_of_owner_by_index class-attribute instance-attribute #

token_of_owner_by_index = METHOD

positions class-attribute instance-attribute #

positions = METHOD

model_post_init #

model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """
    Copy the class-level ``_network`` onto the instance and shadow ``rpc``.

    Both writes go through ``object.__setattr__`` rather than normal attribute
    assignment on the instance.
    """
    set_raw = object.__setattr__
    class_network = type(self)._network
    set_raw(self, "_network", class_network)
    # From here on ``instance.rpc`` resolves to ``_rpc`` instead of the classmethod.
    set_raw(self, "rpc", self._rpc)

rpc classmethod #

rpc()

This uses the default network, unless a network has been provided

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    This uses the default network, unless a network has been provided
    """
    # Imported lazily inside the method, as in the original.
    from ._transport import _force_get_global_rpc

    network = cls._network
    if network is not None:
        return _force_get_global_rpc(network)
    return _force_get_global_rpc()

add_func #

add_func(func)
Source code in eth_rpc/contract/contract.py
def add_func(self, func: "FuncSignature"):
    """
    Register ``func`` on this contract unless it is already registered.

    ``self.functions`` holds ``ContractFunc`` wrappers, so the duplicate check
    compares ``func`` with each wrapper's ``func`` attribute.  Comparing it
    with the wrapper itself never matched and appended a duplicate on every
    call.  Entries without a ``func`` attribute are compared directly.
    """
    already_registered = any(
        getattr(existing, "func", existing) == func for existing in self.functions
    )
    if not already_registered:
        self.functions.append(ContractFunc(func=func, contract=self))

get_storage_at #

get_storage_at(*, slot, block_number='latest', sync=False)
Source code in eth_rpc/contract/contract.py
def get_storage_at(
    self, *, slot: int | HexStr, block_number="latest", sync: bool = False
) -> MaybeAwaitable[HexStr]:
    """
    Read a storage slot by delegating to ``run`` with ``self._get_storage_at``.

    All three keyword arguments are forwarded unchanged; ``sync`` presumably
    selects between a blocking result and an awaitable -- confirm against ``run``.
    """
    forwarded = dict(slot=slot, block_number=block_number, sync=sync)
    return run(self._get_storage_at, **forwarded)

get_code #

get_code(*, block_number=None, block_hash=None, sync=False)
Source code in eth_rpc/contract/contract.py
def get_code(
    self,
    *,
    block_number: int | BLOCK_STRINGS | None = None,
    block_hash: HexStr | None = None,
    sync: bool = False,
) -> MaybeAwaitable[HexStr]:
    """
    Fetch the contract code by delegating to ``run`` with ``self._get_code``.

    The block selector (``block_number`` or ``block_hash``) and ``sync`` are
    forwarded unchanged; how they are interpreted is up to ``_get_code``/``run``.
    """
    forwarded = dict(block_number=block_number, block_hash=block_hash, sync=sync)
    return run(self._get_code, **forwarded)

create2 #

create2(salt, keccak_init_code)

EIP-1014: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1014.md

Source code in eth_rpc/contract/contract.py
def create2(self, salt: bytes, keccak_init_code: bytes) -> HexAddress:
    """
    Derive a CREATE2 address with this contract as the deployer (EIP-1014).
    https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1014.md

    The hashed preimage is ``0xff ++ self.address ++ salt ++ keccak_init_code``.
    The result is the hash with its first 12 bytes dropped, hex-encoded with a
    ``0x`` prefix.
    """
    create2_prefix = "0xff"
    preimage = (
        bytes.fromhex(create2_prefix[2:])
        + bytes.fromhex(self.address[2:])  # drop the leading "0x"
        + salt
        + keccak_init_code
    )
    digest = keccak_256(preimage)
    return HexAddress(HexStr("0x" + digest[12:].hex()))

get_all_indices async #

get_all_indices(owner)
Source code in eth_typeshed/uniswap_v3/nonfungible_position_manager.py
async def get_all_indices(self, owner: HexAddress) -> list[int]:
    """
    Return the result of ``token_of_owner_by_index`` for every slot of ``owner``.

    The owner's balance is read first; one lookup per index below that balance
    is then submitted together through ``multicall.execute``.
    """
    held = await self.balance_of(OwnerRequest(owner=owner)).get()
    lookups = [
        self.token_of_owner_by_index(
            OwnerTokenRequest(owner=owner, index=primitives.uint256(position))
        )
        for position in range(held)
    ]
    return await multicall.execute(*lookups)

get_all_positions async #

get_all_positions(owner)
Source code in eth_typeshed/uniswap_v3/nonfungible_position_manager.py
async def get_all_positions(self, owner: HexAddress) -> list[Position]:
    """
    Return the ``positions`` result for every value from ``get_all_indices(owner)``.

    All ``positions`` calls are submitted together through ``multicall.execute``.
    """
    token_refs = await self.get_all_indices(owner)
    position_calls = [self.positions(primitives.uint256(ref)) for ref in token_refs]
    return await multicall.execute(*position_calls)

ProcessedTick #

Bases: BaseModel

index instance-attribute #

index

tick instance-attribute #

tick

token0 instance-attribute #

token0

token1 instance-attribute #

token1

active_liquidity class-attribute instance-attribute #

active_liquidity = 0

liquidity_net property #

liquidity_net

price0 property #

price0

price1 property #

price1

Slot0 #

Bases: BaseModel

sqrt_price class-attribute instance-attribute #

sqrt_price = Field(serialization_alias='sqrtPriceX96')

tick instance-attribute #

tick

observation_index class-attribute instance-attribute #

observation_index = Field(
    serialization_alias="observationIndex"
)

observation_cardinality class-attribute instance-attribute #

observation_cardinality = Field(
    serialization_alias="observationCardinality"
)

observation_cardinality_next class-attribute instance-attribute #

# The alias previously duplicated ``observationCardinality`` (the alias of the
# sibling ``observation_cardinality`` field), so the two fields collided when
# serialized by alias.
observation_cardinality_next = Field(
    serialization_alias="observationCardinalityNext"
)

fee_protocol class-attribute instance-attribute #

fee_protocol = Field(serialization_alias='feeProtocol')

unlocked instance-attribute #

unlocked

Tick #

Bases: BaseModel

liquidity_gross instance-attribute #

liquidity_gross

liquidity_net instance-attribute #

liquidity_net

fee_growth_outside0 instance-attribute #

fee_growth_outside0

fee_growth_outside1 instance-attribute #

fee_growth_outside1

tick_cumulative_outside instance-attribute #

tick_cumulative_outside

seconds_per_liquidity_outside instance-attribute #

seconds_per_liquidity_outside

seconds_outside instance-attribute #

seconds_outside

initialized instance-attribute #

initialized

UniswapV3Pool #

UniswapV3Pool(**kwargs)

Bases: ProtocolBase

This is how we make the ProtocolBase inject the ContractFunc via the type signature. After a lot of research, this goes slightly outside of the bounds for type hinting, but I think the improved expressiveness makes it fully worthwhile.

Source code in eth_rpc/contract/base.py
def __init__(self, **kwargs):
    """
    Bind a ``ContractFunc`` onto the instance for every entry of ``self._func_sigs``.

    Each entry is either a generic carrying two type arguments or an annotated
    wrapper whose first argument is such a generic.  For annotated entries a
    ``Name`` marker among the annotation arguments overrides the function name;
    the attribute itself is always set under the alias.  This deliberately
    stretches what type hints are meant for in exchange for expressiveness.
    """
    super().__init__(**kwargs)

    for alias, declared in self._func_sigs.items():
        if is_annotation(declared):
            parts = get_args(declared)
            generic = parts[0]
            # The last ``Name`` marker wins; without one the alias is the name.
            overrides = [part.value for part in parts if isinstance(part, Name)]
            name = overrides[-1] if overrides else alias
        else:
            generic, name = declared, alias
        T, U = get_args(generic)

        signature = FuncSignature[T, U](name=name, alias=alias)  # type: ignore
        bound = ContractFunc[T, U](func=signature, contract=self)  # type: ignore
        setattr(self, alias, bound)

address instance-attribute #

address

code_override class-attribute instance-attribute #

code_override = Field(default=None)

functions class-attribute instance-attribute #

functions = Field(default_factory=list)

sync property #

sync

model_config class-attribute instance-attribute #

model_config = ConfigDict(extra='allow')

tick_bitmap class-attribute instance-attribute #

tick_bitmap = METHOD

fee class-attribute instance-attribute #

fee = METHOD

slot0 class-attribute instance-attribute #

slot0 = METHOD

token0 class-attribute instance-attribute #

token0 = METHOD

token1 class-attribute instance-attribute #

token1 = METHOD

ticks class-attribute instance-attribute #

ticks = METHOD

liquidity class-attribute instance-attribute #

liquidity = METHOD

tick_spacing class-attribute instance-attribute #

tick_spacing = METHOD

fee_growth_global0 class-attribute instance-attribute #

fee_growth_global0 = METHOD

fee_growth_global1 class-attribute instance-attribute #

fee_growth_global1 = METHOD

model_post_init #

model_post_init(__context)
Source code in eth_rpc/_request.py
def model_post_init(self, __context):
    """
    Copy the class-level ``_network`` onto the instance and shadow ``rpc``.

    Both writes go through ``object.__setattr__`` rather than normal attribute
    assignment on the instance.
    """
    set_raw = object.__setattr__
    class_network = type(self)._network
    set_raw(self, "_network", class_network)
    # From here on ``instance.rpc`` resolves to ``_rpc`` instead of the classmethod.
    set_raw(self, "rpc", self._rpc)

rpc classmethod #

rpc()

This uses the default network, unless a network has been provided

Source code in eth_rpc/_request.py
@classmethod
def rpc(cls) -> "RPC":
    """
    This uses the default network, unless a network has been provided
    """
    # Imported lazily inside the method, as in the original.
    from ._transport import _force_get_global_rpc

    network = cls._network
    if network is not None:
        return _force_get_global_rpc(network)
    return _force_get_global_rpc()

add_func #

add_func(func)
Source code in eth_rpc/contract/contract.py
def add_func(self, func: "FuncSignature"):
    """
    Register ``func`` on this contract unless it is already registered.

    ``self.functions`` holds ``ContractFunc`` wrappers, so the duplicate check
    compares ``func`` with each wrapper's ``func`` attribute.  Comparing it
    with the wrapper itself never matched and appended a duplicate on every
    call.  Entries without a ``func`` attribute are compared directly.
    """
    already_registered = any(
        getattr(existing, "func", existing) == func for existing in self.functions
    )
    if not already_registered:
        self.functions.append(ContractFunc(func=func, contract=self))

get_storage_at #

get_storage_at(*, slot, block_number='latest', sync=False)
Source code in eth_rpc/contract/contract.py
def get_storage_at(
    self, *, slot: int | HexStr, block_number="latest", sync: bool = False
) -> MaybeAwaitable[HexStr]:
    """
    Read a storage slot by delegating to ``run`` with ``self._get_storage_at``.

    All three keyword arguments are forwarded unchanged; ``sync`` presumably
    selects between a blocking result and an awaitable -- confirm against ``run``.
    """
    forwarded = dict(slot=slot, block_number=block_number, sync=sync)
    return run(self._get_storage_at, **forwarded)

get_code #

get_code(*, block_number=None, block_hash=None, sync=False)
Source code in eth_rpc/contract/contract.py
def get_code(
    self,
    *,
    block_number: int | BLOCK_STRINGS | None = None,
    block_hash: HexStr | None = None,
    sync: bool = False,
) -> MaybeAwaitable[HexStr]:
    """
    Fetch the contract code by delegating to ``run`` with ``self._get_code``.

    The block selector (``block_number`` or ``block_hash``) and ``sync`` are
    forwarded unchanged; how they are interpreted is up to ``_get_code``/``run``.
    """
    forwarded = dict(block_number=block_number, block_hash=block_hash, sync=sync)
    return run(self._get_code, **forwarded)

create2 #

create2(salt, keccak_init_code)

EIP-1014: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1014.md

Source code in eth_rpc/contract/contract.py
def create2(self, salt: bytes, keccak_init_code: bytes) -> HexAddress:
    """
    Derive a CREATE2 address with this contract as the deployer (EIP-1014).
    https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1014.md

    The hashed preimage is ``0xff ++ self.address ++ salt ++ keccak_init_code``.
    The result is the hash with its first 12 bytes dropped, hex-encoded with a
    ``0x`` prefix.
    """
    create2_prefix = "0xff"
    preimage = (
        bytes.fromhex(create2_prefix[2:])
        + bytes.fromhex(self.address[2:])  # drop the leading "0x"
        + salt
        + keccak_init_code
    )
    digest = keccak_256(preimage)
    return HexAddress(HexStr("0x" + digest[12:].hex()))

get_price async #

get_price(token0=True)
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_price(self, token0: bool = True) -> Decimal:
    """
    Return a decimal-adjusted price derived from the pool's ``slot0.sqrt_price``.

    ``token0`` is forwarded to ``sqrt_price_x96_to_token_prices`` and selects
    which of the two reciprocal prices is returned.

    The three pool reads are independent of one another, as are the two
    ``decimals`` reads, so each group is awaited concurrently instead of one
    call at a time.
    """
    slot0, token0_address, token1_address = await asyncio.gather(
        self.slot0().get(),
        self.token0().get(),
        self.token1().get(),
    )
    token0_decimals, token1_decimals = await asyncio.gather(
        ERC20(address=token0_address).decimals().get(),
        ERC20(address=token1_address).decimals().get(),
    )
    return self.sqrt_price_x96_to_token_prices(
        slot0.sqrt_price,
        token0_decimals,
        token1_decimals,
        token0,
    )

sqrt_price_x96_to_token_prices staticmethod #

sqrt_price_x96_to_token_prices(
    sqrt_price_x96,
    token0_decimals,
    token1_decimals,
    token0=True,
)
Source code in eth_typeshed/uniswap_v3/pool.py
@staticmethod
def sqrt_price_x96_to_token_prices(
    sqrt_price_x96: int,
    token0_decimals: int,
    token1_decimals: int,
    token0: bool = True,
) -> Decimal:
    """
    Convert a sqrt price into a decimal-adjusted price.

    The squared input is divided by ``Q192`` and scaled by
    ``10**token0_decimals / 10**token1_decimals``.  With ``token0`` true the
    reciprocal of that value is returned, otherwise the value itself.  The
    reciprocal is always computed, so a zero ``sqrt_price_x96`` raises from
    ``decimal`` whichever side is requested.
    """
    ten = Decimal(10)
    squared = Decimal(sqrt_price_x96 * sqrt_price_x96)
    price1 = (squared / Q192) * (ten ** token0_decimals) / (ten ** token1_decimals)
    price0 = Decimal(1) / price1

    if token0:
        return price0
    return price1

all_tick_values async #

all_tick_values()
Source code in eth_typeshed/uniswap_v3/pool.py
async def all_tick_values(self):
    """
    List every tick from the lower bound of ``tick_range()`` up to, but not
    including, its upper bound, stepping by the pool's tick spacing.
    """
    spacing = await self.get_tick_spacing()
    bounds = await self.tick_range()
    lowest, highest = bounds
    return [*range(lowest, highest, spacing)]

get_token0 async #

get_token0()
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_token0(self):
    """
    Return the ``ERC20`` wrapper for the pool's token0, creating it on first use.

    The wrapper is cached on ``self._token0``.  The address is passed as the
    ``address`` keyword, matching ``get_price`` and the keyword-only
    ``ProtocolBase.__init__(**kwargs)`` constructor; a positional argument is
    rejected by that signature.
    """
    if not self._token0:
        self._token0 = ERC20(address=await self.token0().get())
    return self._token0

get_token1 async #

get_token1()
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_token1(self):
    """
    Return the ``ERC20`` wrapper for the pool's token1, creating it on first use.

    The wrapper is cached on ``self._token1``.  The address is passed as the
    ``address`` keyword, matching ``get_price`` and the keyword-only
    ``ProtocolBase.__init__(**kwargs)`` constructor; a positional argument is
    rejected by that signature.
    """
    if not self._token1:
        self._token1 = ERC20(address=await self.token1().get())
    return self._token1

get_tick_spacing async #

get_tick_spacing()
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_tick_spacing(self):
    """
    Return the pool's tick spacing, fetching it only while ``self._tick_spacing``
    is still falsy and caching the fetched value there.
    """
    if self._tick_spacing:
        return self._tick_spacing
    self._tick_spacing = await self.tick_spacing().get()
    return self._tick_spacing

get_swaps async #

get_swaps(start_block, end_block)
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_swaps(
    self, start_block, end_block
) -> dict[int, list[V3SwapEventType]]:
    """
    Group this pool's swap events from a block range by tick bucket.

    Each event is filed under the largest value of ``all_tick_values()`` that
    is less than or equal to the event's tick.

    Raises:
        ValueError: if an event's tick lies below every known tick value.
    """
    from bisect import bisect_right

    swaps_by_tick: dict[int, list[V3SwapEventType]] = {}
    # ``all_tick_values`` builds its list from an ascending ``range``, so it
    # can be binary-searched instead of scanned once per event.
    tick_values = await self.all_tick_values()
    async for event in V3SwapEvent.set_filter(addresses=[self.address]).backfill(
        start_block, end_block
    ):
        swap = event.event
        position = bisect_right(tick_values, swap.tick) - 1
        if position < 0:
            raise ValueError(f"tick {swap.tick} is below the pool's tick range")
        closest_tick = tick_values[position]
        # Key on the bucket, not the raw tick: the old ``tick in ticks`` test
        # either raised KeyError or overwrote events already in the bucket.
        swaps_by_tick.setdefault(closest_tick, []).append(swap)
    return swaps_by_tick

get_ticks async #

get_ticks(indices)
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_ticks(self, indices: Iterable[int]) -> list[ProcessedTick]:
    """
    Load the tick data for ``indices`` in one multicall and wrap each result.

    Args:
        indices: tick indices to query; any iterable, including one-shot
            generators (it is materialized once up front).

    Returns:
        One ``ProcessedTick`` per index, in input order, each carrying the
        pool's cached token0/token1 wrappers.
    """
    # ``indices`` is needed twice (to build the calls and to pair results),
    # so a generator would otherwise be exhausted before the pairing step.
    tick_indices = list(indices)
    calls = [self.ticks(primitives.int24(index)) for index in tick_indices]
    ticks: list[Tick] = await multicall.execute(*calls)
    if not tick_indices:
        return []

    # Resolve the token wrappers once rather than awaiting them per tick.
    token0 = await self.get_token0()
    token1 = await self.get_token1()
    return [
        ProcessedTick(index=index, tick=tick_data, token0=token0, token1=token1)
        for index, tick_data in zip(tick_indices, ticks)
    ]

tvl async #

tvl(block_number=None)
Source code in eth_typeshed/uniswap_v3/pool.py
async def tvl(self, block_number=None) -> list[ProcessedTick]:
    """
    Return the pool's initialized ticks with ``active_liquidity`` filled in.

    Pool liquidity, the initialized tick indices and the active tick index are
    fetched concurrently; the tick data is then loaded through ``get_ticks``.

    NOTE(review): ``block_number`` is accepted but never used in this body.
    NOTE(review): the two walks below presumably rely on ``ticks`` being in
    ascending index order -- confirm against ``get_initialized_ticks``.
    """
    liquidity, initialized_ticks, active_tick_index = await asyncio.gather(
        self.liquidity().get(),
        self.get_initialized_ticks(),
        self.active_tick(),
    )
    ticks = await self.get_ticks(initialized_ticks)

    # Seed the downward walk: when the active tick is itself initialized it
    # receives the pool liquidity and its net liquidity is backed out first.
    if active_tick_index in initialized_ticks:
        active_tick = [t for t in ticks if t.index == active_tick_index][0]
        active_tick.active_liquidity = liquidity
        current_liquidity = liquidity - active_tick.liquidity_net
        previous_tick_net = active_tick.liquidity_net
    else:
        current_liquidity = liquidity
        previous_tick_net = 0

    # Ticks below the active tick, visited in reverse list order; each step
    # subtracts the net liquidity of the previously visited tick.
    for tick in [tick for tick in ticks if tick.index < active_tick_index][::-1]:
        tick.active_liquidity = current_liquidity - previous_tick_net
        current_liquidity = tick.active_liquidity
        previous_tick_net = tick.liquidity_net

    # Ticks above the active tick, in list order, restarting from the pool
    # liquidity; each step adds the tick's own net liquidity.
    current_liquidity = liquidity
    # NOTE(review): this value is not read again after this point.
    previous_tick_net = 0
    for tick in [tick for tick in ticks if tick.index > active_tick_index]:
        tick.active_liquidity = current_liquidity + tick.liquidity_net
        current_liquidity = tick.active_liquidity
    return ticks

get_tick_range async #

get_tick_range(lower, upper)
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_tick_range(self, lower: int, upper: int) -> list[ProcessedTick]:
    """
    Return ``get_ticks`` for every tick from ``lower`` up to, but not including,
    ``upper``, stepping by the pool's tick spacing.

    The spacing is fetched and cached on ``self._tick_spacing`` if it has not
    been loaded yet.
    """
    spacing = self._tick_spacing
    if not spacing:
        spacing = self._tick_spacing = await self.tick_spacing().get()
    assert spacing
    return await self.get_ticks(range(lower, upper, spacing))

tick_range async #

tick_range()
Source code in eth_typeshed/uniswap_v3/pool.py
async def tick_range(self):
    """
    Return ``(min_tick, -min_tick)``, where ``min_tick`` is ``MIN_TICK`` rounded
    up (towards zero for a negative bound) to a multiple of the tick spacing.
    """
    spacing = await self.get_tick_spacing()
    multiples = math.ceil(MIN_TICK / spacing)
    lowest = multiples * spacing
    return lowest, -lowest

bitmap_range async #

bitmap_range()
Source code in eth_typeshed/uniswap_v3/pool.py
async def bitmap_range(self) -> tuple[int, int]:
    """
    Return the first and last tick-bitmap word indices spanned by ``tick_range()``.

    A tick is compressed by dividing by the pool's tick spacing, and each
    bitmap word holds 256 compressed ticks.  The spacing was previously
    hard-coded to 60, which produced the wrong word range for pools with any
    other spacing.
    """
    min_tick, max_tick = await self.tick_range()
    tick_spacing = await self.get_tick_spacing()
    return (min_tick // tick_spacing // 256), (max_tick // tick_spacing // 256)

get_initialized_ticks async #

get_initialized_ticks()
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_initialized_ticks(self) -> list[int]:
    """
    Return the ticks decoded from every bitmap word in ``bitmap_range()``.

    All words are fetched in one multicall, then each is decoded by
    ``tick_from_bitmap`` together with its word index and the tick spacing.
    """
    first_word, last_word = await self.bitmap_range()
    spacing = await self.get_tick_spacing()
    word_indices = range(first_word, last_word + 1)

    words = await multicall.execute(
        *[self.tick_bitmap(primitives.int16(word_index)) for word_index in word_indices]
    )

    found = []
    for word_index, word in zip(word_indices, words):
        found.extend(tick_from_bitmap(word_index, word, spacing))
    return found

active_tick async #

active_tick()
Source code in eth_typeshed/uniswap_v3/pool.py
async def active_tick(self) -> int:
    """
    Return the current ``slot0`` tick rounded down to a multiple of the tick
    spacing, loading and caching the spacing on first use.
    """
    slot0 = await self.slot0().get()
    if not self._tick_spacing:
        self._tick_spacing = await self.tick_spacing().get()
    spacing = self._tick_spacing
    # Floor division, so negative ticks round towards negative infinity.
    whole_steps = slot0.tick // spacing
    return whole_steps * spacing

liquidity_at_current_tick async #

liquidity_at_current_tick(tick)
Source code in eth_typeshed/uniswap_v3/pool.py
async def liquidity_at_current_tick(self, tick: int) -> tuple[float, float]:
    """
    Compute two amounts from the pool liquidity for the range starting at ``tick``.

    Tick spacing, ``slot0`` and liquidity are read in a single multicall.  The
    range runs from ``tick`` to ``tick + tick_spacing``.

    NOTE(review): the bounds are converted with ``tick_to_price(t // 2)``,
    presumably to obtain a square-root price -- the floor division drops the
    half step for odd ticks; confirm this is intended.
    NOTE(review): no token-decimal scaling is applied here -- presumably the
    amounts are raw token units; verify against callers.
    """
    fetched = await multicall.execute(
        self.tick_spacing(),
        self.slot0(),
        self.liquidity(),
    )
    tick_spacing, slot0, liquidity = cast(tuple[int, Slot0, int], fetched)

    lower_tick, upper_tick = tick, tick + tick_spacing

    price = tick_to_price(slot0.tick)
    sa = tick_to_price(lower_tick // 2)
    sb = tick_to_price(upper_tick // 2)
    sqrt_price = math.sqrt(price)

    amount0 = liquidity * (sb - sqrt_price) / (sqrt_price * sb)
    amount1 = liquidity * (sqrt_price - sa)
    return amount0, amount1

get_all_positions async #

get_all_positions()
Source code in eth_typeshed/uniswap_v3/pool.py
async def get_all_positions(self):
    """
    Collect the payload of every ``V3MintEvent`` emitted by this pool's address.

    The backfill is called without arguments, so its default range applies.
    """
    mint_stream = V3MintEvent.set_filter(addresses=[self.address]).backfill()
    return [entry.event async for entry in mint_stream]