Commit e2d3ac7

snowflake_loader: Snowpipe streaming support
1 parent 948890a

File tree

7 files changed: +1236 -24 lines changed

docs/snowflake_loader.md

Whitespace-only changes.

pyproject.toml

Lines changed: 2 additions & 4 deletions
@@ -12,21 +12,17 @@ dependencies = [
     "pandas>=2.3.1",
     "pyarrow>=20.0.0",
     "typer>=0.15.2",
-
     # Flight SQL support
     "adbc-driver-manager>=1.5.0",
     "adbc-driver-postgresql>=1.5.0",
     "protobuf>=4.21.0",
-
     # Ethereum/blockchain utilities
     "base58>=2.1.1",
     "eth-hash[pysha3]>=0.7.1",
     "eth-utils>=5.2.0",
-
     # Google Cloud support
     "google-cloud-bigquery>=3.30.0",
     "google-cloud-storage>=3.1.0",
-
     # Arro3 for enhanced PyArrow operations
     "arro3-core>=0.5.1",
     "arro3-compute>=0.5.1",
@@ -59,6 +55,7 @@ iceberg = [
 
 snowflake = [
     "snowflake-connector-python>=3.5.0",
+    "snowpipe-streaming>=1.0.0",  # Snowpipe Streaming API
 ]
 
 lmdb = [
@@ -72,6 +69,7 @@ all_loaders = [
     "pyiceberg[sql-sqlite]>=0.10.0",  # Apache Iceberg
     "pydantic>=2.0,<2.12",  # PyIceberg 0.10.0 compatibility
     "snowflake-connector-python>=3.5.0",  # Snowflake
+    "snowpipe-streaming>=1.0.0",  # Snowpipe Streaming API
     "lmdb>=1.4.0",  # LMDB
 ]
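
The snowflake extra (and all_loaders) now pulls in the Snowpipe Streaming client alongside the classic connector. Because it is an optional dependency, a loader can fail fast with a helpful message when it is missing. A minimal sketch, assuming the project is published as amp and the package's import name is snowpipe_streaming (neither is stated in this diff):

from importlib import util

# Guard for the optional Snowpipe Streaming dependency.
# The import name 'snowpipe_streaming' and the "amp[snowflake]" extra spelling
# are illustrative assumptions; only the pyproject.toml entries above are authoritative.
if util.find_spec('snowpipe_streaming') is None:
    raise ImportError(
        'Snowpipe Streaming support requires the optional Snowflake dependencies: '
        "pip install 'amp[snowflake]'"
    )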

src/amp/client.py

Lines changed: 13 additions & 5 deletions
@@ -211,6 +211,13 @@ def query_and_load(
             **{k: v for k, v in kwargs.items() if k in ['max_retries', 'retry_delay']},
         )
 
+        # Remove known LoadConfig params from kwargs, leaving loader-specific params
+        for key in ['max_retries', 'retry_delay']:
+            kwargs.pop(key, None)
+
+        # Remaining kwargs are loader-specific (e.g., channel_suffix for Snowflake)
+        loader_specific_kwargs = kwargs
+
         if read_all:
             self.logger.info(f'Loading entire query result to {loader_type}:{destination}')
         else:
@@ -221,20 +228,20 @@ def query_and_load(
         # Get the data and load
         if read_all:
             table = self.get_sql(query, read_all=True)
-            return self._load_table(table, loader_type, destination, loader_config, load_config)
+            return self._load_table(table, loader_type, destination, loader_config, load_config, **loader_specific_kwargs)
         else:
             batch_stream = self.get_sql(query, read_all=False)
-            return self._load_stream(batch_stream, loader_type, destination, loader_config, load_config)
+            return self._load_stream(batch_stream, loader_type, destination, loader_config, load_config, **loader_specific_kwargs)
 
     def _load_table(
-        self, table: pa.Table, loader: str, table_name: str, config: Dict[str, Any], load_config: LoadConfig
+        self, table: pa.Table, loader: str, table_name: str, config: Dict[str, Any], load_config: LoadConfig, **kwargs
     ) -> LoadResult:
         """Load a complete Arrow Table"""
         try:
             loader_instance = create_loader(loader, config)
 
             with loader_instance:
-                return loader_instance.load_table(table, table_name, **load_config.__dict__)
+                return loader_instance.load_table(table, table_name, **load_config.__dict__, **kwargs)
         except Exception as e:
             self.logger.error(f'Failed to load table: {e}')
             return LoadResult(
@@ -254,13 +261,14 @@ def _load_stream(
         table_name: str,
         config: Dict[str, Any],
         load_config: LoadConfig,
+        **kwargs,
     ) -> Iterator[LoadResult]:
         """Load from a stream of batches"""
        try:
             loader_instance = create_loader(loader, config)
 
             with loader_instance:
-                yield from loader_instance.load_stream(batch_stream, table_name, **load_config.__dict__)
+                yield from loader_instance.load_stream(batch_stream, table_name, **load_config.__dict__, **kwargs)
         except Exception as e:
             self.logger.error(f'Failed to load stream: {e}')
             yield LoadResult(
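
With the known LoadConfig keys popped out of kwargs, whatever remains is forwarded untouched through _load_table / _load_stream to the loader itself. A hedged usage sketch of the new passthrough, with parameter names (query, loader_type, destination, read_all) inferred from variables in the diff rather than from a documented signature, and client standing in for an already-configured amp client instance:

# Illustrative call only; names below are inferred from the diff, not a documented API.
results = client.query_and_load(
    query='SELECT block_number, tx_hash FROM transfers',  # example query text
    loader_type='snowflake',
    destination='ANALYTICS.PUBLIC.TRANSFERS',   # example target table
    read_all=False,             # stream record batches instead of one full Table
    max_retries=3,              # recognized LoadConfig param, popped from kwargs
    channel_suffix='worker-1',  # loader-specific, forwarded to the Snowflake loader
)

# In streaming mode _load_stream yields one LoadResult per batch.
for result in results:
    print(result)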
