Skip to content

Commit bfcc08f

Browse files
authored
chore: Add cleanup step for old UDFs in anonymous dataset (#2171)
* chore: Add cleanup step for old UDFs in anonymous dataset * fix * fix * fix with some test code * fix * no retry * fix retry * disable doctest * testing - increase timeout * revert timeout * add warning
1 parent 10ec52f commit bfcc08f

File tree

4 files changed

+79
-2
lines changed

4 files changed

+79
-2
lines changed

bigframes/exceptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class UnknownLocationWarning(Warning):
3030

3131

3232
class CleanupFailedWarning(Warning):
    """Bigframes failed to clean up a table or function resource."""
3434

3535

3636
class DefaultIndexWarning(Warning):

bigframes/session/anonymous_dataset.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,21 @@
1616
import threading
1717
from typing import List, Optional, Sequence
1818
import uuid
19+
import warnings
1920

21+
from google.api_core import retry as api_core_retry
2022
import google.cloud.bigquery as bigquery
2123

2224
from bigframes import constants
2325
import bigframes.core.events
26+
import bigframes.exceptions as bfe
2427
from bigframes.session import temporary_storage
2528
import bigframes.session._io.bigquery as bf_io_bigquery
2629

2730
# Naming template for session-scoped temporary tables in the anonymous dataset.
_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
# UDFs older than this many days are considered stale and will be deleted
# from the anonymous dataset before creating a new UDF.
_UDF_CLEANUP_THRESHOLD_DAYS = 3
2834

2935

3036
class AnonymousDatasetManager(temporary_storage.TemporaryStorageManager):
@@ -137,8 +143,46 @@ def generate_unique_resource_id(self) -> bigquery.TableReference:
137143
)
138144
return self.dataset.table(table_id)
139145

146+
def _cleanup_old_udfs(self):
147+
"""Clean up old UDFs in the anonymous dataset."""
148+
dataset = self.dataset
149+
routines = list(self.bqclient.list_routines(dataset))
150+
cleanup_cutoff_time = datetime.datetime.now(
151+
datetime.timezone.utc
152+
) - datetime.timedelta(days=_UDF_CLEANUP_THRESHOLD_DAYS)
153+
154+
for routine in routines:
155+
if (
156+
routine.created < cleanup_cutoff_time
157+
and routine._properties["routineType"] == "SCALAR_FUNCTION"
158+
):
159+
try:
160+
self.bqclient.delete_routine(
161+
routine.reference,
162+
not_found_ok=True,
163+
retry=api_core_retry.Retry(timeout=0),
164+
)
165+
except Exception as e:
166+
msg = bfe.format_message(
167+
f"Unable to clean this old UDF '{routine.reference}': {e}"
168+
)
169+
warnings.warn(msg, category=bfe.CleanupFailedWarning)
170+
140171
def close(self):
141172
"""Delete tables that were created with this session's session_id."""
142173
for table_ref in self._table_ids:
143174
self.bqclient.delete_table(table_ref, not_found_ok=True)
144175
self._table_ids.clear()
176+
177+
try:
178+
# Before closing the session, attempt to clean up any uncollected,
179+
# old Python UDFs residing in the anonymous dataset. These UDFs
180+
# accumulate over time and can eventually exceed resource limits.
181+
# See more from b/450913424.
182+
self._cleanup_old_udfs()
183+
except Exception as e:
184+
# Log a warning on the failure, do not interrupt the workflow.
185+
msg = bfe.format_message(
186+
f"Failed to clean up the old Python UDFs before closing the session: {e}"
187+
)
188+
warnings.warn(msg, category=bfe.CleanupFailedWarning)

tests/system/large/test_session.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import datetime
16+
from unittest import mock
1617

1718
import google.cloud.bigquery as bigquery
1819
import google.cloud.exceptions
@@ -138,3 +139,35 @@ def test_clean_up_via_context_manager(session_creator):
138139
bqclient.delete_table(full_id_1)
139140
with pytest.raises(google.cloud.exceptions.NotFound):
140141
bqclient.delete_table(full_id_2)
142+
143+
144+
def test_cleanup_old_udfs(session: bigframes.Session):
    """Stale scalar UDFs in the anonymous dataset are deleted by cleanup."""
    routine_ref = session._anon_dataset_manager.dataset.routine("test_routine_cleanup")

    # Create a dummy function to be deleted.
    create_function_sql = f"""
    CREATE OR REPLACE FUNCTION `{routine_ref.project}.{routine_ref.dataset_id}.{routine_ref.routine_id}`(x INT64)
    RETURNS INT64 LANGUAGE python
    OPTIONS (entry_point='dummy_func', runtime_version='python-3.11')
    AS r'''
def dummy_func(x):
    return x + 1
'''
    """
    session.bqclient.query(create_function_sql).result()

    # Sanity check: the routine exists before the cleanup runs.
    assert session.bqclient.get_routine(routine_ref) is not None

    # The real routine was just created, so it is too new to be collected.
    # Stand in a mock whose ``created`` timestamp is well past the cleanup
    # threshold so the cleanup sees the routine as stale. Set both the
    # public ``type_`` accessor and the private ``_properties`` mapping so
    # the fixture does not depend on which one the implementation reads.
    mock_routine = mock.MagicMock(spec=bigquery.Routine)
    mock_routine.created = datetime.datetime.now(
        datetime.timezone.utc
    ) - datetime.timedelta(days=100)
    mock_routine.reference = routine_ref
    mock_routine.type_ = "SCALAR_FUNCTION"
    mock_routine._properties = {"routineType": "SCALAR_FUNCTION"}
    routines = [mock_routine]

    with mock.patch.object(session.bqclient, "list_routines", return_value=routines):
        session._anon_dataset_manager._cleanup_old_udfs()

    # The real routine behind the mocked listing must now be gone.
    with pytest.raises(google.cloud.exceptions.NotFound):
        session.bqclient.get_routine(routine_ref)

third_party/bigframes_vendored/pandas/core/frame.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ def to_gbq(
420420
>>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
421421
>>> destination = df.to_gbq(ordering_id="ordering_id")
422422
>>> # The table created can be read outside of the current session.
423-
>>> bpd.close_session() # Optional, to demonstrate a new session.
423+
>>> bpd.close_session() # Optional, to demonstrate a new session. # doctest: +SKIP
424424
>>> bpd.read_gbq(destination, index_col="ordering_id")
425425
col1 col2
426426
ordering_id

0 commit comments

Comments
 (0)