Commit 45e442d

chore: add experimental blob transform func tuning params (#1422)
1 parent 1251ded commit 45e442d

2 files changed: +55 −13 lines changed

bigframes/blob/_functions.py

Lines changed: 9 additions & 3 deletions
@@ -14,7 +14,7 @@

 from dataclasses import dataclass
 import inspect
-from typing import Callable, Iterable
+from typing import Callable, Iterable, Union

 import google.cloud.bigquery as bigquery

@@ -42,12 +42,18 @@ def __init__(
         session: bigframes.session.Session,
         connection: str,
         max_batching_rows: int,
+        container_cpu: Union[float, int],
+        container_memory: str,
     ):
         self._func = func_def.func
         self._requirements = func_def.requirements
         self._session = session
         self._connection = connection
-        self._max_batching_rows = max_batching_rows
+        self._max_batching_rows = (
+            int(max_batching_rows) if max_batching_rows > 1 else max_batching_rows
+        )
+        self._container_cpu = container_cpu
+        self._container_memory = container_memory

     def _input_bq_signature(self):
         sig = inspect.signature(self._func)
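
Aside from storing the two new fields, the only behavioral change in this constructor is the coercion of max_batching_rows. A minimal standalone sketch of that expression (the helper name and asserts are illustrative, not part of the library):

# Hypothetical mirror of the coercion in __init__ above.
def _coerce_max_batching_rows(max_batching_rows):
    # Row counts above 1 may arrive as floats (e.g. 8096.0) and are cast to int;
    # values at or below 1 are passed through unchanged.
    return int(max_batching_rows) if max_batching_rows > 1 else max_batching_rows

assert _coerce_max_batching_rows(8096.0) == 8096
assert _coerce_max_batching_rows(0.5) == 0.5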
@@ -72,7 +78,7 @@ def _create_udf(self):
 CREATE OR REPLACE FUNCTION `{udf_name}`({self._input_bq_signature()})
 RETURNS {self._output_bq_type()} LANGUAGE python
 WITH CONNECTION `{self._connection}`
-OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages}, max_batching_rows={self._max_batching_rows})
+OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages}, max_batching_rows={self._max_batching_rows}, container_cpu={self._container_cpu}, container_memory='{self._container_memory}')
 AS r\"\"\"

bigframes/operations/blob.py

Lines changed: 46 additions & 10 deletions
@@ -278,7 +278,9 @@ def image_blur(
         *,
         dst: Optional[Union[str, bigframes.series.Series]] = None,
         connection: Optional[str] = None,
-        max_batching_rows: int = 10000,
+        max_batching_rows: int = 8096,
+        container_cpu: Union[float, int] = 0.33,
+        container_memory: str = "512Mi",
     ) -> bigframes.series.Series:
         """Blurs images.
@@ -289,7 +291,9 @@ def image_blur(
             ksize (tuple(int, int)): Kernel size.
             dst (str or bigframes.series.Series or None, default None): Destination GCS folder str or blob series. If None, output to BQ as bytes.
             connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session.
-            max_batching_rows (int, default 10,000): Max number of rows per batch send to cloud run to execute the function.
+            max_batching_rows (int, default 8,096): Max number of rows per batch sent to Cloud Run to execute the function.
+            container_cpu (int or float, default 0.33): Number of container CPUs. Possible values are in [0.33, 8]. Floats larger than 1 are cast to integers.
+            container_memory (str, default "512Mi"): Container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.

         Returns:
             BigFrames Blob Series
@@ -305,6 +309,8 @@ def image_blur(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         df["ksize_x"], df["ksize_y"] = ksize
@@ -326,6 +332,8 @@ def image_blur(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         dst_rt = dst.blob._get_runtime_json_str(mode="RW")
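
For context, a sketch of how a caller might exercise the new tuning knobs on image_blur; the bucket paths and connection name are placeholders, and the same three parameters apply to image_resize and image_normalize below:

import bigframes.pandas as bpd

# Placeholder paths and connection; tuning values are illustrative.
df = bpd.from_glob_path("gs://my-bucket/images/*", name="blob_col")
df["blurred"] = df["blob_col"].blob.image_blur(
    (8, 8),  # ksize
    dst="gs://my-bucket/blurred/",
    connection="us.my-connection",
    max_batching_rows=8096,  # new default; rows per batch sent to Cloud Run
    container_cpu=2,  # allowed range [0.33, 8]; floats above 1 are cast to int
    container_memory="2Gi",  # "512Mi" up to "32Gi"
)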
@@ -346,7 +354,9 @@ def image_resize(
         fy: float = 0.0,
         dst: Optional[Union[str, bigframes.series.Series]] = None,
         connection: Optional[str] = None,
-        max_batching_rows: int = 10000,
+        max_batching_rows: int = 8096,
+        container_cpu: Union[float, int] = 0.33,
+        container_memory: str = "512Mi",
     ):
         """Resize images.
@@ -359,7 +369,9 @@ def image_resize(
             fy (float, default 0.0): scale factor along the vertical axis. If set to 0.0, dsize parameter determines the output size.
             dst (str or bigframes.series.Series or None, default None): Destination GCS folder str or blob series. If None, output to BQ as bytes.
             connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session.
-            max_batching_rows (int, default 10,000): Max number of rows per batch send to cloud run to execute the function.
+            max_batching_rows (int, default 8,096): Max number of rows per batch sent to Cloud Run to execute the function.
+            container_cpu (int or float, default 0.33): Number of container CPUs. Possible values are in [0.33, 8]. Floats larger than 1 are cast to integers.
+            container_memory (str, default "512Mi"): Container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.

         Returns:
             BigFrames Blob Series
@@ -382,6 +394,8 @@ def image_resize(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         df["dsize_x"], df["dsizye_y"] = dsize
@@ -404,6 +418,8 @@ def image_resize(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         dst_rt = dst.blob._get_runtime_json_str(mode="RW")
@@ -425,7 +441,9 @@ def image_normalize(
         norm_type: str = "l2",
         dst: Optional[Union[str, bigframes.series.Series]] = None,
         connection: Optional[str] = None,
-        max_batching_rows: int = 10000,
+        max_batching_rows: int = 8096,
+        container_cpu: Union[float, int] = 0.33,
+        container_memory: str = "512Mi",
     ) -> bigframes.series.Series:
         """Normalize images.
@@ -438,7 +456,9 @@ def image_normalize(
             norm_type (str, default "l2"): Normalization type. Accepted values are "inf", "l1", "l2" and "minmax".
             dst (str or bigframes.series.Series or None, default None): Destination GCS folder str or blob series. If None, output to BQ as bytes.
             connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session.
-            max_batching_rows (int, default 10,000): Max number of rows per batch send to cloud run to execute the function.
+            max_batching_rows (int, default 8,096): Max number of rows per batch sent to Cloud Run to execute the function.
+            container_cpu (int or float, default 0.33): Number of container CPUs. Possible values are in [0.33, 8]. Floats larger than 1 are cast to integers.
+            container_memory (str, default "512Mi"): Container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.

         Returns:
             BigFrames Blob Series
@@ -454,6 +474,8 @@ def image_normalize(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         df["alpha"] = alpha
@@ -477,6 +499,8 @@ def image_normalize(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         dst_rt = dst.blob._get_runtime_json_str(mode="RW")
@@ -495,7 +519,9 @@ def pdf_extract(
         self,
         *,
         connection: Optional[str] = None,
-        max_batching_rows: int = 10000,
+        max_batching_rows: int = 8096,
+        container_cpu: Union[float, int] = 0.33,
+        container_memory: str = "512Mi",
     ) -> bigframes.series.Series:
         """Extracts and chunks text from PDF URLs and saves the text as
         arrays of string.
@@ -508,8 +534,10 @@ def pdf_extract(
             connection (str or None, default None): BQ connection used for
                 function internet transactions, and the output blob if "dst"
                 is str. If None, uses default connection of the session.
-            max_batching_rows (int, default 10,000): Max number of rows per batch
+            max_batching_rows (int, default 8,096): Max number of rows per batch
                 sent to Cloud Run to execute the function.
+            container_cpu (int or float, default 0.33): Number of container CPUs. Possible values are in [0.33, 8]. Floats larger than 1 are cast to integers.
+            container_memory (str, default "512Mi"): Container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.

         Returns:
             bigframes.series.Series: contains all text from a PDF file
@@ -524,6 +552,8 @@ def pdf_extract(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         src_rt = self._get_runtime_json_str(mode="R")
@@ -536,7 +566,9 @@ def pdf_chunk(
         connection: Optional[str] = None,
         chunk_size: int = 1000,
         overlap_size: int = 200,
-        max_batching_rows: int = 10000,
+        max_batching_rows: int = 8096,
+        container_cpu: Union[float, int] = 0.33,
+        container_memory: str = "512Mi",
     ) -> bigframes.series.Series:
         """Extracts and chunks text from PDF URLs and saves the text as
         arrays of strings.
@@ -554,8 +586,10 @@ def pdf_chunk(
             overlap_size (int, default 200): the number of overlapping characters
                 between consecutive chunks. This helps to ensure context is
                 preserved across chunk boundaries.
-            max_batching_rows (int, default 10,000): Max number of rows per batch
+            max_batching_rows (int, default 8,096): Max number of rows per batch
                 sent to Cloud Run to execute the function.
+            container_cpu (int or float, default 0.33): Number of container CPUs. Possible values are in [0.33, 8]. Floats larger than 1 are cast to integers.
+            container_memory (str, default "512Mi"): Container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.

         Returns:
             bigframes.series.Series of array[str], where each string is a
@@ -579,6 +613,8 @@ def pdf_chunk(
             session=self._block.session,
             connection=connection,
             max_batching_rows=max_batching_rows,
+            container_cpu=container_cpu,
+            container_memory=container_memory,
         ).udf()

         src_rt = self._get_runtime_json_str(mode="R")
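
A similar sketch for the PDF side (pdf_extract accepts the same tuning parameters, minus the chunking controls); again the bucket path and connection name are placeholders:

import bigframes.pandas as bpd

# Placeholder path and connection; tuning values are illustrative.
df = bpd.from_glob_path("gs://my-bucket/pdfs/*", name="pdf")
df["chunks"] = df["pdf"].blob.pdf_chunk(
    chunk_size=1000,
    overlap_size=200,
    max_batching_rows=8096,
    container_cpu=0.33,
    container_memory="512Mi",
)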
