Skip to content

Commit 437ae35

Browse files
author
Tommy Healy
committed
Add DynamoDB connector to Python SDK
1 parent 1d0c8de commit 437ae35

15 files changed

Lines changed: 526 additions & 28 deletions

File tree

README.md

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,13 @@ Use a local Tinybird container for development without affecting cloud workspace
516516
### Connections
517517

518518
```python
519-
from tinybird_sdk import define_gcs_connection, define_kafka_connection, define_s3_connection, secret
519+
from tinybird_sdk import (
520+
define_dynamodb_connection,
521+
define_gcs_connection,
522+
define_kafka_connection,
523+
define_s3_connection,
524+
secret,
525+
)
520526

521527
events_kafka = define_kafka_connection(
522528
"events_kafka",
@@ -543,6 +549,42 @@ landing_gcs = define_gcs_connection(
543549
"service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
544550
},
545551
)
552+
553+
events_dynamodb = define_dynamodb_connection(
554+
"events_dynamodb",
555+
{
556+
"region": "us-east-1",
557+
"arn": secret("DYNAMODB_ROLE_ARN"),
558+
},
559+
)
560+
```
561+
562+
The DynamoDB connector uses an IAM role to authenticate. Reference the connection from
563+
a datasource to import a DynamoDB table:
564+
565+
```python
566+
from tinybird_sdk import column, define_datasource, engine, t
567+
568+
orders = define_datasource(
569+
"orders",
570+
{
571+
"schema": {
572+
"id": column(t.string(), {"json_path": "$.Item.id"}),
573+
"_record": column(t.string(), {"json_path": "$.NewImage"}),
574+
"_timestamp": column(t.date_time64(3), {"json_path": "$.ApproximateCreationDateTime"}),
575+
"_event_name": column(t.string().low_cardinality(), {"json_path": "$.eventName"}),
576+
"_is_deleted": column(t.uint8(), {"json_path": "$._is_deleted"}),
577+
},
578+
"engine": engine.replacing_merge_tree(
579+
{"sorting_key": ["id"], "ver": "_timestamp", "is_deleted": "_is_deleted"}
580+
),
581+
"dynamodb": {
582+
"connection": events_dynamodb,
583+
"table_arn": "arn:aws:dynamodb:us-east-1:123456789012:table/orders",
584+
"export_bucket": "s3://my-tinybird-dynamodb-exports",
585+
},
586+
},
587+
)
546588
```
547589

548590
### Datasources

src/tinybird_sdk/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
"define_kafka_connection": ("tinybird_sdk.schema", "define_kafka_connection"),
2727
"define_s3_connection": ("tinybird_sdk.schema", "define_s3_connection"),
2828
"define_gcs_connection": ("tinybird_sdk.schema", "define_gcs_connection"),
29+
"define_dynamodb_connection": ("tinybird_sdk.schema", "define_dynamodb_connection"),
2930
"get_connection_type": ("tinybird_sdk.schema", "get_connection_type"),
3031
"is_connection_definition": ("tinybird_sdk.schema", "is_connection_definition"),
3132
"is_kafka_connection_definition": ("tinybird_sdk.schema", "is_kafka_connection_definition"),
3233
"is_s3_connection_definition": ("tinybird_sdk.schema", "is_s3_connection_definition"),
3334
"is_gcs_connection_definition": ("tinybird_sdk.schema", "is_gcs_connection_definition"),
35+
"is_dynamodb_connection_definition": ("tinybird_sdk.schema", "is_dynamodb_connection_definition"),
3436
"secret": ("tinybird_sdk.schema", "secret"),
3537
"define_token": ("tinybird_sdk.schema", "define_token"),
3638
"is_token_definition": ("tinybird_sdk.schema", "is_token_definition"),

