Commit adb2c1c

fix: Handle duplicate key violation in voter contributions | NPG-000 (#662)
# Description

Changes how voter contributions are uploaded to the DB by keeping track of the unique key constraint tuple and processing the network with the highest priority first. Duplicate records are ignored and an error is logged.

Breaking: the help docs for the `network_ids` CLI option are updated to specify the valid network names.

## Type of change

Please delete options that are not relevant.

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update

## Checklist

- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged and published in downstream modules
1 parent f9cce5c commit adb2c1c
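
The deduplication described above boils down to tracking the unique-key tuple in a set and letting the highest-priority network claim each key first. A minimal, self-contained sketch of that idea follows; the function, type alias, and the assumption that `mainnet` is the highest-priority network are illustrative only, not the importer's actual API:

```python
# Minimal sketch of the deduplication strategy described above; the function
# and type alias are illustrative, not the importer's actual API.
from typing import Dict, List, Set, Tuple

# (stake_public_key, voting_key, voting_group, value) -- the first three fields
# mirror the DB unique key constraint.
Contribution = Tuple[str, str, str, int]


def dedup_contributions(
    per_network: Dict[str, List[Contribution]],
    highest_priority_network: str = "mainnet",  # assumption: mainnet wins ties
) -> List[Contribution]:
    remaining = dict(per_network)
    uniq_contrib_keys: Set[Tuple[str, str, str]] = set()
    kept: List[Contribution] = []

    def process(network_id: str, contributions: List[Contribution]) -> None:
        for contribution in contributions:
            uniq_key = contribution[:3]
            if uniq_key in uniq_contrib_keys:
                # Duplicate record: ignore it and report an error.
                print(f"duplicate contribution key {uniq_key} on {network_id}, ignoring")
                continue
            uniq_contrib_keys.add(uniq_key)
            kept.append(contribution)

    # The highest-priority network is processed first, so its rows claim the keys.
    if highest_priority_network in remaining:
        process(highest_priority_network, remaining.pop(highest_priority_network))
    for network_id, contributions in remaining.items():
        process(network_id, contributions)
    return kept
```

In the actual change below, the same idea is applied to `models.Contribution` and `models.Voter` objects inside `_write_db_data`, with duplicates logged via `logger.error`.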

File tree

2 files changed (+50, -15 lines)


utilities/ideascale-importer/ideascale_importer/cli/snapshot.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -19,7 +19,7 @@ def import_snapshot(
     network_ids: List[str] = typer.Option(
         ...,
         envvar="SNAPSHOT_NETWORK_IDS",
-        help="Network id to pass as parameter to snapshot_tool",
+        help=("Network id to pass as parameter to snapshot_tool. Valid values are: 'mainnet' 'preprod' 'testnet'"),
     ),
     snapshot_tool_path: str = typer.Option(default="snapshot_tool", envvar="SNAPSHOT_TOOL_PATH", help="Path to the snapshot tool"),
     catalyst_toolbox_path: str = typer.Option(
```
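
The option above is a standard required `typer.Option` list sourced from an environment variable. A standalone sketch of the same pattern is shown below; only the `network_ids` option mirrors the diff, while the app and command wrapper are assumptions for the demo, not the importer's actual CLI entry point:

```python
# Illustrative stand-alone example of the typer pattern used above.
# Only the network_ids option mirrors the diff; the rest is assumed for the demo.
from typing import List

import typer

app = typer.Typer()


@app.command()
def import_snapshot(
    network_ids: List[str] = typer.Option(
        ...,
        envvar="SNAPSHOT_NETWORK_IDS",
        help="Network id to pass as parameter to snapshot_tool. Valid values are: 'mainnet' 'preprod' 'testnet'",
    ),
):
    for network_id in network_ids:
        typer.echo(f"would import snapshot for {network_id}")


if __name__ == "__main__":
    app()
```

With typer, a list-valued option is passed by repeating the flag, e.g. `--network-ids mainnet --network-ids preprod`, or taken from the `SNAPSHOT_NETWORK_IDS` environment variable when the flag is omitted.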

utilities/ideascale-importer/ideascale_importer/snapshot_importer.py

Lines changed: 49 additions & 14 deletions
```diff
@@ -7,11 +7,10 @@
 import json
 import os
 import re
-from typing import Dict, List, Tuple, Optional
+from typing import Dict, List, Literal, Set, Tuple, Optional
 from loguru import logger
 from pydantic import BaseModel

-from ideascale_importer.gvc import Client as GvcClient
 import ideascale_importer.db
 from ideascale_importer.db import models
 from ideascale_importer.utils import run_cmd
@@ -59,7 +58,7 @@ class Registration(BaseModel):
 class CatalystToolboxDreps(BaseModel):
     """Represents the input format of the dreps file of catalyst-toolbox."""

-    reps: List[str]
+    reps: List[str] = []


 class OutputDirectoryDoesNotExist(Exception):
@@ -288,7 +287,7 @@ def __init__(
         eventdb_url: str,
         event_id: int,
         output_dir: str,
-        network_ids: List[str],
+        network_ids: List[Literal["mainnet", "preprod", "testnet"]],
         snapshot_tool_path: str,
         catalyst_toolbox_path: str,
         gvc_api_url: str,
@@ -321,7 +320,7 @@ def __init__(
         self.dreps_json = "[]"

         if not os.path.exists(output_dir):
-            raise OutputDirectoryDoesNotExist(output_dir)
+            os.makedirs(output_dir)

         self.output_dir = output_dir

@@ -397,6 +396,12 @@ async def _run_snapshot_tool(self):

             params = self.network_params[network_id]

+            match network_id:
+                case "mainnet":
+                    snapshot_net = "mainnet"
+                case _:
+                    snapshot_net = "testnet"
+
             if self.ssh_config is None:
                 snapshot_tool_cmd = (
                     f"{self.snapshot_tool_path}"
@@ -405,7 +410,7 @@ async def _run_snapshot_tool(self):
                     f" --db-host {db_host}"
                     f" --db {db_name}"
                     f" --min-slot 0 --max-slot {params.registration_snapshot_slot}"
-                    f" --network-id {network_id}"
+                    f" --network-id {snapshot_net}"
                     f" --out-file {params.snapshot_tool_out_file}"
                 )

@@ -473,12 +478,16 @@ async def _run_catalyst_toolbox_snapshot(self):
         )

         for network_id, params in self.network_params.items():
+            discr = "test"
+            if network_id == "main" or network_id == "mainnet":
+                discr = "production"
             with logger.contextualize(network_id=network_id):
                 catalyst_toolbox_cmd = (
                     f"{self.catalyst_toolbox_path} snapshot"
                     f" -s {params.snapshot_tool_out_file}"
                     f" -m {self.event_parameters.min_stake_threshold}"
                     f" -v {self.event_parameters.voting_power_cap}"
+                    f" -d {discr}"
                     f" {params.catalyst_toolbox_out_file}"
                 )

@@ -681,8 +690,9 @@ async def _write_db_data(self):

         voters: Dict[str, models.Voter] = {}
         contributions: List[models.Contribution] = []
+        uniq_contrib_keys: Set[Tuple[str, str, str]] = set([])

-        for network_id, network_processed_snapshot in catalyst_toolbox_data.items():
+        async def process_voters(network_id, network_processed_snapshot):
             network_report = network_snapshot_reports[network_id]

             for ctd in network_processed_snapshot.voters:
@@ -694,19 +704,19 @@ async def _write_db_data(self):
                 # This can be removed once it's fixed in catalyst-toolbox
                 if not voting_key.startswith("0x"):
                     voting_key = "0x" + voting_key
+                stake_public_key = snapshot_contribution.stake_public_key
+                voting_group = ctd.hir.voting_group

-                delegation_data = registration_delegation_data[network_id][
-                    f"{snapshot_contribution.stake_public_key}{voting_key}"
-                ]
+                delegation_data = registration_delegation_data[network_id][f"{stake_public_key}{voting_key}"]

                 contribution = models.Contribution(
-                    stake_public_key=snapshot_contribution.stake_public_key,
+                    stake_public_key=stake_public_key,
                     snapshot_id=0,
                     voting_key=voting_key,
                     voting_weight=delegation_data["voting_weight"],
                     voting_key_idx=delegation_data["voting_key_idx"],
                     value=snapshot_contribution.value,
-                    voting_group=ctd.hir.voting_group,
+                    voting_group=voting_group,
                     reward_address=snapshot_contribution.reward_address,
                 )

@@ -717,8 +727,23 @@ async def _write_db_data(self):
                     voting_power=ctd.hir.voting_power,
                 )

-                contributions.append(contribution)
-                voters[f"{voter.voting_key}{voter.voting_group}"] = voter
+                # uniq_key that mirrors the unique key constraint in the DB
+                uniq_key = (stake_public_key, voting_key, voting_group)
+
+                # Add uniq_key if not already present, and append
+                # contribution and voter models.
+                if uniq_key not in uniq_contrib_keys:
+                    uniq_contrib_keys.add(uniq_key)
+                    contributions.append(contribution)
+                    voters[f"{voter.voting_key}{voter.voting_group}"] = voter
+                else:
+                    logger.error(
+                        "Duplicate unique contribution key found, ignoring voter contribution",
+                        network_id=network_id,
+                        uniq_key=str(uniq_key),
+                        contribution=str(contribution),
+                        voter=str(voter),
+                    )

             await asyncio.sleep(0)

@@ -743,6 +768,16 @@ async def _write_db_data(self):
                 total_unique_rewards=len(network_report.unique_rewards),
             )

+        # Process the snapshot from the highest_priority_network first to get the
+        # uniq_contrib_keys.
+        if highest_priority_network in catalyst_toolbox_data:
+            network_processed_snapshot = catalyst_toolbox_data.pop(highest_priority_network)
+            await process_voters(highest_priority_network, network_processed_snapshot)
+
+        # Process the rest of the network data.
+        for network_id, network_processed_snapshot in catalyst_toolbox_data.items():
+            await process_voters(network_id, network_processed_snapshot)
+
         conn = await ideascale_importer.db.connect(self.eventdb_url)

         async with conn.transaction():
```
