Skip to content

Commit 4d71547

Browse files
Refactor load_table methods to reduce code duplication
This commit refactors the `load_table_from_uri`, `load_table_from_file`, `load_table_from_dataframe`, and `load_table_from_json` methods in the BigQuery client. A new private helper method, `_prepare_load_job`, has been introduced to centralize the creation and configuration of `LoadJob` objects. This eliminates redundant code for setting up job IDs, projects, and locations. A second helper, `_prepare_load_config`, was also added to standardize the handling of `LoadJobConfig` objects, merging user-provided configurations with client defaults. These changes make the loading methods more concise, easier to read, and simpler to maintain.
1 parent 84fa75b commit 4d71547

File tree

1 file changed

+73
-68
lines changed

1 file changed

+73
-68
lines changed

google/cloud/bigquery/client.py

Lines changed: 73 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2490,6 +2490,51 @@ def api_request(*args, **kwargs):
24902490
page_size=page_size,
24912491
)
24922492

2493+
def _prepare_load_config(
    self, job_config: Optional[LoadJobConfig] = None
) -> LoadJobConfig:
    """Build the effective configuration for a load job.

    Args:
        job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
            A user-supplied job configuration.

    Returns:
        google.cloud.bigquery.job.LoadJobConfig:
            The job configuration to use for a load job.
    """
    if job_config is None:
        # No user configuration supplied; start from an empty one.
        job_config = job.LoadJobConfig()
    else:
        # Reject configs of the wrong job type early, with a clear error.
        _verify_job_config_type(job_config, LoadJobConfig)

    # Merge the client-level default load-job settings into the config.
    return job_config._fill_from_default(self._default_load_job_config)
2512+
2513+
def _prepare_load_job(
    self,
    destination: Union[Table, TableReference, str],
    job_config: LoadJobConfig,
    job_id: Optional[str] = None,
    job_id_prefix: Optional[str] = None,
    location: Optional[str] = None,
    project: Optional[str] = None,
    source_uris: Optional[Sequence[str]] = None,
) -> job.LoadJob:
    """Helper for `load_table_from_` methods to prepare a LoadJob."""
    # Resolve the final job ID (generated, prefixed, or caller-supplied).
    final_job_id = _make_job_id(job_id, job_id_prefix)

    # Fall back to the client's defaults when not given explicitly.
    job_project = self.project if project is None else project
    job_location = self.location if location is None else location

    job_ref = job._JobReference(
        final_job_id, project=job_project, location=job_location
    )

    # Normalize the destination to a TableReference, defaulting the
    # project to the client's project.
    destination_ref = _table_arg_to_table_ref(
        destination, default_project=self.project
    )

    return job.LoadJob(job_ref, source_uris, destination_ref, self, job_config)
