Skip to content

Commit e3cfbad

Browse files
kukushkingmalachi-constantjaidisido
authored
Timestream - array cols to str (#1368)
Co-authored-by: Lucas Hanson <[email protected]> Co-authored-by: jaidisido <[email protected]>
1 parent 07bda17 commit e3cfbad

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

awswrangler/timestream.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ def _cast_value(value: str, dtype: str) -> Any: # pylint: disable=too-many-bran
100100
return datetime.strptime(value, "%Y-%m-%d").date()
101101
if dtype == "TIME":
102102
return datetime.strptime(value[:-3], "%H:%M:%S.%f").time()
103+
if dtype == "ARRAY":
104+
return str(value)
103105
raise ValueError(f"Not supported Amazon Timestream type: {dtype}")
104106

105107

@@ -110,9 +112,11 @@ def _process_row(schema: List[Dict[str, str]], row: Dict[str, Any]) -> List[Any]
110112
row_processed.append(None)
111113
elif "ScalarValue" in col:
112114
row_processed.append(_cast_value(value=col["ScalarValue"], dtype=col_schema["type"]))
115+
elif "ArrayValue" in col:
116+
row_processed.append(_cast_value(value=col["ArrayValue"], dtype="ARRAY"))
113117
else:
114118
raise ValueError(
115-
f"Query with non ScalarType/NullValue for column {col_schema['name']}. "
119+
f"Query with non ScalarType/ArrayColumnInfo/NullValue for column {col_schema['name']}. "
116120
f"Expected {col_schema['type']} instead of {col}"
117121
)
118122
return row_processed
@@ -129,9 +133,12 @@ def _rows_to_df(rows: List[List[Any]], schema: List[Dict[str, str]]) -> pd.DataF
129133
def _process_schema(page: Dict[str, Any]) -> List[Dict[str, str]]:
130134
schema: List[Dict[str, str]] = []
131135
for col in page["ColumnInfo"]:
132-
if "ScalarType" not in col["Type"]:
133-
raise ValueError(f"Query with non ScalarType for column {col['Name']}: {col['Type']}")
134-
schema.append({"name": col["Name"], "type": col["Type"]["ScalarType"]})
136+
if "ScalarType" in col["Type"]:
137+
schema.append({"name": col["Name"], "type": col["Type"]["ScalarType"]})
138+
elif "ArrayColumnInfo" in col["Type"]:
139+
schema.append({"name": col["Name"], "type": col["Type"]["ArrayColumnInfo"]})
140+
else:
141+
raise ValueError(f"Query with non ScalarType or ArrayColumnInfo for column {col['Name']}: {col['Type']}")
135142
return schema
136143

137144

0 commit comments

Comments
 (0)