Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion v4-client-py-v2/dydx_v4_client/config.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
GAS_MULTIPLIER = 1.4
GAS_MULTIPLIER = 1.7
17 changes: 15 additions & 2 deletions v4-client-py-v2/dydx_v4_client/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def make_config(
make_secure = partial(make_config, secure_channel)
make_insecure = partial(make_config, insecure_channel)


mainnet_node = partial(
NodeConfig,
"dydx-mainnet-1",
Expand All @@ -56,6 +55,21 @@ def make_config(
make_mainnet = partial(make_secure, mainnet_node)


staging_node = partial(
NodeConfig,
"dydxprotocol-testnet",
chaintoken_denom="adv4tnt",
usdc_denom="ibc/8E27BA2D5493AF5636760E354E46004562C46AB7EC0CC4C1CA14E9E20E2545B5",
)
make_staging = partial(
make_secure,
staging_node,
rest_indexer="https://indexer.v4staging.dydx.exchange/",
websocket_indexer="wss://indexer.v4staging.dydx.exchange/v4/ws",
node_url="validator.v4staging.dydx.exchange",
)
STAGING = make_staging()

testnet_node = partial(
NodeConfig,
"dydx-testnet-4",
Expand All @@ -73,7 +87,6 @@ def make_config(
TESTNET_FAUCET = "https://faucet.v4testnet.dydx.exchange"
TESTNET_NOBLE = "https://rpc.testnet.noble.strange.love"


local_node = partial(
NodeConfig,
"localdydxprotocol",
Expand Down
137 changes: 137 additions & 0 deletions v4-client-py-v2/dydx_v4_client/node/authenticators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
__all__ = [
"SubAuthenticator",
"AuthenticatorType",
"Authenticator",
"composed_authenticator",
"signature_verification",
"message_filter",
"subaccount_filter",
"clob_pair_id_filter",
"decode_authenticator",
]

import base64
import binascii
import json
from typing import Union

from typing_extensions import Literal, TypedDict


class SubAuthenticator(TypedDict):
type: str
config: str


AuthenticatorType = Literal["AllOf", "AnyOf"]


class Authenticator(TypedDict):
type: AuthenticatorType
config: list[SubAuthenticator]


def b64encode(value: bytes) -> str:
"""Encodes bytes into a base64 string."""
return base64.b64encode(value).decode()


def b64decode(value: str) -> bytes:
"""Decodes a base64 string into bytes."""
return base64.b64decode(value)


def decode_authenticator(config: str, auth_type: str) -> Union[SubAuthenticator, Authenticator]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function used anywhere?
I'd prefer not to introduce redundant code.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very helpful when you want to validate what types of authenticators are added to the account, since it handles nested structure of the sub-authenticators as well performs decoding of the base64 encoded values.

"""Decodes a sub-authenticator configuration."""
if auth_type == "SignatureVerification":
return {
"type": auth_type,
"config": b64decode(config),
}
elif auth_type == "MessageFilter":
return {
"type": auth_type,
"config": b64decode(config).decode(),
}
elif auth_type == "SubaccountFilter":
return {
"type": auth_type,
"config": int.from_bytes(b64decode(config)),
}
elif auth_type == "ClobPairIdFilter":
return {
"type": auth_type,
"config": int.from_bytes(b64decode(config)),
}
elif auth_type in ["AllOf", "AnyOf"]:
decoded_subs = []
try:
subauth_config = json.loads(b64decode(config))
except (binascii.Error, UnicodeDecodeError):
subauth_config = json.loads(config)
for subauth in subauth_config:
decoded_sub = decode_authenticator(subauth['config'], subauth['type'])
decoded_subs.append(decoded_sub)
return {
"type": auth_type,
"config": decoded_subs,
}
else:
raise ValueError(f"Unknown SubAuthenticator type: {auth_type}")


def _prepare_authenticator(
auth: Union[Authenticator, SubAuthenticator]
) -> SubAuthenticator:
""" Converts Authenticator to SubAuthenticator to allow composition. """
if isinstance(auth["config"], list):
return {
"type": auth["type"],
"config": b64encode(json.dumps(auth["config"]).encode())
}
return auth


def composed_authenticator(
authenticator_type: AuthenticatorType,
sub_authenticators: list[Union[Authenticator, SubAuthenticator]],
) -> Authenticator:
"""Combines multiple sub-authenticators into a single one."""
dumped_subs = [_prepare_authenticator(sa) for sa in sub_authenticators]
return {
"type": authenticator_type,
"config": dumped_subs,
}


def signature_verification(pub_key: bytes) -> SubAuthenticator:
"""Enables authentication via a specific key."""
return {
"type": "SignatureVerification",
"config": b64encode(pub_key),
}


def message_filter(msg_type: str) -> SubAuthenticator:
"""Restricts authentication to certain message types."""
assert msg_type.startswith("/"), msg_type
return {
"type": "MessageFilter",
"config": b64encode(msg_type.encode()),
}


def subaccount_filter(subaccount_id: int) -> SubAuthenticator:
"""Restricts authentication to certain subaccount constraints."""
return {
"type": "SubaccountFilter",
"config": b64encode(subaccount_id.to_bytes()),
}


def clob_pair_id_filter(clob_pair_id: int) -> SubAuthenticator:
"""Restricts transactions to specific CLOB pair IDs."""
return {
"type": "ClobPairIdFilter",
"config": b64encode(clob_pair_id.to_bytes()),
}
53 changes: 46 additions & 7 deletions v4-client-py-v2/dydx_v4_client/node/builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List
from typing import List, Optional

import google
from google.protobuf.message import Message
Expand All @@ -17,6 +17,7 @@

from dydx_v4_client.node.fee import calculate_fee, Denom
from dydx_v4_client.wallet import Wallet
from v4_proto.dydxprotocol.accountplus.tx_pb2 import TxExtension


def as_any(message: Message):
Expand Down Expand Up @@ -50,6 +51,13 @@ def get_signature(key_pair, body, auth_info, account_number, chain_id):
)


@dataclass
class TxOptions:
authenticator_id: int
sequence: int
account_number: int


@dataclass
class Builder:
chain_id: str
Expand All @@ -69,17 +77,48 @@ def fee(self, gas_limit: int, *amount: List[Coin]) -> Fee:
gas_limit=gas_limit,
)

def build_transaction(self, wallet: Wallet, messages: List[Message], fee: Fee):
body = TxBody(messages=messages, memo=self.memo)
def build_transaction(
self,
wallet: Wallet,
messages: List[Message],
fee: Fee,
tx_options: Optional[TxOptions] = None,
):
non_critical_extension_options = []
if tx_options is not None:
tx_extension = TxExtension(
selected_authenticators=[tx_options.authenticator_id],
)
non_critical_extension_options.append(as_any(tx_extension))
body = TxBody(
messages=messages,
memo=self.memo,
non_critical_extension_options=non_critical_extension_options,
)
auth_info = AuthInfo(
signer_infos=[get_signer_info(wallet.public_key, wallet.sequence)],
signer_infos=[
get_signer_info(
wallet.public_key,
tx_options.sequence if tx_options else wallet.sequence,
)
],
fee=fee,
)
signature = get_signature(
wallet.key, body, auth_info, wallet.account_number, self.chain_id
wallet.key,
body,
auth_info,
tx_options.account_number if tx_options else wallet.account_number,
self.chain_id,
)

return Tx(body=body, auth_info=auth_info, signatures=[signature])

def build(self, wallet: Wallet, message: Message, fee: Fee = DEFAULT_FEE):
return self.build_transaction(wallet, [as_any(message)], fee)
def build(
self,
wallet: Wallet,
message: Message,
fee: Fee = DEFAULT_FEE,
tx_options: Optional[dict] = None,
):
return self.build_transaction(wallet, [as_any(message)], fee, tx_options)
Loading