Skip to content

Commit 796fc3e

Browse files
authored
chore: add experimental blob.image_blur function (#1256)
* chore: add experimental blob.image_blur function * apply to obj_ref * docs * fix mypy
1 parent bc5f946 commit 796fc3e

File tree

4 files changed

+210
-0
lines changed

4 files changed

+210
-0
lines changed

bigframes/blob/_functions.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from dataclasses import dataclass
16+
import inspect
17+
from typing import Callable, Iterable
18+
19+
import google.cloud.bigquery as bigquery
20+
21+
import bigframes
22+
import bigframes.session._io.bigquery as bf_io_bigquery
23+
24+
_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"}
25+
26+
27+
@dataclass(frozen=True)
class FunctionDef:
    """Bundle a Python callable with the packages it needs to run as a UDF."""

    # The Python function whose source becomes the UDF body.
    func: Callable
    # Names of the packages the UDF requires.
    requirements: Iterable[str]
34+
35+
# TODO(garrettwu): migrate to bigframes UDF when it is available
class TransformFunction:
    """Simple transform function class to deal with Python UDF.

    The wrapped function's source text is read with ``inspect.getsource`` and
    spliced into a ``CREATE OR REPLACE FUNCTION ... LANGUAGE python`` statement.
    Its parameter and return annotations are translated to BigQuery types via
    ``_PYTHON_TO_BQ_TYPES``, so every parameter and the return value must be
    annotated with one of the types in that mapping.
    """

    def __init__(
        self, func_def: FunctionDef, session: bigframes.Session, connection: str
    ):
        """Store the function definition and where to deploy it.

        Args:
            func_def: The function and the packages it requires.
            session: Session whose BigQuery client runs the DDL and which
                loads the resulting function.
            connection: Connection name placed in the ``WITH CONNECTION``
                clause of the generated SQL.
        """
        self._func = func_def.func
        self._requirements = func_def.requirements
        self._session = session
        self._connection = connection

    def _input_bq_signature(self):
        """Return the SQL parameter list, e.g. ``"a INT64, b STRING"``.

        Raises:
            KeyError: If a parameter's annotation is missing or is not a key
                of ``_PYTHON_TO_BQ_TYPES``.
        """
        params = inspect.signature(self._func).parameters
        return ", ".join(
            f"{name} {_PYTHON_TO_BQ_TYPES[param.annotation]}"
            for name, param in params.items()
        )

    def _output_bq_type(self):
        """Return the BigQuery type name for the function's return annotation.

        Raises:
            KeyError: If the return annotation is missing or is not a key of
                ``_PYTHON_TO_BQ_TYPES``.
        """
        sig = inspect.signature(self._func)
        return _PYTHON_TO_BQ_TYPES[sig.return_annotation]

    def _create_udf(self):
        """Create Python UDF in BQ. Return name of the UDF.

        Raises:
            ValueError: If the function's source contains a triple double
                quote, which cannot be embedded in the generated SQL.
            KeyError: If an annotation cannot be mapped to a BigQuery type.
        """
        func_body = inspect.getsource(self._func)
        # The source is embedded below inside a SQL r"""...""" literal; a
        # triple double quote in it would close that literal early and yield
        # malformed SQL. Fail before anything is sent to BigQuery.
        if '"""' in func_body:
            raise ValueError(
                f"Source of function '{self._func.__name__}' contains a triple "
                "double quote, which cannot be embedded in the UDF definition. "
                "Use '#' comments or single-quoted strings instead."
            )

        udf_name = str(self._session._loader._storage_manager._random_table())
        func_name = self._func.__name__
        # str() of a Python list renders as e.g. ['a', 'b'], which is spliced
        # verbatim as the value of the `packages` option.
        packages = str(list(self._requirements))

        sql = f"""
CREATE OR REPLACE FUNCTION `{udf_name}`({self._input_bq_signature()})
RETURNS {self._output_bq_type()} LANGUAGE python
WITH CONNECTION `{self._connection}`
OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages})
AS r\"\"\"


{func_body}


\"\"\"
"""

        bf_io_bigquery.start_query_with_client(
            self._session.bqclient,
            sql,
            job_config=bigquery.QueryJobConfig(),
            metrics=self._session._metrics,
        )

        return udf_name

    def udf(self):
        """Create and return the UDF object."""
        udf_name = self._create_udf()
        return self._session.read_gbq_function(udf_name)
94+
95+
# Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string.
#
# NOTE: TransformFunction._create_udf reads this function's source with
# inspect.getsource and embeds it inside a SQL r-prefixed triple-double-quoted
# literal. The function must therefore be self-contained (imports live inside
# the body) and must not contain a triple double quote; document it with "#"
# comments rather than a docstring.
def image_blur_func(
    src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int
) -> str:
    # Args:
    #   src_obj_ref_rt: JSON string whose access_urls.read_url locates the source image.
    #   dst_obj_ref_rt: JSON string whose access_urls.write_url receives the result.
    #   ksize_x, ksize_y: blur kernel size passed to cv.blur.
    # Returns dst_obj_ref_rt unchanged once the blurred JPEG has been uploaded.
    # Raises requests.HTTPError on a failed download/upload and ValueError when
    # the image cannot be decoded or encoded.
    import json

    import cv2 as cv  # type: ignore
    import numpy as np
    import requests

    src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
    dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt)

    src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
    dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"]

    # Without a timeout a stalled connection would block indefinitely.
    response = requests.get(src_url, timeout=30)
    # Surface HTTP errors here instead of trying to decode an error page.
    response.raise_for_status()
    bts = response.content

    nparr = np.frombuffer(bts, np.uint8)
    img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED)
    if img is None:
        # imdecode signals undecodable input by returning None.
        raise ValueError("Failed to decode the source object as an image.")
    img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y))
    encoded_ok, encoded = cv.imencode(".jpeg", img_blurred)
    if not encoded_ok:
        raise ValueError("Failed to encode the blurred image as JPEG.")
    bts = encoded.tobytes()

    response = requests.put(
        url=dst_url,
        data=bts,
        headers={
            "Content-Type": "image/jpeg",
        },
        timeout=30,
    )
    # A rejected upload must not be reported as success.
    response.raise_for_status()

    return dst_obj_ref_rt


