22 changes: 18 additions & 4 deletions bigframes/session/__init__.py
@@ -18,6 +18,8 @@
 
 from collections import abc
 import datetime
+import fnmatch
+import inspect
 import logging
 import os
 import secrets
@@ -1344,12 +1346,24 @@ def read_json(
     def _check_file_size(self, filepath: str):
         max_size = 1024 * 1024 * 1024  # 1 GB in bytes
         if filepath.startswith("gs://"):  # GCS file path
+            bucket_name, blob_path = filepath.split("/", 3)[2:]
+
             client = storage.Client()
-            bucket_name, blob_name = filepath.split("/", 3)[2:]
             bucket = client.bucket(bucket_name)
-            blob = bucket.blob(blob_name)
-            blob.reload()
-            file_size = blob.size
+
+            list_blobs_params = inspect.signature(bucket.list_blobs).parameters
+            if "match_glob" in list_blobs_params:
+                # Modern, efficient method for new library versions
+                matching_blobs = bucket.list_blobs(match_glob=blob_path)
+                file_size = sum(blob.size for blob in matching_blobs)
+            else:
+                # Fallback method for older library versions
+                prefix = blob_path.split("*", 1)[0]
+                all_blobs = bucket.list_blobs(prefix=prefix)
+                matching_blobs = [
+                    blob for blob in all_blobs if fnmatch.fnmatch(blob.name, blob_path)
+                ]
+                file_size = sum(blob.size for blob in matching_blobs)
         elif os.path.exists(filepath):  # local file path
             file_size = os.path.getsize(filepath)
         else:
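For reference, the URI split and the fallback matching that the new `_check_file_size` branch relies on can be seen in isolation below. This is a minimal sketch; the `gs://my-bucket/data/scalars*.csv` URI and blob names are made-up examples, not paths from this PR.

import fnmatch

# A hypothetical wildcard GCS URI, split the same way _check_file_size does:
# "gs://my-bucket/data/scalars*.csv".split("/", 3)
#   -> ["gs:", "", "my-bucket", "data/scalars*.csv"]
bucket_name, blob_path = "gs://my-bucket/data/scalars*.csv".split("/", 3)[2:]
assert bucket_name == "my-bucket"
assert blob_path == "data/scalars*.csv"

# The fallback branch lists blobs under the prefix before the first "*" and
# filters them client-side with fnmatch, as in the else-branch above.
prefix = blob_path.split("*", 1)[0]
assert prefix == "data/scalars"
assert fnmatch.fnmatch("data/scalars000000000001.csv", blob_path)
assert not fnmatch.fnmatch("data/other.csv", blob_path)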
26 changes: 26 additions & 0 deletions tests/system/small/test_session.py
@@ -1287,6 +1287,32 @@ def test_read_csv_raises_error_for_invalid_index_col(
         session.read_csv(path, engine="bigquery", index_col=index_col)
 
 
+def test_read_csv_for_gcs_wildcard_path(session, df_and_gcs_csv):
+    scalars_pandas_df, path = df_and_gcs_csv
+    path = path.replace(".csv", "*.csv")
+
+    index_col = "rowindex"
+    bf_df = session.read_csv(path, engine="bigquery", index_col=index_col)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    # Also, `expand=True` is needed to read from wildcard paths. See details:
+    # https://github.com/fsspec/gcsfs/issues/616
+    if not pd.__version__.startswith("1."):
+        storage_options = {"expand": True}
+    else:
+        storage_options = None
+    pd_df = session.read_csv(
+        path,
+        index_col=index_col,
+        dtype=scalars_pandas_df.dtypes.to_dict(),
+        storage_options=storage_options,
+    )
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
+
+
 def test_read_csv_for_names(session, df_and_gcs_csv_for_two_columns):
     _, path = df_and_gcs_csv_for_two_columns
 
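From the user's side, the change means a wildcard GCS path can be passed straight to read_csv. A rough usage sketch, with an assumed bucket and file layout (not from this PR):

import bigframes.pandas as bpd

# Hypothetical bucket holding sharded CSV exports; the size check now handles
# the "*" (BigQuery load jobs already accept wildcard URIs).
df = bpd.read_csv(
    "gs://my-bucket/exports/scalars*.csv",
    engine="bigquery",
    index_col="rowindex",
)
print(df.shape)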