src/tinybird_sdk/generator/connection.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from ..schema.connection import (
66
ConnectionDefinition,
7+
DynamoDBConnectionDefinition,
78
GCSConnectionDefinition,
89
KafkaConnectionDefinition,
910
S3ConnectionDefinition,
@@ -70,6 +71,17 @@ def _generate_gcs_connection(connection: GCSConnectionDefinition) -> str:
7071
return "\n".join(parts)
7172

7273

74+
def _generate_dynamodb_connection(connection: DynamoDBConnectionDefinition) -> str:
75+
options = connection.options
76+
parts = [
77+
"TYPE dynamodb",
78+
f"DYNAMODB_ARN {options.arn}",
79+
f"DYNAMODB_REGION {options.region}",
80+
]
81+
82+
return "\n".join(parts)
83+
84+
7385
def generate_connection(connection: ConnectionDefinition) -> GeneratedConnection:
7486
if isinstance(connection, KafkaConnectionDefinition):
7587
return GeneratedConnection(
@@ -80,9 +92,9 @@ def generate_connection(connection: ConnectionDefinition) -> GeneratedConnection
8092
name=connection._name, content=_generate_s3_connection(connection)
8193
)
8294
if isinstance(connection, GCSConnectionDefinition):
83-
return GeneratedConnection(
84-
name=connection._name, content=_generate_gcs_connection(connection)
85-
)
95+
return GeneratedConnection(name=connection._name, content=_generate_gcs_connection(connection))
96+
if isinstance(connection, DynamoDBConnectionDefinition):
97+
return GeneratedConnection(name=connection._name, content=_generate_dynamodb_connection(connection))
8698
raise ValueError(f"Unsupported connection type: {connection._connectionType}")
8799

88100

src/tinybird_sdk/generator/datasource.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,15 @@ def _generate_import_config(import_config: Any) -> str:
124124
return "\n".join(lines)
125125

126126

127+
def _generate_dynamodb_config(dynamodb: Any) -> str:
128+
lines = [
129+
f"IMPORT_CONNECTION_NAME {dynamodb.connection._name}",
130+
f"IMPORT_TABLE_ARN {dynamodb.table_arn}",
131+
f"IMPORT_EXPORT_BUCKET {dynamodb.export_bucket}",
132+
]
133+
return "\n".join(lines)
134+
135+
127136
def _generate_forward_query(forward_query: str | None) -> str | None:
128137
if not forward_query or not forward_query.strip():
129138
return None
@@ -193,6 +202,9 @@ def generate_datasource(datasource: DatasourceDefinition) -> GeneratedDatasource
193202
if datasource.options.gcs:
194203
parts.extend(["", _generate_import_config(datasource.options.gcs)])
195204

205+
if datasource.options.dynamodb:
206+
parts.extend(["", _generate_dynamodb_config(datasource.options.dynamodb)])
207+
196208
forward_query = _generate_forward_query(datasource.options.forward_query)
197209
if forward_query:
198210
parts.extend(["", forward_query])

src/tinybird_sdk/migrate/emit_ts.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from .parser_utils import parse_literal_from_datafile
99
from .types import (
1010
DatasourceModel,
11+
DynamoDBConnectionModel,
1112
GCSConnectionModel,
1213
KafkaConnectionModel,
1314
ParsedResource,
@@ -261,6 +262,14 @@ def _emit_datasource(ds: DatasourceModel) -> str:
261262
lines.append(f" 'from_timestamp': {_escape_string(ds.gcs.from_timestamp)},")
262263
lines.append(" },")
263264

265+
if ds.dynamodb:
266+
connection_var = to_snake_case(ds.dynamodb.connection_name)
267+
lines.append(" 'dynamodb': {")
268+
lines.append(f" 'connection': {connection_var},")
269+
lines.append(f" 'table_arn': {_escape_string(ds.dynamodb.table_arn)},")
270+
lines.append(f" 'export_bucket': {_escape_string(ds.dynamodb.export_bucket)},")
271+
lines.append(" },")
272+
264273
if ds.forward_query:
265274
lines.append(" 'forward_query': '''")
266275
lines.append(ds.forward_query)
@@ -336,13 +345,26 @@ def _emit_gcs_connection(connection: GCSConnectionModel) -> str:
336345
return "\n".join(lines)
337346

338347

348+
def _emit_dynamodb_connection(connection: DynamoDBConnectionModel) -> str:
349+
variable_name = to_snake_case(connection.name)
350+
lines: list[str] = []
351+
lines.append(f"{variable_name} = define_dynamodb_connection({_escape_string(connection.name)}, {{")
352+
lines.append(f" 'region': {_escape_string(connection.region)},")
353+
lines.append(f" 'arn': {_escape_string(connection.arn)},")
354+
lines.append("})")
355+
lines.append("")
356+
return "\n".join(lines)
357+
358+
339359
def _emit_connection(
340-
connection: KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel,
360+
connection: KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel | DynamoDBConnectionModel,
341361
) -> str:
342362
if isinstance(connection, S3ConnectionModel):
343363
return _emit_s3_connection(connection)
344364
if isinstance(connection, GCSConnectionModel):
345365
return _emit_gcs_connection(connection)
366+
if isinstance(connection, DynamoDBConnectionModel):
367+
return _emit_dynamodb_connection(connection)
346368
return _emit_kafka_connection(connection)
347369

348370

@@ -474,6 +496,8 @@ def emit_migration_file_content(resources: list[ParsedResource]) -> str:
474496
imports.add("define_s3_connection")
475497
elif isinstance(conn, GCSConnectionModel):
476498
imports.add("define_gcs_connection")
499+
elif isinstance(conn, DynamoDBConnectionModel):
500+
imports.add("define_dynamodb_connection")
477501
if needs_column:
478502
imports.add("column")
479503
if needs_params:

src/tinybird_sdk/migrate/parse_connection.py

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
11
from __future__ import annotations
22

3-
from .parser_utils import (
4-
MigrationParseError,
5-
is_blank,
6-
parse_directive_line,
7-
parse_quoted_value,
8-
read_directive_block,
9-
split_lines,
10-
)
3+
from .parser_utils import MigrationParseError, is_blank, parse_directive_line, parse_quoted_value, read_directive_block, split_lines
114
from .types import (
125
ConnectionModel,
6+
DynamoDBConnectionModel,
137
GCSConnectionModel,
148
KafkaConnectionModel,
159
ResourceFile,
@@ -30,6 +24,8 @@
3024
"S3_ACCESS_KEY",
3125
"S3_SECRET",
3226
"GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON",
27+
"DYNAMODB_ARN",
28+
"DYNAMODB_REGION",
3329
}
3430

3531

@@ -58,6 +54,9 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
5854
access_secret: str | None = None
5955
service_account_credentials_json: str | None = None
6056

57+
dynamodb_arn: str | None = None
58+
dynamodb_region: str | None = None
59+
6160
i = 0
6261
while i < len(lines):
6362
raw_line = lines[i]
@@ -119,6 +118,10 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
119118
access_secret = parse_quoted_value(value)
120119
elif name == "GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON":
121120
service_account_credentials_json = parse_quoted_value(value)
121+
elif name == "DYNAMODB_ARN":
122+
dynamodb_arn = parse_quoted_value(value)
123+
elif name == "DYNAMODB_REGION":
124+
dynamodb_region = parse_quoted_value(value)
122125
else:
123126
raise MigrationParseError(
124127
resource.file_path,
@@ -135,12 +138,20 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
135138
)
136139

137140
if connection_type == "kafka":
138-
if region or arn or access_key or access_secret or service_account_credentials_json:
141+
if (
142+
region
143+
or arn
144+
or access_key
145+
or access_secret
146+
or service_account_credentials_json
147+
or dynamodb_arn
148+
or dynamodb_region
149+
):
139150
raise MigrationParseError(
140151
resource.file_path,
141152
"connection",
142153
resource.name,
143-
"S3/GCS directives are not valid for kafka connections.",
154+
"S3/GCS/DynamoDB directives are not valid for kafka connections.",
144155
)
145156

146157
if not bootstrap_servers:
@@ -175,12 +186,14 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
175186
or schema_registry_url
176187
or ssl_ca_pem
177188
or service_account_credentials_json
189+
or dynamodb_arn
190+
or dynamodb_region
178191
):
179192
raise MigrationParseError(
180193
resource.file_path,
181194
"connection",
182195
resource.name,
183-
"Kafka/GCS directives are not valid for s3 connections.",
196+
"Kafka/GCS/DynamoDB directives are not valid for s3 connections.",
184197
)
185198

186199
if not region:
@@ -231,12 +244,14 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
231244
or arn
232245
or access_key
233246
or access_secret
247+
or dynamodb_arn
248+
or dynamodb_region
234249
):
235250
raise MigrationParseError(
236251
resource.file_path,
237252
"connection",
238253
resource.name,
239-
"Kafka/S3 directives are not valid for gcs connections.",
254+
"Kafka/S3/DynamoDB directives are not valid for gcs connections.",
240255
)
241256

