Skip to content

Commit f87fb81

Browse files
authored
feat: Ingest SinkEvents as SinkingTxs (#21)
* feat: fetch sink events from retroshades * test: mock sink event source * test: patch mercury key * build: update dependencies * feat: add contract_id field to SinkingTx * feat: sink event loader (scaffolding) * fix: take tx_order from the 2^16 to 2^20 range * feat: dedicated impact project loader * fix: apply foreign key constraint in sqlite * test: interleaved sink events and classic txs * feat(cli): add sink events loader to CLI * perf: move test mappers to migrations.env
1 parent 3ceb239 commit f87fb81

28 files changed

+1483
-882
lines changed

poetry.lock

Lines changed: 799 additions & 804 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sc_audit/backup/dump.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def dump_table(db_model: type[ScBase] | str, output_path: Path | None = None) ->
2727
ndjson_table = table_to_ndjson(db_table)
2828
if output_path is not None:
2929
with output_path.open('w') as ofile:
30-
ofile.write(ndjson_table)
30+
ofile.write(ndjson_table + "\n")
3131
else:
3232
return ndjson_table
3333

sc_audit/backup/restore.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def df_from_ndjson(json_path: Path) -> pd.DataFrame:
7070
table_name = json_path.stem
7171
db_table = ScBase.metadata.tables[table_name]
7272
with open(json_path, 'r') as infile:
73-
ndjson = infile.read()
73+
ndjson = infile.read().rstrip("\n")
7474

7575
# turn json lines into a valid array
7676
json_array = "[" + ndjson.replace("\n", ",") + "]"

sc_audit/cli.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
from sc_audit.loader.__main__ import catch_up_from_sources
1717
from sc_audit.loader.distribution_outflows import load_distribution_txs
1818
from sc_audit.loader.get_latest import get_latest_attr
19+
from sc_audit.loader.impact_projects import load_impact_projects
1920
from sc_audit.loader.minted_blocks import load_minted_blocks
2021
from sc_audit.loader.retirement_from_block import load_retirement_from_block
2122
from sc_audit.loader.retirements import load_retirements
23+
from sc_audit.loader.sink_events import load_sink_events
2224
from sc_audit.loader.sink_status import load_sink_statuses
2325
from sc_audit.loader.sinking_txs import load_sinking_txs
26+
from sc_audit.sources.sink_events import MercuryError
2427
from sc_audit.views.inventory import view_inventory
2528
from sc_audit.views.retirement import view_retirements
2629
from sc_audit.views.sink_status import view_sinking_txs
@@ -149,6 +152,13 @@ def load():
149152
pass
150153

151154

155+
@load.command(name="impact-projects")
156+
def db_load_impact_projects():
157+
"""Load impact projects into the DB"""
158+
num_impact_projects = load_impact_projects()
159+
click.echo(f"Loaded {num_impact_projects} impact projects")
160+
161+
152162
@load.command(name="minted-blocks")
153163
def db_load_minted_blocks():
154164
"""Load minted blocks into the DB"""
@@ -171,6 +181,12 @@ def db_load_sinking_txs():
171181
sink_cursor = get_latest_attr('sink_tx')
172182
num_sinking_txs = load_sinking_txs(cursor=sink_cursor) # type: ignore[arg-type]
173183
click.echo(f"Loaded {num_sinking_txs} sinking transactions")
184+
try:
185+
num_sink_events = load_sink_events(cursor=sink_cursor) # type: ignore[arg-type]
186+
print(f"Loaded {num_sink_events} sink events")
187+
except MercuryError as exc:
188+
click.echo(f"Couldn't load sink events from Mercury")
189+
click.echo(repr(exc), err=True)
174190

175191

176192
@load.command(name="retirements")

sc_audit/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def get_default_db_path(db_name="sc-audit.sqlite3") -> Path:
3333

3434

3535
class Settings(BaseSettings):
36-
model_config = SettingsConfigDict(env_prefix='SC_')
36+
model_config = SettingsConfigDict(env_prefix='SC_', env_file=".env")
3737

3838
DBAPI_URL: str = f"sqlite+pysqlite:///{get_default_db_path()}"
3939
TABLE_PREFIX: DbTablePrefix = None
@@ -50,6 +50,10 @@ class Settings(BaseSettings):
5050
FIRST_MINT_CURSOR: int = 164806777139924992
5151
FIRST_DIST_CURSOR: int = 164810659791396865
5252

53+
RETROSHADES_URL: HttpUrl = HttpUrl("https://api.mercurydata.app/retroshadesv1")
54+
RETROSHADES_MD5: str = "571c515522665c5da4d18f7ccbf8eb3a"
55+
MERCURY_KEY: str = ""
56+
5357
@computed_field
5458
@property
5559
def CARBON_ASSET(self) -> Asset:

sc_audit/db_schema/sink.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class SinkingTxBase(MappedAsDataclass, kw_only=True):
2626

2727
hash: Mapped[hashpk]
2828
created_at: Mapped[dt.datetime]
29+
contract_id: Mapped[strkey | None]
2930
funder: Mapped[strkey]
3031
recipient: Mapped[strkey]
3132
carbon_amount: Mapped[kgdecimal]

sc_audit/loader/__main__.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
"""
22
Load all database records in the order:
3-
1. Retirements
4-
2. Sinking Transactions
5-
3. Minted Blocks
6-
4. Retirement from Block
7-
5. Sink Statuses
3+
1. Impact Projects
4+
2. Distribution Outflows
5+
3. Sinking Transactions
6+
4. Minted Blocks
7+
5. Retirements
8+
6. Retirement from Block
9+
7. Sink Statuses
810
911
Loading from scratch may fail if a Horizon instance with pruned history is selected.
1012
You may select another instance with the SC_HORIZON_URL env variable.
1113
1214
Author: Alex Olieman <https://keybase.io/alioli>
1315
"""
1416
import datetime as dt
17+
import sys
1518

1619
from sc_audit.loader.distribution_outflows import load_distribution_txs
1720
from sc_audit.loader.get_latest import get_latest_attr
21+
from sc_audit.loader.impact_projects import load_impact_projects
1822
from sc_audit.loader.minted_blocks import load_minted_blocks
1923
from sc_audit.loader.retirement_from_block import load_retirement_from_block
2024
from sc_audit.loader.retirements import load_retirements
25+
from sc_audit.loader.sink_events import load_sink_events
2126
from sc_audit.loader.sink_status import load_sink_statuses
2227
from sc_audit.loader.sinking_txs import load_sinking_txs
28+
from sc_audit.sources.sink_events import MercuryError
2329

2430

2531
def catch_up_from_sources():
@@ -39,10 +45,19 @@ def catch_up_from_sources():
3945
) # type: ignore[return-value]
4046

