Skip to content

Commit 3ec0145

Browse files
Fix missing timestamp data type in Timestream (#1881)
Co-authored-by: kukushking <[email protected]>
1 parent 5fd8fa9 commit 3ec0145

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

awswrangler/_data_types.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,12 @@ def pyarrow2timestream(dtype: pa.DataType) -> str: # pylint: disable=too-many-b
263263
return "BOOLEAN"
264264
if pa.types.is_string(dtype):
265265
return "VARCHAR"
266+
if pa.types.is_date(dtype):
267+
return "DATE"
268+
if pa.types.is_time(dtype):
269+
return "TIME"
270+
if pa.types.is_timestamp(dtype):
271+
return "TIMESTAMP"
266272
raise exceptions.UnsupportedType(f"Unsupported Amazon Timestream measure type: {dtype}")
267273

268274

awswrangler/timestream.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ def _df2list(df: pd.DataFrame) -> List[List[Any]]:
2727
return parameters
2828

2929

30+
def _format_measure(measure_name: str, measure_value: Any, measure_type: str) -> Dict[str, str]:
31+
return {
32+
"Name": measure_name,
33+
"Value": str(round(measure_value.timestamp() * 1_000) if measure_type == "TIMESTAMP" else measure_value),
34+
"Type": measure_type,
35+
}
36+
37+
3038
def _write_batch(
3139
database: str,
3240
table: str,
@@ -66,7 +74,7 @@ def _write_batch(
6674
record["MeasureName"] = measure_cols_names[0]
6775
record["MeasureValueType"] = "MULTI"
6876
record["MeasureValues"] = [
69-
{"Name": measure_name, "Value": str(measure_value), "Type": measure_value_type}
77+
_format_measure(measure_name, measure_value, measure_value_type)
7078
for measure_name, measure_value, measure_value_type in zip(
7179
measure_cols_names, rec[measure_cols_loc:dimensions_cols_loc], measure_types
7280
)

tests/test_timestream.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime as dt
12
import logging
23
from datetime import datetime
34

@@ -312,3 +313,34 @@ def test_create_table_additional_kwargs(timestream_database_and_table, timestrea
312313
wr.timestream.delete_table(database=timestream_database_and_table, table=f"{timestream_database_and_table}_3")
313314
tables_in_db = wr.timestream.list_tables(database=timestream_database_and_table)
314315
assert f"{timestream_database_and_table}_3" not in tables_in_db
316+
317+
318+
def test_timestamp_measure_column(timestream_database_and_table):
319+
df = pd.DataFrame(
320+
{
321+
"time": [datetime.now()] * 3,
322+
"dim0": ["foo", "boo", "bar"],
323+
"dim1": [1, 2, 3],
324+
"measure_f": [1.1, 1.2, 1.3],
325+
"measure_t": [datetime.now(dt.timezone.utc)] * 3,
326+
}
327+
)
328+
329+
rejected_records = wr.timestream.write(
330+
df=df,
331+
database=timestream_database_and_table,
332+
table=timestream_database_and_table,
333+
time_col="time",
334+
measure_col=["measure_f", "measure_t"],
335+
dimensions_cols=["dim0", "dim1"],
336+
)
337+
assert len(rejected_records) == 0
338+
339+
df = wr.timestream.query(
340+
f"""
341+
SELECT
342+
*
343+
FROM "{timestream_database_and_table}"."{timestream_database_and_table}"
344+
""",
345+
)
346+
assert df["measure_t"].dtype == "datetime64[ns]"

0 commit comments

Comments
 (0)