242257
if not service_account_credentials_json:
@@ -255,6 +270,53 @@ def parse_connection_file(resource: ResourceFile) -> ConnectionModel:
255270
service_account_credentials_json=service_account_credentials_json,
256271
)
257272

273+
if connection_type == "dynamodb":
274+
if (
275+
bootstrap_servers
276+
or security_protocol
277+
or sasl_mechanism
278+
or key
279+
or secret
280+
or schema_registry_url
281+
or ssl_ca_pem
282+
or region
283+
or arn
284+
or access_key
285+
or access_secret
286+
or service_account_credentials_json
287+
):
288+
raise MigrationParseError(
289+
resource.file_path,
290+
"connection",
291+
resource.name,
292+
"Kafka/S3/GCS directives are not valid for dynamodb connections.",
293+
)
294+
295+
if not dynamodb_arn:
296+
raise MigrationParseError(
297+
resource.file_path,
298+
"connection",
299+
resource.name,
300+
"DYNAMODB_ARN is required for dynamodb connections.",
301+
)
302+
303+
if not dynamodb_region:
304+
raise MigrationParseError(
305+
resource.file_path,
306+
"connection",
307+
resource.name,
308+
"DYNAMODB_REGION is required for dynamodb connections.",
309+
)
310+
311+
return DynamoDBConnectionModel(
312+
kind="connection",
313+
name=resource.name,
314+
file_path=resource.file_path,
315+
connection_type="dynamodb",
316+
region=dynamodb_region,
317+
arn=dynamodb_arn,
318+
)
319+
258320
raise MigrationParseError(
259321
resource.file_path,
260322
"connection",

0 commit comments

Comments
 (0)