4147
print("Started catch-up from data sources...")
48+
num_impact_projects = load_impact_projects()
49+
print(f"Loaded {num_impact_projects} impact projects")
4250
num_distribution_txs = load_distribution_txs(cursor=dist_cursor)
4351
print(f"Loaded {num_distribution_txs} distribution outflows")
4452
num_sinking_txs = load_sinking_txs(cursor=sink_cursor)
4553
print(f"Loaded {num_sinking_txs} sinking transactions")
54+
try:
55+
num_sink_events = load_sink_events(cursor=sink_cursor) # type: ignore[arg-type]
56+
print(f"Loaded {num_sink_events} sink events")
57+
except MercuryError as exc:
58+
print(f"Couldn't load sink events from Mercury")
59+
print(repr(exc), file=sys.stderr)
60+
4661
num_minting_txs = load_minted_blocks(cursor=mint_cursor)
4762
print(f"Loaded {num_minting_txs} minted blocks")
4863
num_retirements = load_retirements(from_date=retirement_date)

sc_audit/loader/distribution_outflows.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66

77
from decimal import Decimal
8+
89
from sc_audit.config import settings
910
from sc_audit.db_schema.distribution import DistributionTx
1011
from sc_audit.loader.utils import parse_iso_datetime

sc_audit/loader/impact_projects.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
2+
Load impact projects into the DB.
3+
4+
Author: Alex Olieman <https://keybase.io/alioli>
5+
"""
6+
7+
from sqlalchemy import select
8+
from sc_audit.db_schema.base import intpk
9+
from sc_audit.db_schema.impact_project import VcsProject, get_vcs_project
10+
from sc_audit.session_manager import Session
11+
12+
13+
def load_impact_projects() -> int:
14+
"""
15+
Load impact projects from our own data schema into the DB.
16+
"""
17+
number_loaded = 0
18+
19+
with Session.begin() as session:
20+
existing_vcs_projects: set[intpk] = set(session.scalars(select(VcsProject.id)).all())
21+
if 1360 not in existing_vcs_projects:
22+
vcs_1360 = get_vcs_project(1360)
23+
session.add(vcs_1360)
24+
25+
number_loaded = len(session.new)
26+
27+
return number_loaded

sc_audit/loader/minted_blocks.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from sqlalchemy import select
1717

1818
from sc_audit.config import settings
19-
from sc_audit.db_schema.impact_project import UnknownVcsProject, VcsProject, get_vcs_project
19+
from sc_audit.db_schema.impact_project import UnknownVcsProject, get_vcs_project
2020
from sc_audit.db_schema.mint import MintedBlock, verra_carbon_pool
2121
from sc_audit.db_schema.retirement import Retirement
2222
from sc_audit.loader.utils import VcsSerialNumber, decode_hash_memo, parse_iso_datetime
@@ -36,21 +36,16 @@ def load_minted_blocks(cursor: int=settings.FIRST_MINT_CURSOR) -> int:
3636
pristine_blocks: list[MintedBlock] = []
3737
carbon_pool = index_carbon_pool()
3838
with Session.begin() as session:
39-
existing_vcs_projects: set[int] = set(session.scalars(select(VcsProject.id)).all())
4039
for mint_tx in get_minting_transactions(cursor):
41-
# ensure that the related VCS Project exists
40+
# ensure that the related VCS Project is available
4241
vcs_project_id = get_vcs_project_id(mint_tx)
43-
if vcs_project_id not in existing_vcs_projects:
44-
vcs_project = get_vcs_project(vcs_project_id)
45-
if vcs_project:
46-
existing_vcs_projects.add(vcs_project_id)
47-
session.add(vcs_project)
48-
else:
49-
raise UnknownVcsProject(
50-
f"VCS project {vcs_project_id} needs to be loaded before related transactions"
51-
" can be stored.",
52-
vcs_id=vcs_project_id
53-
)
42+
vcs_project = get_vcs_project(vcs_project_id)
43+
if not vcs_project:
44+
raise UnknownVcsProject(
45+
f"VCS project {vcs_project_id} needs to be loaded before related transactions"
46+
" can be stored.",
47+
vcs_id=vcs_project_id
48+
)
5449
# check whether the block is pristine
5550
serial_hash = decode_hash_memo(mint_tx['transaction']['memo'])
5651
if serial_hash in carbon_pool:
@@ -91,7 +86,7 @@ def load_minted_blocks(cursor: int=settings.FIRST_MINT_CURSOR) -> int:
9186
return len(retired_blocks) + len(pristine_blocks)
9287

9388

94-
def get_vcs_project_id(sinking_tx) -> Literal[1360]:
89+
def get_vcs_project_id(mint_tx) -> Literal[1360]:
9590
# TODO: support multiple VCS projects
9691
return 1360
9792

0 commit comments

Comments
 (0)