Commit 801bac7

fix: read_csv fails when checking file size for wildcard GCS files
1 parent: 209d0d4

2 files changed: +47 -31 lines
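The bug: _check_file_size treated a wildcard GCS path as a single object name, so sizing a path like gs://bucket/data/*.csv asked GCS for an object whose name literally contains the asterisk, which does not exist. A minimal sketch of the old failing pattern (bucket and object names are hypothetical):

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket")  # hypothetical bucket

    # Old logic: the glob is used verbatim as a single object name.
    blob = bucket.blob("data/part-*.csv")
    blob.reload()  # raises google.cloud.exceptions.NotFound: no object has that literal name
    file_size = blob.size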

bigframes/session/__init__.py

Lines changed: 18 additions & 4 deletions
@@ -18,6 +18,8 @@
 
 from collections import abc
 import datetime
+import fnmatch
+import inspect
 import logging
 import os
 import secrets
@@ -1344,12 +1346,24 @@ def read_json(
     def _check_file_size(self, filepath: str):
         max_size = 1024 * 1024 * 1024  # 1 GB in bytes
         if filepath.startswith("gs://"):  # GCS file path
+            bucket_name, blob_path = filepath.split("/", 3)[2:]
+
             client = storage.Client()
-            bucket_name, blob_name = filepath.split("/", 3)[2:]
             bucket = client.bucket(bucket_name)
-            blob = bucket.blob(blob_name)
-            blob.reload()
-            file_size = blob.size
+
+            list_blobs_params = inspect.signature(bucket.list_blobs).parameters
+            if "match_glob" in list_blobs_params:
+                # Modern, efficient method for new library versions
+                matching_blobs = bucket.list_blobs(match_glob=blob_path)
+                file_size = sum(blob.size for blob in matching_blobs)
+            else:
+                # Fallback method for older library versions
+                prefix = blob_path.split("*", 1)[0]
+                all_blobs = bucket.list_blobs(prefix=prefix)
+                matching_blobs = [
+                    blob for blob in all_blobs if fnmatch.fnmatch(blob.name, blob_path)
+                ]
+                file_size = sum(blob.size for blob in matching_blobs)
         elif os.path.exists(filepath):  # local file path
             file_size = os.path.getsize(filepath)
         else:
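The fix switches from fetching one object to listing matches: newer google-cloud-storage clients (match_glob was added to list_blobs around release 2.10) filter the glob server-side, and older clients fall back to listing by the literal prefix before the first "*" and filtering with fnmatch. The same logic as a standalone sketch, with a hypothetical path; note the fallback can be slightly more permissive, since fnmatch's "*" also matches "/":

    import fnmatch
    import inspect

    from google.cloud import storage


    def gcs_glob_total_size(gcs_path: str) -> int:
        """Sum the sizes in bytes of all objects matching gs://bucket/pattern."""
        bucket_name, pattern = gcs_path.split("/", 3)[2:]
        bucket = storage.Client().bucket(bucket_name)

        # Feature-detect match_glob support instead of pinning a client version.
        if "match_glob" in inspect.signature(bucket.list_blobs).parameters:
            blobs = bucket.list_blobs(match_glob=pattern)  # server-side filtering
        else:
            # List everything under the literal prefix, then filter client-side.
            prefix = pattern.split("*", 1)[0]
            blobs = (
                b
                for b in bucket.list_blobs(prefix=prefix)
                if fnmatch.fnmatch(b.name, pattern)
            )
        return sum(b.size for b in blobs)


    # e.g. gcs_glob_total_size("gs://my-bucket/data/part-*.csv")  # hypothetical path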

tests/system/small/test_session.py

Lines changed: 29 additions & 27 deletions
@@ -1041,15 +1041,7 @@ def test_read_pandas_w_nested_json_fails(session, write_engine):
         session.read_pandas(pd_s, write_engine=write_engine)
 
 
-@pytest.mark.parametrize(
-    ("write_engine"),
-    [
-        pytest.param("default"),
-        pytest.param("bigquery_inline"),
-        pytest.param("bigquery_streaming"),
-        pytest.param("bigquery_write"),
-    ],
-)
+@all_write_engines
 def test_read_pandas_w_nested_json(session, write_engine):
     # TODO: supply a reason why this isn't compatible with pandas 1.x
     pytest.importorskip("pandas", minversion="2.0.0")
@@ -1074,15 +1066,7 @@ def test_read_pandas_w_nested_json(session, write_engine):
     pd.testing.assert_series_equal(bq_s, pd_s)
 
 
-@pytest.mark.parametrize(
-    ("write_engine"),
-    [
-        pytest.param("default"),
-        pytest.param("bigquery_inline"),
-        pytest.param("bigquery_load"),
-        pytest.param("bigquery_streaming"),
-    ],
-)
+@all_write_engines
 def test_read_pandas_w_nested_invalid_json(session, write_engine):
     # TODO: supply a reason why this isn't compatible with pandas 1.x
     pytest.importorskip("pandas", minversion="2.0.0")
@@ -1127,15 +1111,7 @@ def test_read_pandas_w_nested_json_index_fails(session, write_engine):
         session.read_pandas(pd_idx, write_engine=write_engine)
 
 
-@pytest.mark.parametrize(
-    ("write_engine"),
-    [
-        pytest.param("default"),
-        pytest.param("bigquery_inline"),
-        pytest.param("bigquery_streaming"),
-        pytest.param("bigquery_write"),
-    ],
-)
+@all_write_engines
 def test_read_pandas_w_nested_json_index(session, write_engine):
     # TODO: supply a reason why this isn't compatible with pandas 1.x
     pytest.importorskip("pandas", minversion="2.0.0")
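The three hand-rolled parametrize blocks above collapse into a shared all_write_engines decorator; as a side effect, each test now runs against every engine rather than a slightly different subset. The decorator's definition is not part of this diff; presumably it is a reusable mark along these lines (the exact engine list is an assumption):

    import pytest

    # Hypothetical sketch; the real definition lives elsewhere in the test suite.
    all_write_engines = pytest.mark.parametrize(
        "write_engine",
        [
            "default",
            "bigquery_inline",
            "bigquery_load",
            "bigquery_streaming",
            "bigquery_write",
        ],
    )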
@@ -1287,6 +1263,32 @@ def test_read_csv_raises_error_for_invalid_index_col(
         session.read_csv(path, engine="bigquery", index_col=index_col)
 
 
+def test_read_csv_for_gcs_wildcard_path(session, df_and_gcs_csv):
+    scalars_pandas_df, path = df_and_gcs_csv
+    path = path.replace(".csv", "*.csv")
+
+    index_col = "rowindex"
+    bf_df = session.read_csv(path, engine="bigquery", index_col=index_col)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    # Also, `expand=True` is needed to read from wildcard paths. See details:
+    # https://github.com/fsspec/gcsfs/issues/616
+    if not pd.__version__.startswith("1."):
+        storage_options = {"expand": True}
+    else:
+        storage_options = None
+    pd_df = session.read_csv(
+        path,
+        index_col=index_col,
+        dtype=scalars_pandas_df.dtypes.to_dict(),
+        storage_options=storage_options,
+    )
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
+
+
 def test_read_csv_for_names(session, df_and_gcs_csv_for_two_columns):
     _, path = df_and_gcs_csv_for_two_columns
 
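The new test exercises both code paths against the same wildcard: engine="bigquery" reads the glob natively, while the default pandas-backed reader relies on gcsfs to expand it. Outside the test harness, the pandas-side call would look roughly like the sketch below (the path is hypothetical, and per the gcsfs issue linked in the test comment, expand=True takes effect on pandas 2.x):

    import pandas as pd

    # gcsfs expands the wildcard into the matching objects when expand=True
    # (see https://github.com/fsspec/gcsfs/issues/616).
    df = pd.read_csv(
        "gs://my-bucket/data/part-*.csv",  # hypothetical wildcard path
        storage_options={"expand": True},
    )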
