Skip to content

Commit 216448f

Browse files
committed
Fix deduplication bug
1 parent ba13ba3 commit 216448f

File tree

3 files changed

+11
-31
lines changed

3 files changed

+11
-31
lines changed

apps/kafka_streaming_loader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@
1313

1414
def get_block_hash(client: Client, raw_dataset: str, block_num: int) -> str:
1515
"""Get block hash from dataset.blocks table."""
16-
query = f'SELECT hash FROM {raw_dataset}.blocks WHERE block_num = {block_num} LIMIT 1'
16+
query = f'SELECT hash FROM "{raw_dataset}".blocks WHERE block_num = {block_num} LIMIT 1'
1717
result = client.get_sql(query, read_all=True)
1818
hash_val = result.to_pydict()['hash'][0]
1919
return '0x' + hash_val.hex() if isinstance(hash_val, bytes) else hash_val
2020

2121

2222
def get_latest_block(client: Client, raw_dataset: str) -> int:
2323
"""Get latest block number from dataset.blocks table."""
24-
query = f'SELECT block_num FROM {raw_dataset}.blocks ORDER BY block_num DESC LIMIT 1'
24+
query = f'SELECT block_num FROM "{raw_dataset}".blocks ORDER BY block_num DESC LIMIT 1'
2525
result = client.get_sql(query, read_all=True)
2626
return result.to_pydict()['block_num'][0]
2727

apps/queries/erc20_transfers.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ from (
3939
l.timestamp,
4040
l.address,
4141
evm_decode(l.topic1, l.topic2, l.topic3, l.data, 'Transfer(address indexed from, address indexed to, uint256 value)') as dec
42-
from eth_firehose.logs l
42+
from 'edgeandnode/ethereum_mainnet'.logs l
4343
where
4444
l.topic0 = evm_topic('Transfer(address indexed from, address indexed to, uint256 value)') and
4545
l.topic3 IS NULL

src/amp/loaders/base.py

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ def load_stream_continuous(
494494
table_name,
495495
connection_name,
496496
response.metadata.ranges,
497+
response.metadata.ranges_complete,
497498
**filtered_kwargs,
498499
)
499500

@@ -670,6 +671,7 @@ def _process_batch_non_transactional(
670671
table_name: str,
671672
connection_name: str,
672673
ranges: Optional[List[BlockRange]],
674+
ranges_complete: bool,
673675
**kwargs,
674676
) -> Optional[LoadResult]:
675677
"""
@@ -682,46 +684,24 @@ def _process_batch_non_transactional(
682684
table_name: Target table name
683685
connection_name: Connection identifier
684686
ranges: Block ranges for this batch (if available)
687+
ranges_complete: Whether this batch marks a watermark boundary
685688
**kwargs: Additional options passed to load_batch
686689
687690
Returns:
688691
LoadResult, or None if batch was skipped as duplicate
689692
"""
690-
# Check if batch already processed (idempotency / exactly-once)
691-
if ranges and self.state_enabled:
692-
try:
693-
batch_ids = [BatchIdentifier.from_block_range(br) for br in ranges]
694-
is_duplicate = self.state_store.is_processed(connection_name, table_name, batch_ids)
695-
696-
if is_duplicate:
697-
# Skip this batch - already processed
698-
self.logger.info(
699-
f'Skipping duplicate batch: {len(ranges)} ranges already processed for {table_name}'
700-
)
701-
return LoadResult(
702-
rows_loaded=0,
703-
duration=0.0,
704-
ops_per_second=0.0,
705-
table_name=table_name,
706-
loader_type=self.__class__.__name__.replace('Loader', '').lower(),
707-
success=True,
708-
metadata={'operation': 'skip_duplicate', 'ranges': [r.to_dict() for r in ranges]},
709-
)
710-
except ValueError as e:
711-
# BlockRange missing hash - log and continue without idempotency check
712-
self.logger.warning(f'Cannot check for duplicates: {e}. Processing batch anyway.')
713-
714693
# Load batch
715694
result = self.load_batch(batch_data, table_name, **kwargs)
716695

717-
if result.success and ranges and self.state_enabled:
718-
# Mark batch as processed (for exactly-once semantics)
696+
if result.success and ranges and self.state_enabled and ranges_complete:
697+
# Only mark ranges at watermark boundaries (ranges_complete=true)
698+
# Multiple batches can have the same BlockRange, so we only checkpoint at watermarks
719699
try:
720700
batch_ids = [BatchIdentifier.from_block_range(br) for br in ranges]
721701
self.state_store.mark_processed(connection_name, table_name, batch_ids)
702+
self.logger.debug(f'Marked watermark as processed: {len(ranges)} ranges for {table_name}')
722703
except Exception as e:
723-
self.logger.error(f'Failed to mark batches as processed: {e}')
724-
# Continue anyway - state store provides resume capability
704+
self.logger.error(f'Failed to mark watermark as processed: {e}')
725705

726706
return result
727707

Comments: 0 commit comments