Skip to content

Commit cefd9ef

Browse files
committed
Support --start-block=latest to start from current chain tip
Allows starting from the latest block instead of only specific block numbers or LMDB state. Useful for fresh deployments without state.
1 parent c770308 commit cefd9ef

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

apps/kafka_streaming_loader.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def main(
7676
query_file: str,
7777
raw_dataset: str,
7878
network: str,
79-
start_block: int = None,
79+
start_block: str = None,
8080
label_csv: str = None,
8181
state_dir: str = '.amp_state',
8282
auth: bool = False,
@@ -115,12 +115,17 @@ def connect():
115115
with open(query_file) as f:
116116
query = f.read()
117117

118-
if start_block is not None:
119-
resume_watermark = create_watermark(client, raw_dataset, network, start_block) if start_block > 0 else None
120-
logger.info(f'Starting query from block {start_block}')
118+
if start_block == 'latest':
119+
block = get_latest_block(client, raw_dataset)
120+
resume_watermark = create_watermark(client, raw_dataset, network, block)
121+
logger.info(f'Starting from latest block {block}')
122+
elif start_block is not None:
123+
block = int(start_block)
124+
resume_watermark = create_watermark(client, raw_dataset, network, block) if block > 0 else None
125+
logger.info(f'Starting from block {block}')
121126
else:
122127
resume_watermark = None
123-
logger.info('Resuming from LMDB state (or starting from latest if no state)')
128+
logger.info('Resuming from LMDB state')
124129
logger.info(f'Streaming to Kafka: {kafka_brokers} -> {topic}')
125130

126131
batch_count = 0
@@ -155,7 +160,7 @@ def stream_batches():
155160
'--raw-dataset', required=True, help='Dataset name for the raw dataset of the chain (e.g., anvil, eth_firehose)'
156161
)
157162
parser.add_argument('--network', default='anvil')
158-
parser.add_argument('--start-block', type=int, help='Start from specific block (default: latest - 10)')
163+
parser.add_argument('--start-block', type=str, help='Start from specific block number or "latest"')
159164
parser.add_argument('--label-csv', help='Optional CSV for label joining')
160165
parser.add_argument('--state-dir', default='.amp_state', help='Directory for LMDB state storage')
161166
parser.add_argument('--auth', action='store_true', help='Enable auth using ~/.amp/cache or AMP_AUTH_TOKEN env var')

0 commit comments

Comments
 (0)