
Commit dfc08ed

Add auth, retry logic, and proper logging to Kafka streaming loader
- Add --auth and --auth-token CLI options
- Add connection retry with exponential backoff (--max-retries, --retry-delay)
- Convert print statements to proper logging
- Add --state-dir for configurable LMDB state location
- Fix resume behavior: use LMDB state when --start-block not provided
1 parent 1c79ead commit dfc08ed
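
Two of the bullets above concern the new backoff-based connection retry. As a rough sketch of the sleep schedule implied by the defaults in this commit (initial delay of 1.0s from --retry-delay, factor 2, 60s cap, 5 retries from --max-retries); this snippet is illustrative only and is not part of the commit:

delay, schedule = 1.0, []
for _ in range(5):                      # one sleep before each of the 5 retries
    schedule.append(delay)
    delay = min(delay * 2.0, 60.0)      # exponential growth, capped at 60s
print(schedule)                         # [1.0, 2.0, 4.0, 8.0, 16.0]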

2 files changed: 81 additions & 27 deletions


.gitignore

Lines changed: 1 addition & 1 deletion
@@ -60,4 +60,4 @@ data/
 # Build artifacts
 *.tar.gz
 *.zip
-.amp_state/
+.amp_state

apps/kafka_streaming_loader.py

Lines changed: 80 additions & 26 deletions
@@ -4,12 +4,41 @@
 import argparse
 import logging
 import os
+import time
 from pathlib import Path
 
 from amp.client import Client
 from amp.loaders.types import LabelJoinConfig
 from amp.streaming import BlockRange, ResumeWatermark
 
+logger = logging.getLogger('amp.kafka_streaming_loader')
+
+RETRYABLE_ERRORS = (
+    ConnectionError,
+    TimeoutError,
+    OSError,
+)
+
+
+def retry_with_backoff(func, max_retries=5, initial_delay=1.0, max_delay=60.0, backoff_factor=2.0):
+    """Execute function with exponential backoff retry on transient errors."""
+    delay = initial_delay
+    last_exception = None
+
+    for attempt in range(max_retries + 1):
+        try:
+            return func()
+        except RETRYABLE_ERRORS as e:
+            last_exception = e
+            if attempt == max_retries:
+                logger.error(f'Max retries ({max_retries}) exceeded: {e}')
+                raise
+            logger.warning(f'Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...')
+            time.sleep(delay)
+            delay = min(delay * backoff_factor, max_delay)
+
+    raise last_exception
+
 
 def get_block_hash(client: Client, raw_dataset: str, block_num: int) -> str:
     """Get block hash from dataset.blocks table."""
@@ -22,8 +51,12 @@ def get_block_hash(client: Client, raw_dataset: str, block_num: int) -> str:
 def get_latest_block(client: Client, raw_dataset: str) -> int:
     """Get latest block number from dataset.blocks table."""
     query = f'SELECT block_num FROM "{raw_dataset}".blocks ORDER BY block_num DESC LIMIT 1'
+    logger.debug(f'Fetching latest block from {raw_dataset}')
+    logger.debug(f'Query: {query}')
     result = client.get_sql(query, read_all=True)
-    return result.to_pydict()['block_num'][0]
+    block_num = result.to_pydict()['block_num'][0]
+    logger.info(f'Latest block in {raw_dataset}: {block_num}')
+    return block_num
 
 
 def create_watermark(client: Client, raw_dataset: str, network: str, start_block: int) -> ResumeWatermark:
@@ -45,13 +78,20 @@ def main(
     start_block: int = None,
     label_csv: str = None,
     state_dir: str = '.amp_state',
+    auth: bool = False,
+    auth_token: str = None,
+    max_retries: int = 5,
+    retry_delay: float = 1.0,
 ):
-    client = Client(amp_server)
-    print(f'Connected to {amp_server}')
+    def connect():
+        return Client(amp_server, auth=auth, auth_token=auth_token)
+
+    client = retry_with_backoff(connect, max_retries=max_retries, initial_delay=retry_delay)
+    logger.info(f'Connected to {amp_server}')
 
     if label_csv and Path(label_csv).exists():
         client.configure_label('tokens', label_csv)
-        print(f'Loaded {len(client.label_manager.get_label("tokens"))} labels from {label_csv}')
+        logger.info(f'Loaded {len(client.label_manager.get_label("tokens"))} labels from {label_csv}')
         label_config = LabelJoinConfig(
             label_name='tokens', label_key_column='token_address', stream_key_column='token_address'
         )
@@ -71,25 +111,30 @@
     with open(query_file) as f:
         query = f.read()
 
-    if start_block is None:
-        start_block = get_latest_block(client, raw_dataset)
+    if start_block is not None:
+        resume_watermark = create_watermark(client, raw_dataset, network, start_block) if start_block > 0 else None
+        logger.info(f'Starting query from block {start_block}')
+    else:
+        resume_watermark = None
+        logger.info('Resuming from LMDB state (or starting from latest if no state)')
+    logger.info(f'Streaming to Kafka: {kafka_brokers} -> {topic}')
 
-    resume_watermark = create_watermark(client, raw_dataset, network, start_block) if start_block > 0 else None
+    batch_count = 0
 
-    print(f'Starting query from block {start_block}')
-    print(f'Streaming to Kafka: {kafka_brokers} -> {topic}\n')
+    def stream_batches():
+        nonlocal batch_count
+        for result in client.sql(query).load(
+            'kafka', topic, stream=True, label_config=label_config, resume_watermark=resume_watermark
+        ):
+            if result.success:
+                batch_count += 1
+                if batch_count == 1 and result.metadata:
+                    logger.info(f'First batch: {result.metadata.get("block_ranges")}')
+                logger.info(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s')
+            else:
+                logger.error(f'Batch error: {result.error}')
 
-    batch_count = 0
-    for result in client.sql(query).load(
-        'kafka', topic, stream=True, label_config=label_config, resume_watermark=resume_watermark
-    ):
-        if result.success:
-            batch_count += 1
-            if batch_count == 1 and result.metadata:
-                print(f'First batch: {result.metadata.get("block_ranges")}\n')
-            print(f'Batch {batch_count}: {result.rows_loaded} rows in {result.duration:.2f}s')
-        else:
-            print(f'Error: {result.error}')
+    retry_with_backoff(stream_batches, max_retries=max_retries, initial_delay=retry_delay)
 
 
 if __name__ == '__main__':
@@ -105,13 +150,18 @@ def main(
     parser.add_argument('--start-block', type=int, help='Start from specific block (default: latest - 10)')
    parser.add_argument('--label-csv', help='Optional CSV for label joining')
    parser.add_argument('--state-dir', default='.amp_state', help='Directory for LMDB state storage')
+    parser.add_argument('--auth', action='store_true', help='Enable auth using ~/.amp/cache or AMP_AUTH_TOKEN env var')
+    parser.add_argument('--auth-token', help='Explicit auth token (works independently, does not require --auth)')
+    parser.add_argument('--max-retries', type=int, default=5, help='Max retries for connection failures (default: 5)')
+    parser.add_argument('--retry-delay', type=float, default=1.0, help='Initial retry delay in seconds (default: 1.0)')
     parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'])
     args = parser.parse_args()
 
-    if args.log_level:
-        logging.basicConfig(
-            level=getattr(logging, args.log_level), format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
-        )
+    logging.basicConfig(
+        level=logging.WARNING, format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
+    )
+    log_level = getattr(logging, args.log_level) if args.log_level else logging.INFO
+    logging.getLogger('amp').setLevel(log_level)
 
     try:
         main(
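
The logging change in the hunk above follows a common two-tier pattern: the root logger is configured once at WARNING so third-party libraries stay quiet, while the 'amp' namespace is raised to the requested level (INFO unless --log-level says otherwise). A minimal standalone sketch of the same pattern, not taken from the commit (the logger names are examples):

import logging

logging.basicConfig(
    level=logging.WARNING, format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
)
logging.getLogger('amp').setLevel(logging.INFO)

logging.getLogger('kafka.conn').info('suppressed: inherits the WARNING root level')
logging.getLogger('amp.kafka_streaming_loader').info('emitted: inherits the amp INFO level')
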
@@ -124,9 +174,13 @@ def main(
             start_block=args.start_block,
             label_csv=args.label_csv,
             state_dir=args.state_dir,
+            auth=args.auth,
+            auth_token=args.auth_token,
+            max_retries=args.max_retries,
+            retry_delay=args.retry_delay,
         )
     except KeyboardInterrupt:
-        print('\n\nStopped by user')
+        logger.info('Stopped by user')
     except Exception as e:
-        print(f'\nError: {e}')
+        logger.error(f'Fatal error: {e}')
         raise
