
Commit fff424c

committed
WIP: fixes I don't think we should have needed 🤷
1 parent b37e584 commit fff424c

3 files changed: +33 -30 lines changed


src/amp/loaders/implementations/snowflake_loader.py

Lines changed: 16 additions & 11 deletions
@@ -566,10 +566,12 @@ def _get_or_create_channel(self, table_name: str, channel_suffix: str = 'default
         client = self.streaming_clients[table_name]
 
         # Open channel - returns (channel, status) tuple
-        channel, status = client.open_channel(channel_name=channel_name)
-
-        if status != 'OPEN':
-            raise RuntimeError(f'Failed to open streaming channel {channel_name}: status={status}')
+        # Status indicates channel state but appears to be a channel object representation
+        result = client.open_channel(channel_name=channel_name)
+        if isinstance(result, tuple):
+            channel, status = result
+        else:
+            channel = result
 
         self.streaming_channels[channel_key] = channel
         self.logger.info(f'Opened Snowpipe Streaming channel: {channel_name}')
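
Note: the new defensive unpacking can be read as a tiny helper. This is a minimal sketch only, assuming nothing about the Snowpipe Streaming client beyond what the hunk shows (open_channel may return either a bare channel object or a (channel, status) tuple); the helper name is illustrative, not part of the codebase:

from typing import Any, Optional, Tuple

def unpack_open_channel_result(result: Any) -> Tuple[Any, Optional[str]]:
    """Return (channel, status), tolerating clients that hand back only the channel."""
    if isinstance(result, tuple):
        channel, status = result
        return channel, status
    return result, None

# Usage mirroring the diff:
# channel, status = unpack_open_channel_result(client.open_channel(channel_name=channel_name))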
@@ -584,9 +586,9 @@ def disconnect(self) -> None:
         for channel_key, channel in self.streaming_channels.items():
             try:
                 channel.close()
-                self.logger.debug(f'Closed channel: {channel.name}')
+                self.logger.debug(f'Closed channel: {channel_key}')
             except Exception as e:
-                self.logger.warning(f'Error closing channel: {e}')
+                self.logger.warning(f'Error closing channel {channel_key}: {e}')
 
         self.streaming_channels.clear()
 
@@ -1132,8 +1134,11 @@ def _create_table_from_schema(self, schema: pa.Schema, table_name: str) -> None:
         # Build CREATE TABLE statement
         columns = []
         for field in schema:
+            # Special case: _meta_block_ranges should be VARIANT for optimal JSON querying
+            if field.name == '_meta_block_ranges':
+                snowflake_type = 'VARIANT'
             # Handle complex types
-            if pa.types.is_timestamp(field.type):
+            elif pa.types.is_timestamp(field.type):
                 if field.type.tz is not None:
                     snowflake_type = 'TIMESTAMP_TZ'
                 else:
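
Note: the _meta_block_ranges special case slots in ahead of the existing Arrow-to-Snowflake type mapping. Below is a rough, self-contained sketch of that mapping with the new branch; the fallback types are illustrative and the real loader handles more cases than this hunk shows:

import pyarrow as pa

def arrow_field_to_snowflake_type(field: pa.Field) -> str:
    # Metadata column stored as VARIANT so block ranges can be queried as JSON
    if field.name == '_meta_block_ranges':
        return 'VARIANT'
    if pa.types.is_timestamp(field.type):
        return 'TIMESTAMP_TZ' if field.type.tz is not None else 'TIMESTAMP_NTZ'
    if pa.types.is_integer(field.type):
        return 'NUMBER'
    if pa.types.is_floating(field.type):
        return 'FLOAT'
    if pa.types.is_boolean(field.type):
        return 'BOOLEAN'
    return 'VARCHAR'  # catch-all for this sketch only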
@@ -1184,7 +1189,7 @@ def _create_table_from_schema(self, schema: pa.Schema, table_name: str) -> None:
     def _get_loader_batch_metadata(self, batch: pa.RecordBatch, duration: float, **kwargs) -> Dict[str, Any]:
         """Get Snowflake-specific metadata for batch operation"""
         return {
-            'loading_method': 'stage' if self.use_stage else 'insert',
+            'loading_method': self.loading_method,
             'warehouse': self.config.warehouse,
             'database': self.config.database,
             'schema': self.config.schema,
@@ -1195,7 +1200,7 @@ def _get_loader_table_metadata(
     ) -> Dict[str, Any]:
         """Get Snowflake-specific metadata for table operation"""
         return {
-            'loading_method': 'stage' if self.use_stage else 'insert',
+            'loading_method': self.loading_method,
             'warehouse': self.config.warehouse,
             'database': self.config.database,
             'schema': self.config.schema,
@@ -1314,9 +1319,9 @@ def _handle_reorg(self, invalidation_ranges: List[BlockRange], table_name: str)
             try:
                 channel.close()
                 del self.streaming_channels[channel_key]
-                self.logger.debug(f'Closed streaming channel: {channel.name}')
+                self.logger.debug(f'Closed streaming channel: {channel_key}')
             except Exception as e:
-                self.logger.warning(f'Error closing channel {channel.name}: {e}')
+                self.logger.warning(f'Error closing channel {channel_key}: {e}')
                 # Continue closing other channels even if one fails
 
         self.logger.info(

src/amp/streaming/reorg.py

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@ def __next__(self) -> ResponseBatchWithReorg:
             return ResponseBatchWithReorg.reorg_batch(invalidation_ranges)
 
         # Check if we have a pending batch from a previous reorg detection
+        # REVIEW: I think we should remove this
        if hasattr(self, '_pending_batch'):
            pending = self._pending_batch
            delattr(self, '_pending_batch')
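
Note: the _pending_batch check flagged by the REVIEW comment is a one-slot lookahead buffer kept on the iterator instance via hasattr/delattr. A standalone sketch of that pattern (the class and names below are illustrative, not the project's):

class BufferedIterator:
    """One-slot push-back buffer using hasattr/delattr, as in the _pending_batch check above."""

    def __init__(self, items):
        self._items = iter(items)

    def __iter__(self):
        return self

    def __next__(self):
        # Drain a previously stashed item before pulling a new one
        if hasattr(self, '_pending'):
            pending = self._pending
            delattr(self, '_pending')
            return pending
        return next(self._items)

    def push_back(self, item):
        # Stash an item to be returned by the next call to __next__
        self._pending = item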

tests/integration/test_snowflake_loader.py

Lines changed: 16 additions & 19 deletions
@@ -25,6 +25,7 @@
 try:
     from src.amp.loaders.base import LoadMode
     from src.amp.loaders.implementations.snowflake_loader import SnowflakeLoader
+    from src.amp.streaming.types import BatchMetadata, BlockRange, ResponseBatch, ResponseBatchWithReorg
 except ImportError:
     pytest.skip('amp modules not available', allow_module_level=True)
 
@@ -127,7 +128,8 @@ def test_batch_loading(self, snowflake_config, medium_test_table, test_table_nam
         loader = SnowflakeLoader(snowflake_config)
 
         with loader:
-            result = loader.load_table(medium_test_table, test_table_name, create_table=True)
+            # Use smaller batch size to force multiple batches (medium_test_table has 10000 rows)
+            result = loader.load_table(medium_test_table, test_table_name, create_table=True, batch_size=5000)
 
         assert result.success is True
         assert result.rows_loaded == medium_test_table.num_rows
@@ -333,11 +335,10 @@ def test_stage_and_compression_options(self, snowflake_config, medium_test_table
         """Test different stage and compression options"""
         cleanup_tables.append(test_table_name)
 
-        # Test with different compression
+        # Test with stage loading method
         config = {
             **snowflake_config,
             'loading_method': 'stage',
-            'compression': 'zstd',
         }
         loader = SnowflakeLoader(config)
 
@@ -614,23 +615,19 @@ def test_streaming_with_reorg(self, snowflake_config, test_table_name, cleanup_t
         data2 = pa.RecordBatch.from_pydict({'id': [3, 4], 'value': [300, 400]})
 
         # Create response batches
-        response1 = ResponseBatchWithReorg(
-            is_reorg=False,
-            data=ResponseBatch(
-                data=data1, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110)])
-            ),
+        batch1 = ResponseBatch(
+            data=data1, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=100, end=110)])
         )
+        response1 = ResponseBatchWithReorg.data_batch(batch1)
 
-        response2 = ResponseBatchWithReorg(
-            is_reorg=False,
-            data=ResponseBatch(
-                data=data2, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160)])
-            ),
+        batch2 = ResponseBatch(
+            data=data2, metadata=BatchMetadata(ranges=[BlockRange(network='ethereum', start=150, end=160)])
         )
+        response2 = ResponseBatchWithReorg.data_batch(batch2)
 
         # Simulate reorg event
-        reorg_response = ResponseBatchWithReorg(
-            is_reorg=True, invalidation_ranges=[BlockRange(network='ethereum', start=150, end=200)]
+        reorg_response = ResponseBatchWithReorg.reorg_batch(
+            [BlockRange(network='ethereum', start=150, end=200)]
         )
 
         # Process streaming data
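
Note: the test now goes through the data_batch / reorg_batch constructors instead of calling ResponseBatchWithReorg(...) directly. Below is a rough sketch of what such factory classmethods typically look like, with a shape inferred only from the fields this diff touches (is_reorg, data, invalidation_ranges), not the actual definition in src/amp/streaming/types.py:

from dataclasses import dataclass, field
from typing import Any, List, Optional

@dataclass
class ResponseBatchWithReorgSketch:
    """Illustrative stand-in; field names mirror what the diff uses."""
    is_reorg: bool
    data: Optional[Any] = None
    invalidation_ranges: List[Any] = field(default_factory=list)

    @classmethod
    def data_batch(cls, batch: Any) -> 'ResponseBatchWithReorgSketch':
        # Wrap an ordinary data batch
        return cls(is_reorg=False, data=batch)

    @classmethod
    def reorg_batch(cls, invalidation_ranges: List[Any]) -> 'ResponseBatchWithReorgSketch':
        # Signal a reorg along with the block ranges to invalidate
        return cls(is_reorg=True, invalidation_ranges=invalidation_ranges)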
@@ -647,8 +644,8 @@ def test_streaming_with_reorg(self, snowflake_config, test_table_name, cleanup_t
         assert results[2].is_reorg
 
         # Verify reorg deleted the second batch
-        loader.cursor.execute(f'SELECT id FROM {test_table_name} ORDER BY id')
-        remaining_ids = [row['ID'] for row in loader.cursor.fetchall()]
+        loader.cursor.execute(f'SELECT "id" FROM {test_table_name} ORDER BY "id"')
+        remaining_ids = [row['id'] for row in loader.cursor.fetchall()]
         assert remaining_ids == [1, 2]  # 3 and 4 deleted by reorg
 
 
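Note: the quoting change matters because Snowflake folds unquoted identifiers to uppercase while quoted identifiers keep their exact case, so a column created as quoted lowercase "id" is only reachable through the quoted form, and a dict cursor then returns it under the lowercase key. A small sketch assuming a snowflake-connector-python DictCursor:

def fetch_remaining_ids(cursor, table_name: str) -> list:
    # Quoted identifiers match the exact lowercase column name and keep it in the result keys
    cursor.execute(f'SELECT "id" FROM {table_name} ORDER BY "id"')
    return [row['id'] for row in cursor.fetchall()]

# An unquoted `SELECT id` would resolve to ID, which only exists if the column was
# created unquoted; the dict key would then be row['ID'] instead.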

@@ -704,8 +701,8 @@ def test_streaming_connection(self, snowflake_streaming_config):
         loader.connect()
         assert loader._is_connected is True
         assert loader.connection is not None
-        # Streaming client is lazily initialized (created on first load, not at connection time)
-        assert loader.streaming_client is None
+        # Streaming clients dict is initialized empty (clients created on first load per table)
+        assert loader.streaming_clients == {}
 
         loader.disconnect()
         assert loader._is_connected is False