2537+
24932538
def load_table_from_uri(
24942539
self,
24952540
source_uris: Union[str, Sequence[str]],
@@ -2547,31 +2592,21 @@ def load_table_from_uri(
25472592
If ``job_config`` is not an instance of
25482593
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
25492594
"""
2550-
job_id = _make_job_id(job_id, job_id_prefix)
2551-
2552-
if project is None:
2553-
project = self.project
2554-
2555-
if location is None:
2556-
location = self.location
2557-
2558-
job_ref = job._JobReference(job_id, project=project, location=location)
2559-
25602595
if isinstance(source_uris, str):
25612596
source_uris = [source_uris]
25622597

2563-
destination = _table_arg_to_table_ref(destination, default_project=self.project)
2564-
2565-
if job_config is not None:
2566-
_verify_job_config_type(job_config, LoadJobConfig)
2567-
else:
2568-
job_config = job.LoadJobConfig()
2569-
2570-
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2598+
new_job_config = self._prepare_load_config(job_config)
2599+
load_job = self._prepare_load_job(
2600+
destination,
2601+
new_job_config,
2602+
job_id=job_id,
2603+
job_id_prefix=job_id_prefix,
2604+
location=location,
2605+
project=project,
2606+
source_uris=source_uris,
2607+
)
25712608

2572-
load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
25732609
load_job._begin(retry=retry, timeout=timeout)
2574-
25752610
return load_job
25762611

25772612
def load_table_from_file(
@@ -2647,25 +2682,15 @@ def load_table_from_file(
26472682
If ``job_config`` is not an instance of
26482683
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
26492684
"""
2650-
job_id = _make_job_id(job_id, job_id_prefix)
2651-
2652-
if project is None:
2653-
project = self.project
2654-
2655-
if location is None:
2656-
location = self.location
2657-
2658-
destination = _table_arg_to_table_ref(destination, default_project=self.project)
2659-
job_ref = job._JobReference(job_id, project=project, location=location)
2660-
2661-
if job_config is not None:
2662-
_verify_job_config_type(job_config, LoadJobConfig)
2663-
else:
2664-
job_config = job.LoadJobConfig()
2665-
2666-
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2667-
2668-
load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
2685+
new_job_config = self._prepare_load_config(job_config)
2686+
load_job = self._prepare_load_job(
2687+
destination,
2688+
new_job_config,
2689+
job_id=job_id,
2690+
job_id_prefix=job_id_prefix,
2691+
location=location,
2692+
project=project,
2693+
)
26692694
job_resource = load_job.to_api_repr()
26702695

26712696
if rewind:
@@ -2796,14 +2821,7 @@ def load_table_from_dataframe(
27962821
If ``job_config`` is not an instance of
27972822
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
27982823
"""
2799-
job_id = _make_job_id(job_id, job_id_prefix)
2800-
2801-
if job_config is not None:
2802-
_verify_job_config_type(job_config, LoadJobConfig)
2803-
else:
2804-
job_config = job.LoadJobConfig()
2805-
2806-
new_job_config = job_config._fill_from_default(self._default_load_job_config)
2824+
new_job_config = self._prepare_load_config(job_config)
28072825

28082826
supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
28092827
if new_job_config.source_format is None:
@@ -2830,9 +2848,6 @@ def load_table_from_dataframe(
28302848
# pyarrow is now the only supported parquet engine.
28312849
raise ValueError("This method requires pyarrow to be installed")
28322850

2833-
if location is None:
2834-
location = self.location
2835-
28362851
# If table schema is not provided, we try to fetch the existing table
28372852
# schema, and check if dataframe schema is compatible with it - except
28382853
# for WRITE_TRUNCATE jobs, the existing schema does not matter then.
@@ -2877,8 +2892,14 @@ def load_table_from_dataframe(
28772892
stacklevel=2,
28782893
)
28792894

2895+
# We need a unique suffix for every load_table_from_dataframe call to
2896+
# avoid collisions.
2897+
# See: https://github.com/googleapis/python-bigquery/issues/1363
2898+
session_suffix = uuid.uuid4().hex
28802899
tmpfd, tmppath = tempfile.mkstemp(
2881-
suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
2900+
suffix="_job_{}.{}".format(
2901+
session_suffix, new_job_config.source_format.lower()
2902+
)
28822903
)
28832904
os.close(tmpfd)
28842905

@@ -3012,15 +3033,7 @@ def load_table_from_json(
30123033
If ``job_config`` is not an instance of
30133034
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
30143035
"""
3015-
job_id = _make_job_id(job_id, job_id_prefix)
3016-
3017-
if job_config is not None:
3018-
_verify_job_config_type(job_config, LoadJobConfig)
3019-
else:
3020-
job_config = job.LoadJobConfig()
3021-
3022-
new_job_config = job_config._fill_from_default(self._default_load_job_config)
3023-
3036+
new_job_config = self._prepare_load_config(job_config)
30243037
new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
30253038

30263039
# In specific conditions, we check if the table already exists, and/or
@@ -3040,14 +3053,6 @@ def load_table_from_json(
30403053
else:
30413054
new_job_config.autodetect = False
30423055

3043-
if project is None:
3044-
project = self.project
3045-
3046-
if location is None:
3047-
location = self.location
3048-
3049-
destination = _table_arg_to_table_ref(destination, default_project=self.project)
3050-
30513056
data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows)
30523057
encoded_str = data_str.encode()
30533058
data_file = io.BytesIO(encoded_str)

0 commit comments

Comments
 (0)