Skip to content

Commit 5a44687

Browse files
committed
Start of materialized view command
1 parent f1b7255 commit 5a44687

File tree

1 file changed

+208
-0
lines changed

1 file changed

+208
-0
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#!/usr/bin/env python
2+
"""Iceberg Materialized View handler for SingleStoreDB Fusion."""
3+
from __future__ import annotations
4+
5+
import json
6+
import uuid
7+
from typing import Any
8+
from typing import Dict
9+
from typing import Optional
10+
11+
from singlestoredb.connection import connect
12+
from singlestoredb.fusion import result
13+
from singlestoredb.fusion.handler import SQLHandler
14+
15+
16+
class CreateIcebergMaterializedView(SQLHandler):
17+
"""
18+
CREATE ICEBERG MATERIALIZED VIEW
19+
[ if_not_exists ]
20+
view_name
21+
ON iceberg_table
22+
[ catalog ]
23+
[ storage ]
24+
;
25+
26+
# If not exists
27+
if_not_exists = IF NOT EXISTS
28+
29+
# View name
30+
view_name = <table>
31+
32+
# Iceberg table
33+
iceberg_table = <table>
34+
35+
# Catalog
36+
catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
37+
_catalog_config = CONFIG '<catalog-config>'
38+
_catalog_creds = CREDENTIALS '<catalog-creds>'
39+
40+
# Storage
41+
storage = LINK [ _link_config ] [ _link_creds ]
42+
_link_config = S3 CONFIG '<link-config>'
43+
_link_creds = CREDENTIALS '<link-creds>'
44+
45+
Description
46+
-----------
47+
Create an Iceberg materialized view that syncs data from an Iceberg table
48+
to a SingleStore table with automatic updates.
49+
50+
Arguments
51+
---------
52+
* ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
53+
* ``<link-config>`` and ``<link-creds>``: The storage link configuration.
54+
55+
Remarks
56+
-------
57+
* ``CATALOG`` specifies the details of the catalog to connect to.
58+
* ``LINK`` specifies the details of the data storage to connect to.
59+
* The materialized view will keep the SingleStore table in sync with the
60+
Iceberg table through an underlying pipeline.
61+
62+
Examples
63+
--------
64+
The following statement creates an Iceberg materialized view::
65+
66+
CREATE ICEBERG MATERIALIZED VIEW my_db.all_sales_orders
67+
ON my_catalog.sales.orders
68+
CATALOG CONFIG '{
69+
"catalog_type": "GLUE",
70+
"table_format": "ICEBERG",
71+
"catalog_id": "123456789012",
72+
"catalog_region": "us-east-1"
73+
}'
74+
LINK S3 CONFIG '{
75+
"region": "us-east-1",
76+
"endpoint_url": "s3://my-bucket"
77+
}'
78+
;
79+
80+
"""
81+
82+
def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
83+
# Parse view name
84+
if isinstance(params['view_name'], str):
85+
view_database = None
86+
view_table = params['view_name']
87+
else:
88+
view_database, view_table = params['view_name']
89+
90+
# Parse Iceberg table reference
91+
iceberg_parts = params['iceberg_table']
92+
if isinstance(iceberg_parts, str):
93+
# Simple table name
94+
catalog_name = None
95+
iceberg_database = None
96+
iceberg_table = iceberg_parts
97+
elif len(iceberg_parts) == 2:
98+
# database.table
99+
catalog_name = None
100+
iceberg_database, iceberg_table = iceberg_parts
101+
elif len(iceberg_parts) == 3:
102+
# catalog.database.table
103+
catalog_name, iceberg_database, iceberg_table = iceberg_parts
104+
else:
105+
raise ValueError(
106+
'Iceberg table reference must be in format: '
107+
'[catalog.]database.table',
108+
)
109+
110+
# Iceberg expects lowercase
111+
if iceberg_database:
112+
iceberg_database = iceberg_database.lower()
113+
if catalog_name:
114+
catalog_name = catalog_name.lower()
115+
116+
# Parse catalog configuration
117+
catalog_config = json.loads(
118+
params['catalog'].get('catalog_config', '{}') or '{}',
119+
)
120+
catalog_creds = json.loads(
121+
params['catalog'].get('catalog_creds', '{}') or '{}',
122+
)
123+
124+
# Parse storage configuration
125+
storage_config = json.loads(
126+
(params.get('storage') or {}).get('link_config', '{}') or '{}',
127+
)
128+
storage_creds = json.loads(
129+
(params.get('storage') or {}).get('link_creds', '{}') or '{}',
130+
)
131+
132+
storage_config['provider'] = 'S3'
133+
134+
# Validate required fields
135+
if iceberg_database is None:
136+
raise ValueError(
137+
'Database name must be specified for Iceberg table',
138+
)
139+
140+
if view_database is None:
141+
with connect() as conn:
142+
with conn.cursor() as cur:
143+
cur.execute('SELECT DATABASE()')
144+
res = cur.fetchone()
145+
if not res:
146+
raise ValueError(
147+
'No database selected. Please specify database '
148+
'name for materialized view',
149+
)
150+
if isinstance(res, (tuple, list)):
151+
view_database = res[0]
152+
elif isinstance(res, dict):
153+
view_database = list(res.values())[0]
154+
else:
155+
raise ValueError(
156+
'Unexpected result type from SELECT DATABASE()',
157+
)
158+
159+
# Merge configurations
160+
config = {}
161+
config.update(catalog_config)
162+
config.update(storage_config)
163+
config['table_id'] = f'{iceberg_database}.{iceberg_table}'
164+
config_json = json.dumps(config)
165+
166+
creds = {}
167+
creds.update(catalog_creds)
168+
creds.update(storage_creds)
169+
creds_json = json.dumps(creds)
170+
171+
# Create a unique pipeline name
172+
pipeline_name = f'iceberg_mv_{view_database}_{view_table}_{uuid.uuid4().hex[:8]}'
173+
174+
print('ICEBERG TABLE', iceberg_database, iceberg_table)
175+
print('DB TABLE', view_database, view_table)
176+
print('CONFIG', config)
177+
print('CREDS', creds)
178+
179+
# Create and start the pipeline
180+
with connect() as conn:
181+
with conn.cursor() as cur:
182+
# Create the pipeline
183+
cur.execute(rf'''
184+
CREATE PIPELINE `{pipeline_name}` AS
185+
LOAD DATA S3 ''
186+
CONFIG '{config_json}'
187+
CREDENTIALS '{creds_json}'
188+
REPLACE INTO TABLE
189+
`{view_database}`.`{view_table}`
190+
FORMAT ICEBERG
191+
''')
192+
193+
# Start the pipeline
194+
cur.execute(rf'START PIPELINE `{pipeline_name}`')
195+
196+
# Return result
197+
res = result.FusionSQLResult()
198+
res.add_field('MaterializedView', result.STRING)
199+
res.add_field('Pipeline', result.STRING)
200+
res.add_field('Status', result.STRING)
201+
res.set_rows([
202+
(f'{view_database}.{view_table}', pipeline_name, 'Created'),
203+
])
204+
205+
return res
206+
207+
208+
CreateIcebergMaterializedView.register(overwrite=True)

0 commit comments

Comments
 (0)