Skip to content

Commit 342fe54

Browse files
authored
feat(Flow): Provide an alternative for Mercury events using Obsrvr Flow extracted invocations (#28)
* feat(Flow): alternative contract invocation source * refactor: remove emails from load_sink_events * feat(Flow): hook up invocations loader * fix(Mercury): quantize sink events to kg resolution * test(Flow): test invocation source and loader * test(Flow): fix data fixture persistent/detached state * fix(Flow): postpone OBSRVR_FLOW_DB_URI evaluation
1 parent d32880f commit 342fe54

File tree

14 files changed

+532
-25
lines changed

14 files changed

+532
-25
lines changed

poetry.lock

Lines changed: 102 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ alembic = "^1.16.4"
2121
click = "^8.2.1"
2222
pygithub = "^2.7.0"
2323
pydantic-settings = "^2.10.1"
24+
psycopg = {extras = ["binary"], version = "^3.2.9"}
2425

2526
[tool.poetry.group.dev.dependencies]
2627
pytest = "^8.4.1"

sc_audit/cli.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
from sc_audit.loader.retirement_from_block import load_retirement_from_block
2222
from sc_audit.loader.retirements import load_retirements
2323
from sc_audit.loader.sink_events import load_sink_events
24+
from sc_audit.loader.sink_invocations import load_sink_invocations
2425
from sc_audit.loader.sink_status import load_sink_statuses
2526
from sc_audit.loader.sinking_txs import load_sinking_txs
2627
from sc_audit.sources.sink_events import MercuryError
28+
from sc_audit.sources.sink_invocations import ObsrvrError
2729
from sc_audit.views.inventory import view_inventory
2830
from sc_audit.views.retirement import view_retirements
2931
from sc_audit.views.sink_status import view_sinking_txs
@@ -181,12 +183,21 @@ def db_load_sinking_txs():
181183
sink_cursor: int = get_latest_attr('sink_tx') # type: ignore[return-value]
182184
num_sinking_txs = load_sinking_txs(cursor=sink_cursor)
183185
click.echo(f"Loaded {num_sinking_txs} sinking transactions")
186+
187+
# TODO: separate classic and SinkContract txs
188+
# attempt to load SinkContract txs from Obsrvr or Mercury
184189
try:
185-
num_sink_events, _ = load_sink_events(cursor=sink_cursor)
186-
print(f"Loaded {num_sink_events} sink events")
187-
except MercuryError as exc:
188-
click.echo(f"Couldn't load sink events from Mercury")
190+
num_sink_invocations = load_sink_invocations(cursor=sink_cursor)
191+
print(f"Loaded {num_sink_invocations} sink invocations")
192+
except ObsrvrError as exc:
193+
click.echo(f"Couldn't load sink invocations from Obsrvr")
189194
click.echo(repr(exc), err=True)
195+
try:
196+
num_sink_events = load_sink_events(cursor=sink_cursor)
197+
print(f"Loaded {num_sink_events} sink events")
198+
except MercuryError as exc:
199+
click.echo(f"Couldn't load sink events from Mercury")
200+
click.echo(repr(exc), err=True)
190201

191202

192203
@load.command(name="retirements")

sc_audit/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from pathlib import Path
22
from typing import Literal
33

4-
from pydantic import HttpUrl, computed_field
4+
from pydantic import HttpUrl, PostgresDsn, computed_field
55
from pydantic_settings import BaseSettings, SettingsConfigDict
66
from stellar_sdk import Asset
77

@@ -54,6 +54,9 @@ class Settings(BaseSettings):
5454
RETROSHADES_MD5: str = "3dd6e7b71adb9fda05a5b45a154611f3"
5555
MERCURY_KEY: str = ""
5656

57+
OBSRVR_FLOW_DB_URI: PostgresDsn | None = None
58+
OBSRVR_FLOW_TABLE: str = "extracted_contract_invocations"
59+
5760
@computed_field
5861
@property
5962
def CARBON_ASSET(self) -> Asset:

sc_audit/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from decimal import Decimal
2+
3+
4+
KG = Decimal("0.001")
5+
UNIT_IN_STROOPS = Decimal(10_000_000)

sc_audit/loader/__main__.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
from sc_audit.loader.retirement_from_block import load_retirement_from_block
2424
from sc_audit.loader.retirements import load_retirements
2525
from sc_audit.loader.sink_events import load_sink_events
26+
from sc_audit.loader.sink_invocations import load_sink_invocations
2627
from sc_audit.loader.sink_status import load_sink_statuses
2728
from sc_audit.loader.sinking_txs import load_sinking_txs
2829
from sc_audit.sources.sink_events import MercuryError
30+
from sc_audit.sources.sink_invocations import ObsrvrError
2931

3032

3133
def catch_up_from_sources():
@@ -51,12 +53,20 @@ def catch_up_from_sources():
5153
print(f"Loaded {num_distribution_txs} distribution outflows")
5254
num_sinking_txs = load_sinking_txs(cursor=sink_cursor)
5355
print(f"Loaded {num_sinking_txs} sinking transactions")
56+
57+
# attempt to load SinkContract txs from Obsrvr or Mercury
5458
try:
55-
num_sink_events, _ = load_sink_events(cursor=sink_cursor)
56-
print(f"Loaded {num_sink_events} sink events")
57-
except MercuryError as exc:
58-
print(f"Couldn't load sink events from Mercury")
59+
num_sink_invocations = load_sink_invocations(cursor=sink_cursor)
60+
print(f"Loaded {num_sink_invocations} sink invocations")
61+
except ObsrvrError as exc:
62+
print(f"Couldn't load sink invocations from Obsrvr")
5963
print(repr(exc), file=sys.stderr)
64+
try:
65+
num_sink_events = load_sink_events(cursor=sink_cursor)
66+
print(f"Loaded {num_sink_events} sink events")
67+
except MercuryError as exc:
68+
print(f"Couldn't load sink events from Mercury")
69+
print(repr(exc), file=sys.stderr)
6070

6171
num_minting_txs = load_minted_blocks(cursor=mint_cursor)
6272
print(f"Loaded {num_minting_txs} minted blocks")

sc_audit/loader/sink_events.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from sc_audit.sources.sink_events import get_sink_events
1616

1717

18-
def load_sink_events(cursor: int=settings.FIRST_SINK_CURSOR) -> tuple[int, list[tuple[str, str]]]:
18+
def load_sink_events(cursor: int=settings.FIRST_SINK_CURSOR) -> int:
1919
"""
2020
Load (all) sink events from Mercury Retroshades into the DB.
2121
@@ -29,17 +29,13 @@ def load_sink_events(cursor: int=settings.FIRST_SINK_CURSOR) -> tuple[int, list[
2929
separately.
3030
"""
3131
number_loaded = 0
32-
recipient_emails: dict[str, str] = {}
3332

3433
with Session.begin() as session:
3534
last_ledger = 0
3635
# For each new ledger, reset tx_order to 2^16 to avoid collisions with SinkingTxs.
3736
# This is a pragmatic choice; ideally, the order should come from Retroshades.
3837
tx_order = first_tx_index = 2**16
3938
for sink_event in get_sink_events(cursor):
40-
if sink_event.email:
41-
recipient_emails[sink_event.recipient] = sink_event.email
42-
4339
vcs_project_id = try_project_id(sink_event.project_id)
4440

4541
# FIXME: the TOID should come from Retroshades if possible
@@ -78,7 +74,7 @@ def load_sink_events(cursor: int=settings.FIRST_SINK_CURSOR) -> tuple[int, list[
7874

7975
number_loaded = len(session.new)
8076

81-
return number_loaded, list(recipient_emails.items())
77+
return number_loaded
8278

8379

8480
@lru_cache(maxsize=32)

0 commit comments

Comments
 (0)