Skip to content

Commit 7562397

Browse files
authored
Add database/table create/delete function for Timestream. (#470)
1 parent c6c367f commit 7562397

File tree

6 files changed

+505
-559
lines changed

6 files changed

+505
-559
lines changed

awswrangler/timestream.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,3 +234,199 @@ def query(sql: str, boto3_session: Optional[boto3.Session] = None) -> pd.DataFra
234234
if col["type"] == "VARCHAR":
235235
df[col["name"]] = df[col["name"]].astype("string")
236236
return df
237+
238+
239+
def create_database(
240+
database: str,
241+
kms_key_id: Optional[str] = None,
242+
tags: Optional[Dict[str, str]] = None,
243+
boto3_session: Optional[boto3.Session] = None,
244+
) -> str:
245+
"""Create a new Timestream database.
246+
247+
Note
248+
----
249+
If the KMS key is not specified, the database will be encrypted with a
250+
Timestream managed KMS key located in your account.
251+
252+
Parameters
253+
----------
254+
database: str
255+
Database name.
256+
kms_key_id: Optional[str]
257+
The KMS key for the database. If the KMS key is not specified,
258+
the database will be encrypted with a Timestream managed KMS key located in your account.
259+
tags: Optional[Dict[str, str]]
260+
Key/Value dict to put on the database.
261+
Tags enable you to categorize databases and/or tables, for example,
262+
by purpose, owner, or environment.
263+
e.g. {"foo": "boo", "bar": "xoo"})
264+
boto3_session : boto3.Session(), optional
265+
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.
266+
267+
Returns
268+
-------
269+
str
270+
The Amazon Resource Name that uniquely identifies this database. (ARN)
271+
272+
Examples
273+
--------
274+
Creating a database.
275+
276+
>>> import awswrangler as wr
277+
>>> arn = wr.timestream.create_database("MyDatabase")
278+
279+
"""
280+
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
281+
args: Dict[str, Any] = {"DatabaseName": database}
282+
if kms_key_id is not None:
283+
args["KmsKeyId"] = kms_key_id
284+
if tags is not None:
285+
args["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
286+
response: Dict[str, Dict[str, Any]] = client.create_database(**args)
287+
return cast(str, response["Database"]["Arn"])
288+
289+
290+
def delete_database(
291+
database: str,
292+
boto3_session: Optional[boto3.Session] = None,
293+
) -> None:
294+
"""Delete a given Timestream database. This is an irreversible operation.
295+
296+
After a database is deleted, the time series data from its tables cannot be recovered.
297+
298+
All tables in the database must be deleted first, or a ValidationException error will be thrown.
299+
300+
Due to the nature of distributed retries,
301+
the operation can return either success or a ResourceNotFoundException.
302+
Clients should consider them equivalent.
303+
304+
Parameters
305+
----------
306+
database: str
307+
Database name.
308+
boto3_session : boto3.Session(), optional
309+
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.
310+
311+
Returns
312+
-------
313+
None
314+
None.
315+
316+
Examples
317+
--------
318+
Deleting a database
319+
320+
>>> import awswrangler as wr
321+
>>> arn = wr.timestream.delete_database("MyDatabase")
322+
323+
"""
324+
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
325+
client.delete_database(DatabaseName=database)
326+
327+
328+
def create_table(
329+
database: str,
330+
table: str,
331+
memory_retention_hours: int,
332+
magnetic_retention_days: int,
333+
tags: Optional[Dict[str, str]] = None,
334+
boto3_session: Optional[boto3.Session] = None,
335+
) -> str:
336+
"""Create a new Timestream database.
337+
338+
Note
339+
----
340+
If the KMS key is not specified, the database will be encrypted with a
341+
Timestream managed KMS key located in your account.
342+
343+
Parameters
344+
----------
345+
database: str
346+
Database name.
347+
table: str
348+
Table name.
349+
memory_retention_hours: int
350+
The duration for which data must be stored in the memory store.
351+
magnetic_retention_days: int
352+
The duration for which data must be stored in the magnetic store.
353+
tags: Optional[Dict[str, str]]
354+
Key/Value dict to put on the table.
355+
Tags enable you to categorize databases and/or tables, for example,
356+
by purpose, owner, or environment.
357+
e.g. {"foo": "boo", "bar": "xoo"})
358+
boto3_session : boto3.Session(), optional
359+
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.
360+
361+
Returns
362+
-------
363+
str
364+
The Amazon Resource Name that uniquely identifies this database. (ARN)
365+
366+
Examples
367+
--------
368+
Creating a table.
369+
370+
>>> import awswrangler as wr
371+
>>> arn = wr.timestream.create_table(
372+
... database="MyDatabase",
373+
... table="MyTable",
374+
... memory_retention_hours=3,
375+
... magnetic_retention_days=7
376+
... )
377+
378+
"""
379+
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
380+
args: Dict[str, Any] = {
381+
"DatabaseName": database,
382+
"TableName": table,
383+
"RetentionProperties": {
384+
"MemoryStoreRetentionPeriodInHours": memory_retention_hours,
385+
"MagneticStoreRetentionPeriodInDays": magnetic_retention_days,
386+
},
387+
}
388+
if tags is not None:
389+
args["Tags"] = [{"Key": k, "Value": v} for k, v in tags.items()]
390+
response: Dict[str, Dict[str, Any]] = client.create_table(**args)
391+
return cast(str, response["Table"]["Arn"])
392+
393+
394+
def delete_table(
395+
database: str,
396+
table: str,
397+
boto3_session: Optional[boto3.Session] = None,
398+
) -> None:
399+
"""Delete a given Timestream table.
400+
401+
This is an irreversible operation.
402+
403+
After a Timestream database table is deleted, the time series data stored in the table cannot be recovered.
404+
405+
Due to the nature of distributed retries,
406+
the operation can return either success or a ResourceNotFoundException.
407+
Clients should consider them equivalent.
408+
409+
Parameters
410+
----------
411+
database: str
412+
Database name.
413+
table: str
414+
Table name.
415+
boto3_session : boto3.Session(), optional
416+
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.
417+
418+
Returns
419+
-------
420+
None
421+
None.
422+
423+
Examples
424+
--------
425+
Deleting a table
426+
427+
>>> import awswrangler as wr
428+
>>> arn = wr.timestream.delete_table("MyDatabase", "MyTable")
429+
430+
"""
431+
client: boto3.client = _utils.client(service_name="timestream-write", session=boto3_session)
432+
client.delete_table(DatabaseName=database, TableName=table)

docs/source/api.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,12 @@ Amazon Timestream
159159
.. autosummary::
160160
:toctree: stubs
161161

162-
write
162+
create_database
163+
create_table
164+
delete_database
165+
delete_table
163166
query
167+
write
164168

165169
Amazon EMR
166170
----------

tests/conftest.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,3 +231,14 @@ def mysql_table():
231231
with con.cursor() as cursor:
232232
cursor.execute(f"DROP TABLE IF EXISTS test.{name}")
233233
con.close()
234+
235+
236+
@pytest.fixture(scope="function")
237+
def timestream_database_and_table():
238+
name = f"tbl_{get_time_str_with_random_suffix()}"
239+
print(f"Timestream name: {name}")
240+
wr.timestream.create_database(name)
241+
wr.timestream.create_table(name, name, 1, 1)
242+
yield name
243+
wr.timestream.delete_table(name, name)
244+
wr.timestream.delete_database(name)

tests/test_timestream.py

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
99

1010

11-
def test_write_simple():
11+
def test_basic_scenario(timestream_database_and_table):
12+
name = timestream_database_and_table
1213
df = pd.DataFrame(
1314
{
1415
"time": [datetime.now(), datetime.now(), datetime.now()],
@@ -19,16 +20,34 @@ def test_write_simple():
1920
)
2021
rejected_records = wr.timestream.write(
2122
df=df,
22-
database="sampleDB",
23-
table="sampleTable",
23+
database=name,
24+
table=name,
2425
time_col="time",
2526
measure_col="measure",
2627
dimensions_cols=["dim0", "dim1"],
2728
)
2829
assert len(rejected_records) == 0
30+
df = wr.timestream.query(
31+
f"""
32+
SELECT
33+
1 as col_int,
34+
try_cast(now() as time) as col_time,
35+
TRUE as col_bool,
36+
current_date as col_date,
37+
'foo' as col_str,
38+
measure_value::double,
39+
measure_name,
40+
time
41+
FROM "{name}"."{name}"
42+
ORDER BY time
43+
DESC LIMIT 10
44+
"""
45+
)
46+
assert df.shape == (3, 8)
2947

3048

31-
def test_write_real():
49+
def test_real_csv_load_scenario(timestream_database_and_table):
50+
name = timestream_database_and_table
3251
df = pd.read_csv(
3352
"https://raw.githubusercontent.com/awslabs/amazon-timestream-tools/master/sample_apps/data/sample.csv",
3453
names=[
@@ -52,39 +71,21 @@ def test_write_real():
5271
df_memory = df[df.measure_kind == "memory_utilization"]
5372
rejected_records = wr.timestream.write(
5473
df=df_cpu,
55-
database="sampleDB",
56-
table="sampleTable",
74+
database=name,
75+
table=name,
5776
time_col="time",
5877
measure_col="measure",
5978
dimensions_cols=["index", "region", "az", "hostname"],
6079
)
6180
assert len(rejected_records) == 0
6281
rejected_records = wr.timestream.write(
6382
df=df_memory,
64-
database="sampleDB",
65-
table="sampleTable",
83+
database=name,
84+
table=name,
6685
time_col="time",
6786
measure_col="measure",
6887
dimensions_cols=["index", "region", "az", "hostname"],
6988
)
7089
assert len(rejected_records) == 0
71-
72-
73-
def test_query():
74-
df = wr.timestream.query(
75-
"""
76-
SELECT
77-
1 as col_int,
78-
try_cast(now() as time) as col_time,
79-
TRUE as col_bool,
80-
current_date as col_date,
81-
'foo' as col_str,
82-
measure_value::double,
83-
measure_name,
84-
time
85-
FROM "sampleDB"."sampleTable"
86-
ORDER BY time
87-
DESC LIMIT 10
88-
"""
89-
)
90-
assert df.shape == (10, 8)
90+
df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{name}"."{name}"')
91+
assert df["counter"].iloc[0] == 126_000

0 commit comments

Comments
 (0)