
Commit ffe7dc6

Shuowei Li (shuoweil) authored
chore: add experimental blob.pdf_chunking function (#1370)
* pdf chunking code is done
* pdf chunking is working; currently we save all chunked results back to GCS, which we will change in the next round
* pdf chunking takes a GCS link as input and writes the chunked output into a BigQuery table
* refactor code
* move the blob import to fix bugs

Co-authored-by: Shuowei Li <[email protected]>
1 parent d202e5e commit ffe7dc6
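For orientation, the user-facing flow described in the commit message might look roughly like the sketch below. This is a hedged illustration, not part of the commit: the `from_glob_path` construction, bucket path, column name, and destination table are placeholders/assumptions.

    import bigframes.pandas as bpd

    # Assumption: a blob Series of PDF objects in GCS; the construction helper
    # and the bucket path are placeholders, not part of this commit.
    pdfs = bpd.from_glob_path("gs://my-bucket/pdfs/*.pdf", name="pdf")["pdf"]

    # New in this commit: extract and chunk each PDF's text with a sliding window.
    chunks = pdfs.blob.pdf_chunk(chunk_size=1000, overlap_size=200)

    # Persist the chunked output to a BigQuery table, as described above.
    chunks.to_frame().to_gbq("my_dataset.pdf_chunks")  # placeholder table name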

File tree

2 files changed: +215 -11 lines changed

* bigframes/blob/_functions.py
* bigframes/operations/blob.py

bigframes/blob/_functions.py

Lines changed: 74 additions & 0 deletions
@@ -128,3 +128,77 @@ def image_blur_func(
 
 
 image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"])
+
+
+# Extracts all text from a PDF url
+def pdf_extract_func(src_obj_ref_rt: str) -> str:
+    import io
+    import json
+
+    from pypdf import PdfReader  # type: ignore
+    import requests
+
+    src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
+    src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
+
+    response = requests.get(src_url, stream=True)
+    response.raise_for_status()
+    pdf_bytes = response.content
+
+    pdf_file = io.BytesIO(pdf_bytes)
+    reader = PdfReader(pdf_file, strict=False)
+
+    all_text = ""
+    for page in reader.pages:
+        page_extract_text = page.extract_text()
+        if page_extract_text:
+            all_text += page_extract_text
+    return all_text
+
+
+pdf_extract_def = FunctionDef(pdf_extract_func, ["pypdf", "requests"])
+
+
+# Extracts text from a PDF url and chunks it simultaneously
+def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str:
+    import io
+    import json
+
+    from pypdf import PdfReader  # type: ignore
+    import requests
+
+    src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
+    src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
+
+    response = requests.get(src_url, stream=True)
+    response.raise_for_status()
+    pdf_bytes = response.content
+
+    pdf_file = io.BytesIO(pdf_bytes)
+    reader = PdfReader(pdf_file, strict=False)
+
+    # extract and chunk text simultaneously
+    all_text_chunks = []
+    curr_chunk = ""
+    for page in reader.pages:
+        page_text = page.extract_text()
+        if page_text:
+            curr_chunk += page_text
+            # split the accumulated text into chunks of a specific size with overlap
+            # this loop implements a sliding-window approach to create chunks
+            while len(curr_chunk) >= chunk_size:
+                split_idx = curr_chunk.rfind(" ", 0, chunk_size)
+                if split_idx == -1:
+                    split_idx = chunk_size
+                actual_chunk = curr_chunk[:split_idx]
+                all_text_chunks.append(actual_chunk)
+                overlap = curr_chunk[split_idx + 1 : split_idx + 1 + overlap_size]
+                curr_chunk = overlap + curr_chunk[split_idx + 1 + overlap_size :]
+    if curr_chunk:
+        all_text_chunks.append(curr_chunk)
+
+    all_text_json_string = json.dumps(all_text_chunks)
+    return all_text_json_string
+
+
+pdf_chunk_def = FunctionDef(pdf_chunk_func, ["pypdf", "requests"])
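To see what the loop in `pdf_chunk_func` produces, the chunking step can be exercised locally without GCS, a BigQuery connection, or pypdf. The sketch below copies the loop's logic onto a plain in-memory string; the `chunk_text` helper name, sample text, and sizes are illustrative, not part of the commit.

    def chunk_text(text: str, chunk_size: int, overlap_size: int) -> list[str]:
        # Same loop as pdf_chunk_func, minus the PDF download/parsing.
        all_text_chunks = []
        curr_chunk = text
        while len(curr_chunk) >= chunk_size:
            # Prefer to split at the last space inside the window; fall back to a hard cut.
            split_idx = curr_chunk.rfind(" ", 0, chunk_size)
            if split_idx == -1:
                split_idx = chunk_size
            all_text_chunks.append(curr_chunk[:split_idx])
            # Take the overlap_size characters after the split, then prepend them
            # to the remainder before the next pass (mirroring pdf_chunk_func).
            overlap = curr_chunk[split_idx + 1 : split_idx + 1 + overlap_size]
            curr_chunk = overlap + curr_chunk[split_idx + 1 + overlap_size :]
        if curr_chunk:
            all_text_chunks.append(curr_chunk)
        return all_text_chunks


    sample = "lorem ipsum dolor sit amet consectetur adipiscing elit " * 10
    for chunk in chunk_text(sample, chunk_size=80, overlap_size=20):
        print(len(chunk), repr(chunk[:40]))

Each emitted chunk ends at a word boundary whenever a space exists inside the window, and the trailing remainder shorter than chunk_size is flushed as the final chunk.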

bigframes/operations/blob.py

Lines changed: 141 additions & 11 deletions
@@ -224,6 +224,54 @@ def display_single_url(read_url: str, content_type: str):
         for _, row in df.iterrows():
             display_single_url(row["read_url"], row["content_type"])
 
+    def _resolve_connection(self, connection: Optional[str] = None) -> str:
+        """Resolve the BigQuery connection.
+
+        .. note::
+            BigFrames Blob is still under experiments. It may not work and
+            subject to change in the future.
+
+        Args:
+            connection (str or None, default None): BQ connection used for
+                function internet transactions, and the output blob if "dst" is
+                str. If None, uses default connection of the session.
+
+        Returns:
+            str: the resolved BigQuery connection string in the format:
+            "project.location.connection_id".
+
+        Raises:
+            ValueError: If the connection cannot be resolved to a valid string.
+        """
+        connection = connection or self._block.session._bq_connection
+        return clients.resolve_full_bq_connection_name(
+            connection,
+            default_project=self._block.session._project,
+            default_location=self._block.session._location,
+        )
+
+    def _get_runtime_json_str(
+        self, mode: str = "R", with_metadata: bool = False
+    ) -> bigframes.series.Series:
+        """Get the runtime and apply the ToJSONString transformation.
+
+        .. note::
+            BigFrames Blob is still under experiments. It may not work and
+            subject to change in the future.
+
+        Args:
+            mode (str, default "R"): the mode for accessing the runtime.
+                Default to "R". Possible values are "R" (read-only) and
+                "RW" (read-write).
+            with_metadata (bool, default False): whether to include metadata
+                in the JSON string. Default to False.
+
+        Returns:
+            str: the runtime object as a JSON string.
+        """
+        runtime = self._get_runtime(mode=mode, with_metadata=with_metadata)
+        return runtime._apply_unary_op(ops.ToJSONString())
+
     def image_blur(
         self,
         ksize: tuple[int, int],
@@ -246,12 +294,7 @@ def image_blur(
         """
         import bigframes.blob._functions as blob_func
 
-        connection = connection or self._block.session._bq_connection
-        connection = clients.resolve_full_bq_connection_name(
-            connection,
-            default_project=self._block.session._project,
-            default_location=self._block.session._location,
-        )
+        connection = self._resolve_connection(connection)
 
         if isinstance(dst, str):
             dst = os.path.join(dst, "")
@@ -268,11 +311,8 @@ def image_blur(
             connection=connection,
         ).udf()
 
-        src_rt = self._get_runtime(mode="R")
-        dst_rt = dst.blob._get_runtime(mode="RW")
-
-        src_rt = src_rt._apply_unary_op(ops.ToJSONString())
-        dst_rt = dst_rt._apply_unary_op(ops.ToJSONString())
+        src_rt = self._get_runtime_json_str(mode="R")
+        dst_rt = dst.blob._get_runtime_json_str(mode="RW")
 
         df = src_rt.to_frame().join(dst_rt.to_frame(), how="outer")
         df["ksize_x"], df["ksize_y"] = ksize
@@ -281,3 +321,93 @@ def image_blur(
         res.cache()  # to execute the udf
 
         return dst
+
+    def pdf_extract(
+        self, *, connection: Optional[str] = None
+    ) -> bigframes.series.Series:
+        """Extracts all text from PDF URLs and saves the text as
+        strings.
+
+        .. note::
+            BigFrames Blob is still under experiments. It may not work and
+            subject to change in the future.
+
+        Args:
+            connection (str or None, default None): BQ connection used for
+                function internet transactions, and the output blob if "dst"
+                is str. If None, uses default connection of the session.
+
+        Returns:
+            bigframes.series.Series: contains all text from a PDF file.
+        """
+
+        import bigframes.blob._functions as blob_func
+
+        connection = self._resolve_connection(connection)
+
+        pdf_extract_udf = blob_func.TransformFunction(
+            blob_func.pdf_extract_def,
+            session=self._block.session,
+            connection=connection,
+        ).udf()
+
+        src_rt = self._get_runtime_json_str(mode="R")
+        res = src_rt.apply(pdf_extract_udf)
+        return res
+
+    def pdf_chunk(
+        self,
+        *,
+        connection: Optional[str] = None,
+        chunk_size: int = 1000,
+        overlap_size: int = 200,
+    ) -> bigframes.series.Series:
+        """Extracts and chunks text from PDF URLs and saves the text as
+        arrays of strings.
+
+        .. note::
+            BigFrames Blob is still under experiments. It may not work and
+            subject to change in the future.
+
+        Args:
+            connection (str or None, default None): BQ connection used for
+                function internet transactions, and the output blob if "dst"
+                is str. If None, uses default connection of the session.
+            chunk_size (int, default 1000): the desired size of each text chunk
+                (number of characters).
+            overlap_size (int, default 200): the number of overlapping characters
+                between consecutive chunks. This helps to ensure context is
+                preserved across chunk boundaries.
+
+        Returns:
+            bigframes.series.Series of array[str], where each string is a
+                chunk of text extracted from the PDF.
+        """
+
+        import bigframes.bigquery as bbq
+        import bigframes.blob._functions as blob_func
+
+        connection = self._resolve_connection(connection)
+
+        if chunk_size <= 0:
+            raise ValueError("chunk_size must be a positive integer.")
+        if overlap_size < 0:
+            raise ValueError("overlap_size must be a non-negative integer.")
+        if overlap_size >= chunk_size:
+            raise ValueError("overlap_size must be smaller than chunk_size.")
+
+        pdf_chunk_udf = blob_func.TransformFunction(
+            blob_func.pdf_chunk_def,
+            session=self._block.session,
+            connection=connection,
+        ).udf()
+
+        src_rt = self._get_runtime_json_str(mode="R")
+        df = src_rt.to_frame()
+        df["chunk_size"] = chunk_size
+        df["overlap_size"] = overlap_size
+
+        res = df.apply(pdf_chunk_udf, axis=1)
+
+        res_array = bbq.json_extract_string_array(res)
+        return res_array
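The final step of `pdf_chunk` relies on `bigframes.bigquery.json_extract_string_array` to convert the UDF's JSON-encoded string output into an array<string> column. A minimal sketch of that conversion in isolation; the literal values are fabricated for illustration and an active BigFrames session is assumed.

    import bigframes.pandas as bpd
    import bigframes.bigquery as bbq

    # Each element mimics what pdf_chunk_func returns: one JSON-serialized array of chunks.
    raw = bpd.Series(['["chunk one", "chunk two"]', '["only chunk"]'])

    # Expected: a Series of array<string> values, one list of chunks per input PDF.
    chunks = bbq.json_extract_string_array(raw)
    print(chunks.to_pandas())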

0 commit comments