
Commit 0d6ae87

chore: add max_batching_rows to experimental blob transform functions (#1407)
1 parent: abe48d6

2 files changed: +32 −5 lines changed


bigframes/blob/_functions.py

Lines changed: 7 additions & 2 deletions
@@ -37,12 +37,17 @@ class TransformFunction:
     """Simple transform function class to deal with Python UDF."""

     def __init__(
-        self, func_def: FunctionDef, session: bigframes.session.Session, connection: str
+        self,
+        func_def: FunctionDef,
+        session: bigframes.session.Session,
+        connection: str,
+        max_batching_rows: int,
     ):
         self._func = func_def.func
         self._requirements = func_def.requirements
         self._session = session
         self._connection = connection
+        self._max_batching_rows = max_batching_rows

     def _input_bq_signature(self):
         sig = inspect.signature(self._func)
@@ -67,7 +72,7 @@ def _create_udf(self):
 CREATE OR REPLACE FUNCTION `{udf_name}`({self._input_bq_signature()})
 RETURNS {self._output_bq_type()} LANGUAGE python
 WITH CONNECTION `{self._connection}`
-OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages})
+OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages}, max_batching_rows={self._max_batching_rows})
 AS r\"\"\"
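For context, a rough sketch of the DDL that `_create_udf` renders once `max_batching_rows` is substituted. This is illustration only, not part of the commit: the project, dataset, signature, entry point, and package list are hypothetical placeholders; only the `max_batching_rows` option comes from this change.

# Illustration only (not from the commit): roughly what the rendered
# CREATE FUNCTION statement looks like after substitution. All identifiers
# and the package list below are hypothetical placeholders.
rendered_ddl = """
CREATE OR REPLACE FUNCTION `my-project.my_dataset.udf_abc123`(src STRING)
RETURNS STRING LANGUAGE python
WITH CONNECTION `my-project.us.my-connection`
OPTIONS (entry_point='image_blur_to_bytes', runtime_version='python-3.11',
         packages=['opencv-python', 'numpy'], max_batching_rows=10000)
AS r'''
# ... Python UDF body inserted here by TransformFunction ...
'''
"""
print(rendered_ddl)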

bigframes/operations/blob.py

Lines changed: 25 additions & 3 deletions
@@ -278,6 +278,7 @@ def image_blur(
         *,
         dst: Optional[Union[str, bigframes.series.Series]] = None,
         connection: Optional[str] = None,
+        max_batching_rows: int = 10000,
     ) -> bigframes.series.Series:
         """Blurs images.
@@ -288,6 +289,7 @@ def image_blur(
             ksize (tuple(int, int)): Kernel size.
             dst (str or bigframes.series.Series or None, default None): Destination GCS folder str or blob series. If None, output to BQ as bytes.
             connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session.
+            max_batching_rows (int, default 10,000): Max number of rows per batch send to cloud run to execute the function.

         Returns:
             BigFrames Blob Series
@@ -302,6 +304,7 @@ def image_blur(
             blob_func.image_blur_to_bytes_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         df["ksize_x"], df["ksize_y"] = ksize
@@ -322,6 +325,7 @@ def image_blur(
             blob_func.image_blur_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         dst_rt = dst.blob._get_runtime_json_str(mode="RW")
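A hedged usage sketch for the widened `image_blur` signature. The bucket paths and connection name are placeholders, and building the blob Series via `bpd.from_glob_path` is an assumption about the surrounding experimental API rather than part of this commit.

import bigframes.pandas as bpd

# Placeholder paths and connection; the blob Series construction is assumed.
df = bpd.from_glob_path("gs://my-bucket/images/*", name="image")
df["blurred"] = df["image"].blob.image_blur(
    ksize=(8, 8),
    dst="gs://my-bucket/images_blurred/",
    connection="my-project.us.my-connection",
    max_batching_rows=2000,  # cap rows per batch sent to the generated UDF
)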
@@ -342,6 +346,7 @@ def image_resize(
         fy: float = 0.0,
         dst: Optional[Union[str, bigframes.series.Series]] = None,
         connection: Optional[str] = None,
+        max_batching_rows: int = 10000,
     ):
         """Resize images.
@@ -354,6 +359,7 @@ def image_resize(
             fy (float, defalut 0.0): scale factor along the vertical axis. If set to 0.0, dsize parameter determines the output size.
             dst (str or bigframes.series.Series or None, default None): Destination GCS folder str or blob series. If None, output to BQ as bytes.
             connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session.
+            max_batching_rows (int, default 10,000): Max number of rows per batch send to cloud run to execute the function.

         Returns:
             BigFrames Blob Series
@@ -375,6 +381,7 @@ def image_resize(
             blob_func.image_resize_to_bytes_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         df["dsize_x"], df["dsizye_y"] = dsize
@@ -396,6 +403,7 @@ def image_resize(
             blob_func.image_resize_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         dst_rt = dst.blob._get_runtime_json_str(mode="RW")
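A similar sketch for `image_resize`. When `dst` is left as None the result comes back to BigQuery as bytes, and `max_batching_rows` is simply forwarded into the generated UDF. The path and connection are placeholders, and `bpd.from_glob_path` is assumed as above.

import bigframes.pandas as bpd

df = bpd.from_glob_path("gs://my-bucket/images/*", name="image")  # placeholder path
resized_bytes = df["image"].blob.image_resize(
    dsize=(640, 480),  # target size; fx/fy scale factors are also available
    connection="my-project.us.my-connection",
    max_batching_rows=5000,
)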
@@ -417,6 +425,7 @@ def image_normalize(
         norm_type: str = "l2",
         dst: Optional[Union[str, bigframes.series.Series]] = None,
         connection: Optional[str] = None,
+        max_batching_rows: int = 10000,
     ) -> bigframes.series.Series:
         """Normalize images.
@@ -429,6 +438,7 @@ def image_normalize(
             norm_type (str, default "l2"): Normalization type. Accepted values are "inf", "l1", "l2" and "minmax".
             dst (str or bigframes.series.Series or None, default None): Destination GCS folder str or blob series. If None, output to BQ as bytes.
             connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session.
+            max_batching_rows (int, default 10,000): Max number of rows per batch send to cloud run to execute the function.

         Returns:
             BigFrames Blob Series
@@ -443,6 +453,7 @@ def image_normalize(
             blob_func.image_normalize_to_bytes_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         df["alpha"] = alpha
@@ -465,6 +476,7 @@ def image_normalize(
             blob_func.image_normalize_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         dst_rt = dst.blob._get_runtime_json_str(mode="RW")
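And for `image_normalize`, with `dst` given as a GCS folder so the normalized images are written back to Cloud Storage. Again a sketch only; paths and connection are placeholders and `bpd.from_glob_path` is assumed.

import bigframes.pandas as bpd

df = bpd.from_glob_path("gs://my-bucket/images/*", name="image")  # placeholder path
df["normalized"] = df["image"].blob.image_normalize(
    norm_type="minmax",
    dst="gs://my-bucket/images_normalized/",
    connection="my-project.us.my-connection",
    max_batching_rows=5000,
)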
@@ -480,7 +492,10 @@ def image_normalize(
         return dst

     def pdf_extract(
-        self, *, connection: Optional[str] = None
+        self,
+        *,
+        connection: Optional[str] = None,
+        max_batching_rows: int = 10000,
     ) -> bigframes.series.Series:
         """Extracts and chunks text from PDF URLs and saves the text as
         arrays of string.
@@ -493,6 +508,8 @@ def pdf_extract(
             connection (str or None, default None): BQ connection used for
                 function internet transactions, and the output blob if "dst"
                 is str. If None, uses default connection of the session.
+            max_batching_rows (int, default 10,000): Max number of rows per batch
+                send to cloud run to execute the function.

         Returns:
             bigframes.series.Series: conatins all text from a pdf file
@@ -502,14 +519,15 @@ def pdf_extract(

         connection = self._resolve_connection(connection)

-        pdf_chunk_udf = blob_func.TransformFunction(
+        pdf_extract_udf = blob_func.TransformFunction(
             blob_func.pdf_extract_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         src_rt = self._get_runtime_json_str(mode="R")
-        res = src_rt.apply(pdf_chunk_udf)
+        res = src_rt.apply(pdf_extract_udf)
         return res

     def pdf_chunk(
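A corresponding sketch for `pdf_extract`. PDF parsing tends to be memory-heavy, which is the kind of workload where lowering `max_batching_rows` helps. Paths and connection are placeholders, and `bpd.from_glob_path` is assumed.

import bigframes.pandas as bpd

pdfs = bpd.from_glob_path("gs://my-bucket/pdfs/*", name="pdf")  # placeholder path
extracted = pdfs["pdf"].blob.pdf_extract(
    connection="my-project.us.my-connection",
    max_batching_rows=500,  # keep batches small for large documents
)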
@@ -518,6 +536,7 @@ def pdf_chunk(
         connection: Optional[str] = None,
         chunk_size: int = 1000,
         overlap_size: int = 200,
+        max_batching_rows: int = 10000,
     ) -> bigframes.series.Series:
         """Extracts and chunks text from PDF URLs and saves the text as
         arrays of strings.
@@ -535,6 +554,8 @@ def pdf_chunk(
             overlap_size (int, default 200): the number of overlapping characters
                 between consective chunks. The helps to ensure context is
                 perserved across chunk boundaries.
+            max_batching_rows (int, default 10,000): Max number of rows per batch
+                send to cloud run to execute the function.

         Returns:
             bigframe.series.Series of array[str], where each string is a
@@ -557,6 +578,7 @@ def pdf_chunk(
             blob_func.pdf_chunk_def,
             session=self._block.session,
             connection=connection,
+            max_batching_rows=max_batching_rows,
         ).udf()

         src_rt = self._get_runtime_json_str(mode="R")
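Finally, `pdf_chunk` accepts the same knob alongside its existing chunking parameters. As before, this is a hedged sketch with placeholder paths and connection, and `bpd.from_glob_path` assumed.

import bigframes.pandas as bpd

pdfs = bpd.from_glob_path("gs://my-bucket/pdfs/*", name="pdf")  # placeholder path
chunks = pdfs["pdf"].blob.pdf_chunk(
    connection="my-project.us.my-connection",
    chunk_size=1000,
    overlap_size=200,
    max_batching_rows=500,
)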