image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"])

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,11 @@ def json_extract_string_array_op_impl(
12101210
return json_extract_string_array(json_obj=x, json_path=op.json_path)
12111211

12121212

1213+
@scalar_op_compiler.register_unary_op(ops.ToJSONString)
def to_json_string_op_impl(json_obj: ibis_types.Value):
    """Compile ``ops.ToJSONString`` by delegating to the ``to_json_string`` builtin."""
    json_string = to_json_string(json_obj=json_obj)
    return json_string
1216+
1217+
12131218
# Blob Ops
12141219
@scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op)
12151220
def obj_fetch_metadata_op_impl(obj_ref: ibis_types.Value):
@@ -1909,6 +1914,13 @@ def json_extract_string_array( # type: ignore[empty-body]
19091914
"""Extracts a JSON array and converts it to a SQL ARRAY of STRINGs."""
19101915

19111916

1917+
@ibis_udf.scalar.builtin(name="to_json_string")
def to_json_string(  # type: ignore[empty-body]
    json_obj: ibis_dtypes.JSON,
) -> ibis_dtypes.String:
    """Declares the SQL builtin ``to_json_string``: takes a JSON value, returns a STRING."""
1922+
1923+
19121924
@ibis_udf.scalar.builtin(name="ML.DISTANCE")
19131925
def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64: # type: ignore[empty-body]
19141926
"""Computes the distance between two vectors using specified type ("EUCLIDEAN", "MANHATTAN", or "COSINE")"""

bigframes/operations/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,23 @@ def output_type(self, *input_types):
740740
)
741741

742742

743+
@dataclasses.dataclass(frozen=True)
class ToJSONString(UnaryOp):
    """Unary op that turns a JSON-like input into a string."""

    name: typing.ClassVar[str] = "to_json_string"

    def output_type(self, *input_types):
        """Return the string dtype once the operand is confirmed JSON-like.

        Raises:
            TypeError: If the first input type is not JSON-like.
        """
        operand_type = input_types[0]
        if dtypes.is_json_like(operand_type):
            return dtypes.STRING_DTYPE
        raise TypeError(
            "Input type must be an valid JSON object or JSON-formatted string type."
            + f" Received type: {operand_type}"
        )


# Shared instance of the op.
to_json_string_op = ToJSONString()
758+
759+
743760
## Blob Ops
744761
@dataclasses.dataclass(frozen=True)
745762
class ObjGetAccessUrl(UnaryOp):

bigframes/operations/blob.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414

1515
from __future__ import annotations
1616

17+
from typing import Optional
18+
1719
import IPython.display as ipy_display
1820
import requests
1921

22+
from bigframes import clients
2023
from bigframes.operations import base
2124
import bigframes.operations as ops
2225
import bigframes.series
@@ -66,3 +69,51 @@ def display(self, n: int = 3):
6669
read_url = str(read_url).strip('"')
6770
response = requests.get(read_url)
6871
ipy_display.display(ipy_display.Image(response.content))
72+
73+
def image_blur(
    self,
    ksize: tuple[int, int],
    *,
    dst: bigframes.series.Series,
    connection: Optional[str] = None,
) -> bigframes.series.Series:
    """Blurs images.

    .. note::
        BigFrames Blob is still experimental. It may not work and is subject to change in the future.

    Each call deploys a Python UDF through ``TransformFunction`` and applies
    it row-wise to the source and destination blob runtime info.

    Args:
        ksize (tuple(int, int)): Kernel size.
        dst (bigframes.series.Series): Destination blob series.
        connection (str or None, default None): BQ connection used for internet transactions. If None, uses default connection of the session.

    Returns:
        bigframes.series.Series: The UDF's result for each row, i.e. the
        destination blob's runtime info as a JSON string.
    """
    import bigframes.blob._functions as blob_func

    session = self._block.session
    resolved_connection = clients.resolve_full_bq_connection_name(
        connection or session._bq_connection,
        default_project=session._project,
        default_location=session._location,
    )

    transform = blob_func.TransformFunction(
        blob_func.image_blur_def,
        session=session,
        connection=resolved_connection,
    )
    image_blur_udf = transform.udf()

    # Source needs read access, destination read/write; both are serialized
    # to JSON strings because the UDF takes string arguments.
    src_rt = (
        bigframes.series.Series(self._block)
        ._apply_unary_op(ops.ObjGetAccessUrl(mode="R"))
        ._apply_unary_op(ops.to_json_string_op)
    )
    dst_rt = dst._apply_unary_op(ops.ObjGetAccessUrl(mode="RW"))._apply_unary_op(
        ops.to_json_string_op
    )

    # Columns are laid out in the UDF's parameter order:
    # source, destination, ksize_x, ksize_y.
    udf_inputs = src_rt.to_frame().join(dst_rt.to_frame(), how="outer")
    ksize_x, ksize_y = ksize
    udf_inputs["ksize_x"] = ksize_x
    udf_inputs["ksize_y"] = ksize_y

    return udf_inputs.apply(image_blur_udf, axis=1)

0 commit comments

Comments
 (0)