Skip to content

Commit 86b8300

Browse files
committed
[BEAM-34076] Added TTL caching for BigQuery table definitions
1 parent c111c8b commit 86b8300

File tree

2 files changed

+60
-5
lines changed

2 files changed

+60
-5
lines changed

sdks/python/apache_beam/io/gcp/bigquery_tools.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import sys
3838
import time
3939
import uuid
40+
import threading
4041
from json.decoder import JSONDecodeError
4142
from typing import Optional
4243
from typing import Sequence
@@ -66,6 +67,8 @@
6667
from apache_beam.typehints.typehints import Any
6768
from apache_beam.utils import retry
6869
from apache_beam.utils.histogram import LinearBucket
70+
from cachetools import TTLCache, cachedmethod
71+
from cachetools.keys import hashkey
6972

7073
# Protect against environments where bigquery library is not available.
7174
try:
@@ -359,6 +362,9 @@ class BigQueryWrapper(object):
359362

360363
HISTOGRAM_METRIC_LOGGER = MetricLogger()
361364

365+
_TABLE_CACHE_MAX_SIZE = 1024
366+
_TABLE_CACHE_TTL_SECONDS = 300
367+
362368
def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
363369
self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
364370
self.gcp_bq_client = client or gcp_bigquery.Client(
@@ -393,6 +399,14 @@ def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
393399
self._temporary_table_suffix = uuid.uuid4().hex
394400
self.temp_dataset_id = temp_dataset_id or self._get_temp_dataset()
395401

402+
self._table_cache_ttl_secs = self._TABLE_CACHE_TTL_SECONDS
403+
self._table_cache_maxsize = self._TABLE_CACHE_MAX_SIZE
404+
self._table_cache = TTLCache(
405+
maxsize=self._table_cache_maxsize,
406+
ttl=self._table_cache_ttl_secs,
407+
)
408+
self._table_cache_lock = threading.RLock()
409+
396410
self.created_temp_dataset = False
397411

398412
@property
@@ -791,8 +805,20 @@ def _insert_all_rows(
791805
@retry.with_exponential_backoff(
    num_retries=MAX_RETRIES,
    retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
def _get_table_uncached(self, project_id, dataset_id, table_id):
  """Fetch a table's metadata with a direct ``tables.Get`` call.

  The call goes straight to ``self.client`` and is wrapped by the
  exponential-backoff retry decorator above, using ``MAX_RETRIES`` attempts
  and the server-error/timeout/quota retry filter.

  Args:
    project_id: value sent as ``projectId`` in the request.
    dataset_id: value sent as ``datasetId`` in the request.
    table_id: value sent as ``tableId`` in the request.

  Returns:
    Whatever ``self.client.tables.Get`` returns for the request.
  """
  return self.client.tables.Get(
      bigquery.BigqueryTablesGetRequest(
          projectId=project_id, datasetId=dataset_id, tableId=table_id))
813+
814+
@cachedmethod(
815+
cache=lambda self: self._table_cache,
816+
lock=lambda self: self._table_cache_lock,
817+
key=lambda self, project_id, dataset_id, table_id: hashkey(
818+
project_id, dataset_id, table_id),
819+
)
794820
def get_table(self, project_id, dataset_id, table_id):
795-
"""Lookup a table's metadata object.
821+
"""Lookup a table's metadata object (cached with TTL, thread-safe).
796822
797823
Args:
798824
client: bigquery.BigqueryV2 instance
@@ -805,10 +831,7 @@ def get_table(self, project_id, dataset_id, table_id):
805831
Raises:
806832
HttpError: if lookup failed.
807833
"""
808-
request = bigquery.BigqueryTablesGetRequest(
809-
projectId=project_id, datasetId=dataset_id, tableId=table_id)
810-
response = self.client.tables.Get(request)
811-
return response
834+
return self._get_table_uncached(project_id, dataset_id, table_id)
812835

813836
def _create_table(
814837
self,

sdks/python/apache_beam/io/gcp/bigquery_tools_test.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,38 @@ def test_temporary_dataset_is_unique(self, patched_time_sleep):
292292
wrapper.create_temporary_dataset('project-id', 'location')
293293
self.assertTrue(client.datasets.Get.called)
294294

295+
def test_get_table_invokes_tables_get_and_caches_result(self):
  """Checks that get_table calls tables.Get once per table and caches it.

  The first lookup must send a BigqueryTablesGetRequest carrying the given
  project, dataset and table ids. A repeated lookup for the same ids must be
  answered without another tables.Get call, while a lookup for a different
  table id must trigger a new call.
  """
  from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper

  client = mock.Mock()
  client.tables = mock.Mock()

  returned_table = mock.Mock(name="BigQueryTable")
  client.tables.Get = mock.Mock(return_value=returned_table)

  wrapper = BigQueryWrapper(client=client)

  project_id = "my-project"
  dataset_id = "my_dataset"
  table_id = "my_table"

  # First lookup: must reach the (mocked) BigQuery client exactly once.
  table1 = wrapper.get_table(project_id, dataset_id, table_id)

  self.assertIs(table1, returned_table)
  self.assertEqual(client.tables.Get.call_count, 1)

  # The single positional argument is the request object that was sent.
  (request,), _ = client.tables.Get.call_args
  self.assertIsInstance(request, bigquery.BigqueryTablesGetRequest)
  self.assertEqual(request.projectId, project_id)
  self.assertEqual(request.datasetId, dataset_id)
  self.assertEqual(request.tableId, table_id)

  # Second lookup with identical ids: call count stays at 1 => cached.
  table2 = wrapper.get_table(project_id, dataset_id, table_id)

  self.assertIs(table2, returned_table)
  self.assertEqual(client.tables.Get.call_count, 1)

  # A different table id is a different cache key, so it must miss the cache.
  wrapper.get_table(project_id, dataset_id, "other_table")
  self.assertEqual(client.tables.Get.call_count, 2)
326+
295327
def test_get_or_create_dataset_created(self):
296328
client = mock.Mock()
297329
client.datasets.Get.side_effect = HttpError(

0 commit comments

Comments
 (0)