
Commit f30f750

fix: Fix issues where duration type returned as int (#1875)
1 parent 6454aff commit f30f750

File tree

13 files changed: +131 −45 lines

bigframes/core/local_data.py

Lines changed: 4 additions & 9 deletions
@@ -30,6 +30,7 @@
 import pyarrow as pa
 import pyarrow.parquet  # type: ignore
 
+from bigframes.core import pyarrow_utils
 import bigframes.core.schema as schemata
 import bigframes.dtypes
 
@@ -113,7 +114,9 @@ def to_arrow(
         schema = self.data.schema
         if duration_type == "int":
             schema = _schema_durations_to_ints(schema)
-        batches = map(functools.partial(_cast_pa_batch, schema=schema), batches)
+        batches = map(
+            functools.partial(pyarrow_utils.cast_batch, schema=schema), batches
+        )
 
         if offsets_col is not None:
             return schema.append(pa.field(offsets_col, pa.int64())), _append_offsets(
@@ -468,14 +471,6 @@ def _schema_durations_to_ints(schema: pa.Schema) -> pa.Schema:
     )
 
 
-# TODO: Use RecordBatch.cast once min pyarrow>=16.0
-def _cast_pa_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch:
-    return pa.record_batch(
-        [arr.cast(type) for arr, type in zip(batch.columns, schema.types)],
-        schema=schema,
-    )
-
-
 def _pairwise(iterable):
     do_yield = False
     a = None
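For context, `to_arrow(duration_type="int")` rewrites duration fields in the schema to int64 and relies on the batch cast to match. A minimal sketch of that duration-to-int cast in plain pyarrow (illustrative values, not code from this commit):

import datetime
import pyarrow as pa

# duration[us] values cast down to their raw microsecond counts.
durations = pa.array(
    [datetime.timedelta(microseconds=4), None], type=pa.duration("us")
)
as_ints = durations.cast(pa.int64())  # -> [4, None]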

bigframes/core/pyarrow_utils.py

Lines changed: 10 additions & 0 deletions
@@ -74,6 +74,16 @@ def chunk_by_row_count(
         yield buffer.take_as_batches(len(buffer))
 
 
+def cast_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch:
+    if batch.schema == schema:
+        return batch
+    # TODO: Use RecordBatch.cast once min pyarrow>=16.0
+    return pa.record_batch(
+        [arr.cast(type) for arr, type in zip(batch.columns, schema.types)],
+        schema=schema,
+    )
+
+
 def truncate_pyarrow_iterable(
     batches: Iterable[pa.RecordBatch], max_results: int
 ) -> Iterator[pa.RecordBatch]:
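The new helper short-circuits when the schemas already match and otherwise casts column by column, since `RecordBatch.cast` only exists from pyarrow 16.0 onward. A minimal sketch of that fallback path (hypothetical field name `x`, not code from this commit):

import pyarrow as pa

batch = pa.record_batch([pa.array([1, 2], type=pa.int32())], names=["x"])
target = pa.schema([("x", pa.int64())])

# Columnwise cast, mirroring cast_batch's pre-16.0 fallback.
cast = pa.record_batch(
    [arr.cast(typ) for arr, typ in zip(batch.columns, target.types)],
    schema=target,
)
assert cast.schema == target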

bigframes/dtypes.py

Lines changed: 3 additions & 0 deletions
@@ -247,6 +247,7 @@ class SimpleDtypeInfo:
     "decimal128(38, 9)[pyarrow]",
     "decimal256(76, 38)[pyarrow]",
     "binary[pyarrow]",
+    "duration[us][pyarrow]",
 ]
 
 DTYPE_STRINGS = typing.get_args(DtypeString)
@@ -421,6 +422,8 @@ def is_bool_coercable(type_: ExpressionType) -> bool:
 # special case - both "Int64" and "int64[pyarrow]" are accepted
 BIGFRAMES_STRING_TO_BIGFRAMES["int64[pyarrow]"] = INT_DTYPE
 
+BIGFRAMES_STRING_TO_BIGFRAMES["duration[us][pyarrow]"] = TIMEDELTA_DTYPE
+
 # For the purposes of dataframe.memory_usage
 DTYPE_BYTE_SIZES = {
     type_info.dtype: type_info.logical_bytes for type_info in SIMPLE_TYPES
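Together these two entries let the duration alias round-trip: `str(pd.ArrowDtype(pa.duration("us")))` renders as "duration[us][pyarrow]", which now resolves back to the BigFrames timedelta dtype. A quick check, assuming pandas >= 2.0 with pyarrow installed:

import pandas as pd
import pyarrow as pa

assert str(pd.ArrowDtype(pa.duration("us"))) == "duration[us][pyarrow]"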

bigframes/session/executor.py

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ def arrow_batches(self) -> Iterator[pyarrow.RecordBatch]:
         result_rows = 0
 
         for batch in self._arrow_batches:
+            batch = pyarrow_utils.cast_batch(batch, self.schema.to_pyarrow())
             result_rows += batch.num_rows
 
             maximum_result_rows = bigframes.options.compute.maximum_result_rows
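This cast is the heart of the fix: BigQuery can hand back INT64 microseconds where the BigFrames schema expects duration[us], so each batch is normalized against the expected schema before it is yielded. A sketch of the mismatch being repaired (illustrative data; the columnwise cast stands in for `pyarrow_utils.cast_batch`):

import pyarrow as pa

# As returned from the query: raw microseconds as plain int64.
raw = pa.record_batch(
    [pa.array([4, None], type=pa.int64())],
    schema=pa.schema([("duration_col", pa.int64())]),
)
expected = pa.schema([("duration_col", pa.duration("us"))])
fixed = pa.record_batch(
    [arr.cast(typ) for arr, typ in zip(raw.columns, expected.types)],
    schema=expected,
)
assert fixed.schema == expected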

bigframes/testing/utils.py

Lines changed: 10 additions & 0 deletions
@@ -185,6 +185,16 @@ def convert_pandas_dtypes(df: pd.DataFrame, bytes_col: bool):
             "timestamp_col"
         ]
 
+    if not isinstance(df["duration_col"].dtype, pd.ArrowDtype):
+        df["duration_col"] = df["duration_col"].astype(pd.Int64Dtype())
+        arrow_table = pa.Table.from_pandas(
+            pd.DataFrame(df, columns=["duration_col"]),
+            schema=pa.schema([("duration_col", pa.duration("us"))]),
+        )
+        df["duration_col"] = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)[
+            "duration_col"
+        ]
+
     # Convert geography types columns.
     if "geography_col" in df.columns:
         df["geography_col"] = df["geography_col"].astype(

tests/data/scalars.jsonl

Lines changed: 9 additions & 9 deletions
@@ -1,9 +1,9 @@
-{"bool_col": true, "bytes_col": "SGVsbG8sIFdvcmxkIQ==", "date_col": "2021-07-21", "datetime_col": "2021-07-21 11:39:45", "geography_col": "POINT(-122.0838511 37.3860517)", "int64_col": "123456789", "int64_too": "0", "numeric_col": "1.23456789", "float64_col": "1.25", "rowindex": 0, "rowindex_2": 0, "string_col": "Hello, World!", "time_col": "11:41:43.076160", "timestamp_col": "2021-07-21T17:43:43.945289Z"}
-{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "1991-02-03", "datetime_col": "1991-01-02 03:45:06", "geography_col": "POINT(-71.104 42.315)", "int64_col": "-987654321", "int64_too": "1", "numeric_col": "1.23456789", "float64_col": "2.51", "rowindex": 1, "rowindex_2": 1, "string_col": "こんにちは", "time_col": "11:14:34.701606", "timestamp_col": "2021-07-21T17:43:43.945289Z"}
-{"bool_col": true, "bytes_col": "wqFIb2xhIE11bmRvIQ==", "date_col": "2023-03-01", "datetime_col": "2023-03-01 10:55:13", "geography_col": "POINT(-0.124474760143016 51.5007826749545)", "int64_col": "314159", "int64_too": "0", "numeric_col": "101.1010101", "float64_col": "2.5e10", "rowindex": 2, "rowindex_2": 2, "string_col": " ¡Hola Mundo! ", "time_col": "23:59:59.999999", "timestamp_col": "2023-03-01T10:55:13.250125Z"}
-{"bool_col": null, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": null, "int64_too": "1", "numeric_col": null, "float64_col": null, "rowindex": 3, "rowindex_2": 3, "string_col": null, "time_col": null, "timestamp_col": null}
-{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "2021-07-21", "datetime_col": null, "geography_col": null, "int64_col": "-234892", "int64_too": "-2345", "numeric_col": null, "float64_col": null, "rowindex": 4, "rowindex_2": 4, "string_col": "Hello, World!", "time_col": null, "timestamp_col": null}
-{"bool_col": false, "bytes_col": "R8O8dGVuIFRhZw==", "date_col": "1980-03-14", "datetime_col": "1980-03-14 15:16:17", "geography_col": null, "int64_col": "55555", "int64_too": "0", "numeric_col": "5.555555", "float64_col": "555.555", "rowindex": 5, "rowindex_2": 5, "string_col": "Güten Tag!", "time_col": "15:16:17.181921", "timestamp_col": "1980-03-14T15:16:17.181921Z"}
-{"bool_col": true, "bytes_col": "SGVsbG8JQmlnRnJhbWVzIQc=", "date_col": "2023-05-23", "datetime_col": "2023-05-23 11:37:01", "geography_col": "LINESTRING(-0.127959 51.507728, -0.127026 51.507473)", "int64_col": "101202303", "int64_too": "2", "numeric_col": "-10.090807", "float64_col": "-123.456", "rowindex": 6, "rowindex_2": 6, "string_col": "capitalize, This ", "time_col": "01:02:03.456789", "timestamp_col": "2023-05-23T11:42:55.000001Z"}
-{"bool_col": true, "bytes_col": null, "date_col": "2038-01-20", "datetime_col": "2038-01-19 03:14:08", "geography_col": null, "int64_col": "-214748367", "int64_too": "2", "numeric_col": "11111111.1", "float64_col": "42.42", "rowindex": 7, "rowindex_2": 7, "string_col": " سلام", "time_col": "12:00:00.000001", "timestamp_col": "2038-01-19T03:14:17.999999Z"}
-{"bool_col": false, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": "2", "int64_too": "1", "numeric_col": null, "float64_col": "6.87", "rowindex": 8, "rowindex_2": 8, "string_col": "T", "time_col": null, "timestamp_col": null}
+{"bool_col": true, "bytes_col": "SGVsbG8sIFdvcmxkIQ==", "date_col": "2021-07-21", "datetime_col": "2021-07-21 11:39:45", "geography_col": "POINT(-122.0838511 37.3860517)", "int64_col": "123456789", "int64_too": "0", "numeric_col": "1.23456789", "float64_col": "1.25", "rowindex": 0, "rowindex_2": 0, "string_col": "Hello, World!", "time_col": "11:41:43.076160", "timestamp_col": "2021-07-21T17:43:43.945289Z", "duration_col": 4}
+{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "1991-02-03", "datetime_col": "1991-01-02 03:45:06", "geography_col": "POINT(-71.104 42.315)", "int64_col": "-987654321", "int64_too": "1", "numeric_col": "1.23456789", "float64_col": "2.51", "rowindex": 1, "rowindex_2": 1, "string_col": "こんにちは", "time_col": "11:14:34.701606", "timestamp_col": "2021-07-21T17:43:43.945289Z", "duration_col": -1000000}
+{"bool_col": true, "bytes_col": "wqFIb2xhIE11bmRvIQ==", "date_col": "2023-03-01", "datetime_col": "2023-03-01 10:55:13", "geography_col": "POINT(-0.124474760143016 51.5007826749545)", "int64_col": "314159", "int64_too": "0", "numeric_col": "101.1010101", "float64_col": "2.5e10", "rowindex": 2, "rowindex_2": 2, "string_col": " ¡Hola Mundo! ", "time_col": "23:59:59.999999", "timestamp_col": "2023-03-01T10:55:13.250125Z", "duration_col": 0}
+{"bool_col": null, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": null, "int64_too": "1", "numeric_col": null, "float64_col": null, "rowindex": 3, "rowindex_2": 3, "string_col": null, "time_col": null, "timestamp_col": null, "duration_col": null}
+{"bool_col": false, "bytes_col": "44GT44KT44Gr44Gh44Gv", "date_col": "2021-07-21", "datetime_col": null, "geography_col": null, "int64_col": "-234892", "int64_too": "-2345", "numeric_col": null, "float64_col": null, "rowindex": 4, "rowindex_2": 4, "string_col": "Hello, World!", "time_col": null, "timestamp_col": null, "duration_col": 31540000000000}
+{"bool_col": false, "bytes_col": "R8O8dGVuIFRhZw==", "date_col": "1980-03-14", "datetime_col": "1980-03-14 15:16:17", "geography_col": null, "int64_col": "55555", "int64_too": "0", "numeric_col": "5.555555", "float64_col": "555.555", "rowindex": 5, "rowindex_2": 5, "string_col": "Güten Tag!", "time_col": "15:16:17.181921", "timestamp_col": "1980-03-14T15:16:17.181921Z", "duration_col": 4}
+{"bool_col": true, "bytes_col": "SGVsbG8JQmlnRnJhbWVzIQc=", "date_col": "2023-05-23", "datetime_col": "2023-05-23 11:37:01", "geography_col": "LINESTRING(-0.127959 51.507728, -0.127026 51.507473)", "int64_col": "101202303", "int64_too": "2", "numeric_col": "-10.090807", "float64_col": "-123.456", "rowindex": 6, "rowindex_2": 6, "string_col": "capitalize, This ", "time_col": "01:02:03.456789", "timestamp_col": "2023-05-23T11:42:55.000001Z", "duration_col": null}
+{"bool_col": true, "bytes_col": null, "date_col": "2038-01-20", "datetime_col": "2038-01-19 03:14:08", "geography_col": null, "int64_col": "-214748367", "int64_too": "2", "numeric_col": "11111111.1", "float64_col": "42.42", "rowindex": 7, "rowindex_2": 7, "string_col": " سلام", "time_col": "12:00:00.000001", "timestamp_col": "2038-01-19T03:14:17.999999Z", "duration_col": 4}
+{"bool_col": false, "bytes_col": null, "date_col": null, "datetime_col": null, "geography_col": null, "int64_col": "2", "int64_too": "1", "numeric_col": null, "float64_col": "6.87", "rowindex": 8, "rowindex_2": 8, "string_col": "T", "time_col": null, "timestamp_col": null, "duration_col": 432000000000}

tests/data/scalars_schema.json

Lines changed: 6 additions & 0 deletions
@@ -71,5 +71,11 @@
     "mode": "NULLABLE",
     "name": "timestamp_col",
     "type": "TIMESTAMP"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "duration_col",
+    "type": "INTEGER",
+    "description": "#microseconds"
   }
 ]
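The BigQuery column stores raw microsecond counts, hence the "#microseconds" description. For instance, the 432000000000 in the fixture above is five days (a quick check, assuming pandas is available):

import pandas as pd

assert pd.Timedelta(432_000_000_000, unit="us") == pd.Timedelta(days=5)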

tests/system/small/pandas/core/methods/test_describe.py

Lines changed: 7 additions & 1 deletion
@@ -21,7 +21,13 @@ def test_df_describe_non_temporal(scalars_dfs):
     pytest.importorskip("pandas", minversion="2.0.0")
     scalars_df, scalars_pandas_df = scalars_dfs
     # excluding temporal columns here because BigFrames cannot perform percentiles operations on them
-    unsupported_columns = ["datetime_col", "timestamp_col", "time_col", "date_col"]
+    unsupported_columns = [
+        "datetime_col",
+        "timestamp_col",
+        "time_col",
+        "date_col",
+        "duration_col",
+    ]
     bf_result = scalars_df.drop(columns=unsupported_columns).describe().to_pandas()
 
     modified_pd_df = scalars_pandas_df.drop(columns=unsupported_columns)

tests/system/small/test_dataframe.py

Lines changed: 9 additions & 3 deletions
@@ -553,7 +553,7 @@ def test_df_info(scalars_dfs):
     expected = (
         "<class 'bigframes.dataframe.DataFrame'>\n"
         "Index: 9 entries, 0 to 8\n"
-        "Data columns (total 13 columns):\n"
+        "Data columns (total 14 columns):\n"
         " # Column Non-Null Count Dtype\n"
         "--- ------------- ---------------- ------------------------------\n"
         " 0 bool_col 8 non-null boolean\n"
@@ -569,8 +569,9 @@
         " 10 string_col 8 non-null string\n"
         " 11 time_col 6 non-null time64[us][pyarrow]\n"
         " 12 timestamp_col 6 non-null timestamp[us, tz=UTC][pyarrow]\n"
-        "dtypes: Float64(1), Int64(3), binary[pyarrow](1), boolean(1), date32[day][pyarrow](1), decimal128(38, 9)[pyarrow](1), geometry(1), string(1), time64[us][pyarrow](1), timestamp[us, tz=UTC][pyarrow](1), timestamp[us][pyarrow](1)\n"
-        "memory usage: 1269 bytes\n"
+        " 13 duration_col 7 non-null duration[us][pyarrow]\n"
+        "dtypes: Float64(1), Int64(3), binary[pyarrow](1), boolean(1), date32[day][pyarrow](1), decimal128(38, 9)[pyarrow](1), duration[us][pyarrow](1), geometry(1), string(1), time64[us][pyarrow](1), timestamp[us, tz=UTC][pyarrow](1), timestamp[us][pyarrow](1)\n"
+        "memory usage: 1341 bytes\n"
     )
 
     scalars_df, _ = scalars_dfs
@@ -1694,6 +1695,7 @@ def test_get_dtypes(scalars_df_default_index):
         "string_col": pd.StringDtype(storage="pyarrow"),
         "time_col": pd.ArrowDtype(pa.time64("us")),
         "timestamp_col": pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
+        "duration_col": pd.ArrowDtype(pa.duration("us")),
     }
     pd.testing.assert_series_equal(
         dtypes,
@@ -4771,6 +4773,9 @@ def test_df_to_json_local_str(scalars_df_index, scalars_pandas_df_index):
 def test_df_to_json_local_file(scalars_df_index, scalars_pandas_df_index):
     # TODO: supply a reason why this isn't compatible with pandas 1.x
     pytest.importorskip("pandas", minversion="2.0.0")
+    # duration not fully supported at pandas level
+    scalars_df_index = scalars_df_index.drop(columns="duration_col")
+    scalars_pandas_df_index = scalars_pandas_df_index.drop(columns="duration_col")
     with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
         scalars_df_index.to_json(bf_result_file, orient="table")
         # default_handler for arrow types that have no default conversion
@@ -4882,6 +4887,7 @@ def test_df_to_orc(scalars_df_index, scalars_pandas_df_index):
         "time_col",
         "timestamp_col",
         "geography_col",
+        "duration_col",
     ]
 
     bf_result_file = tempfile.TemporaryFile()

tests/system/small/test_dataframe_io.py

Lines changed: 25 additions & 13 deletions
@@ -55,7 +55,7 @@ def test_sql_executes(scalars_df_default_index, bigquery_client):
     """
     # Do some operations to make for more complex SQL.
     df = (
-        scalars_df_default_index.drop(columns=["geography_col"])
+        scalars_df_default_index.drop(columns=["geography_col", "duration_col"])
         .groupby("string_col")
         .max()
     )
@@ -87,7 +87,7 @@ def test_sql_executes_and_includes_named_index(
     """
     # Do some operations to make for more complex SQL.
     df = (
-        scalars_df_default_index.drop(columns=["geography_col"])
+        scalars_df_default_index.drop(columns=["geography_col", "duration_col"])
         .groupby("string_col")
         .max()
     )
@@ -120,7 +120,7 @@ def test_sql_executes_and_includes_named_multiindex(
     """
     # Do some operations to make for more complex SQL.
     df = (
-        scalars_df_default_index.drop(columns=["geography_col"])
+        scalars_df_default_index.drop(columns=["geography_col", "duration_col"])
         .groupby(["string_col", "bool_col"])
         .max()
     )
@@ -999,14 +999,16 @@ def test_to_sql_query_unnamed_index_included(
     scalars_df_default_index: bpd.DataFrame,
     scalars_pandas_df_default_index: pd.DataFrame,
 ):
-    bf_df = scalars_df_default_index.reset_index(drop=True)
+    bf_df = scalars_df_default_index.reset_index(drop=True).drop(columns="duration_col")
     sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=True)
     assert len(idx_labels) == 1
     assert len(idx_ids) == 1
     assert idx_labels[0] is None
     assert idx_ids[0].startswith("bigframes")
 
-    pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
+    pd_df = scalars_pandas_df_default_index.reset_index(drop=True).drop(
+        columns="duration_col"
+    )
     roundtrip = session.read_gbq(sql, index_col=idx_ids)
     roundtrip.index.names = [None]
     utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df, check_index_type=False)
@@ -1017,14 +1019,18 @@ def test_to_sql_query_named_index_included(
     scalars_df_default_index: bpd.DataFrame,
     scalars_pandas_df_default_index: pd.DataFrame,
 ):
-    bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True)
+    bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True).drop(
+        columns="duration_col"
+    )
     sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=True)
     assert len(idx_labels) == 1
     assert len(idx_ids) == 1
     assert idx_labels[0] == "rowindex_2"
     assert idx_ids[0] == "rowindex_2"
 
-    pd_df = scalars_pandas_df_default_index.set_index("rowindex_2", drop=True)
+    pd_df = scalars_pandas_df_default_index.set_index("rowindex_2", drop=True).drop(
+        columns="duration_col"
+    )
     roundtrip = session.read_gbq(sql, index_col=idx_ids)
     utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df)
 
@@ -1034,12 +1040,14 @@ def test_to_sql_query_unnamed_index_excluded(
     scalars_df_default_index: bpd.DataFrame,
     scalars_pandas_df_default_index: pd.DataFrame,
 ):
-    bf_df = scalars_df_default_index.reset_index(drop=True)
+    bf_df = scalars_df_default_index.reset_index(drop=True).drop(columns="duration_col")
     sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=False)
     assert len(idx_labels) == 0
     assert len(idx_ids) == 0
 
-    pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
+    pd_df = scalars_pandas_df_default_index.reset_index(drop=True).drop(
+        columns="duration_col"
+    )
     roundtrip = session.read_gbq(sql)
     utils.assert_pandas_df_equal(
         roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
@@ -1051,14 +1059,18 @@ def test_to_sql_query_named_index_excluded(
     scalars_df_default_index: bpd.DataFrame,
     scalars_pandas_df_default_index: pd.DataFrame,
 ):
-    bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True)
+    bf_df = scalars_df_default_index.set_index("rowindex_2", drop=True).drop(
+        columns="duration_col"
+    )
     sql, idx_ids, idx_labels = bf_df._to_sql_query(include_index=False)
     assert len(idx_labels) == 0
     assert len(idx_ids) == 0
 
-    pd_df = scalars_pandas_df_default_index.set_index(
-        "rowindex_2", drop=True
-    ).reset_index(drop=True)
+    pd_df = (
+        scalars_pandas_df_default_index.set_index("rowindex_2", drop=True)
+        .reset_index(drop=True)
+        .drop(columns="duration_col")
+    )
     roundtrip = session.read_gbq(sql)
     utils.assert_pandas_df_equal(
         roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
