Skip to content

Commit 165c786

Browse files
authored
Add version param for wr.timestream.write (#847)
* Add version param for wr.timestream.write
* Formatting
1 parent 237fcf6 commit 165c786

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

awswrangler/timestream.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def _write_batch(
3232
table: str,
3333
cols_names: List[str],
3434
measure_type: str,
35+
version: int,
3536
batch: List[Any],
3637
boto3_primitives: _utils.Boto3PrimitivesType,
3738
) -> List[Dict[str, str]]:
@@ -59,6 +60,7 @@ def _write_batch(
5960
"MeasureValue": str(rec[1]),
6061
"Time": str(round(rec[0].timestamp() * 1_000)),
6162
"TimeUnit": "MILLISECONDS",
63+
"Version": version,
6264
}
6365
for rec in batch
6466
],
@@ -117,6 +119,7 @@ def write(
117119
time_col: str,
118120
measure_col: str,
119121
dimensions_cols: List[str],
122+
version: int = 1,
120123
num_threads: int = 32,
121124
boto3_session: Optional[boto3.Session] = None,
122125
) -> List[Dict[str, str]]:
@@ -136,6 +139,9 @@ def write(
136139
DataFrame column name to be used as measure.
137140
dimensions_cols : List[str]
138141
List of DataFrame column names to be used as dimensions.
142+
version : int
143+
Version number used for upserts.
144+
Documentation https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html.
139145
num_threads : int
140146
Number of threads to be used for concurrent writing.
141147
boto3_session : boto3.Session(), optional
@@ -185,6 +191,7 @@ def write(
185191
itertools.repeat(table),
186192
itertools.repeat(cols_names),
187193
itertools.repeat(measure_type),
194+
itertools.repeat(version),
188195
batches,
189196
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
190197
)

tests/test_timestream.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,59 @@ def test_basic_scenario(timestream_database_and_table):
4646
assert df.shape == (3, 8)
4747

4848

49+
def test_versioned(timestream_database_and_table):
50+
name = timestream_database_and_table
51+
time = [datetime.now(), datetime.now(), datetime.now()]
52+
dfs = [
53+
pd.DataFrame(
54+
{
55+
"time": time,
56+
"dim0": ["foo", "boo", "bar"],
57+
"dim1": [1, 2, 3],
58+
"measure": [1.0, 1.1, 1.2],
59+
}
60+
),
61+
pd.DataFrame(
62+
{
63+
"time": time,
64+
"dim0": ["foo", "boo", "bar"],
65+
"dim1": [1, 2, 3],
66+
"measure": [1.0, 1.1, 1.9],
67+
}
68+
),
69+
pd.DataFrame(
70+
{
71+
"time": time,
72+
"dim0": ["foo", "boo", "bar"],
73+
"dim1": [1, 2, 3],
74+
"measure": [1.0, 1.1, 1.9],
75+
}
76+
),
77+
]
78+
versions = [1, 1, 2]
79+
rejected_rec_nums = [0, 1, 0]
80+
for df, version, rejected_rec_num in zip(dfs, versions, rejected_rec_nums):
81+
rejected_records = wr.timestream.write(
82+
df=df,
83+
database=name,
84+
table=name,
85+
time_col="time",
86+
measure_col="measure",
87+
dimensions_cols=["dim0", "dim1"],
88+
version=version,
89+
)
90+
assert len(rejected_records) == rejected_rec_num
91+
df_out = wr.timestream.query(
92+
f"""
93+
SELECT
94+
*
95+
FROM "{name}"."{name}"
96+
DESC LIMIT 10
97+
"""
98+
)
99+
assert df_out.shape == (3, 5)
100+
101+
49102
def test_real_csv_load_scenario(timestream_database_and_table):
50103
name = timestream_database_and_table
51104
df = pd.read_csv(

0 commit comments

Comments
 (0)