 import sys
+import threading
 import time
 import uuid
 from json.decoder import JSONDecodeError
 from typing import Optional
 from typing import Sequence
|
 from apache_beam.typehints.typehints import Any
 from apache_beam.utils import retry
 from apache_beam.utils.histogram import LinearBucket
+from cachetools import Cache
+from cachetools import TTLCache
+from cachetools import cachedmethod
+from cachetools.keys import hashkey
 
 # Protect against environments where bigquery library is not available.
 try:
@@ -139,6 +142,12 @@ class ExportCompression(object): |
   SNAPPY = 'SNAPPY'
   NONE = 'NONE'
 
+
+class _NonNoneTTLCache(TTLCache):
+  """A TTLCache that does not store None values."""
+
+  def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
+    # Skip caching None so an empty or failed result is re-fetched on the
+    # next lookup instead of being served from the cache until the TTL.
+    if value is not None:
+      super().__setitem__(key, value)
+
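+# Example of the skip-None behavior (illustrative values):
+#   cache = _NonNoneTTLCache(maxsize=8, ttl=60)
+#   cache['t'] = None  # dropped: 't' is not stored, the next get misses
+#   cache['t'] = 1     # stored, and evicted once the 60 s TTL elapses
+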
 
 def default_encoder(obj):
   if isinstance(obj, decimal.Decimal):
@@ -359,6 +368,9 @@ class BigQueryWrapper(object): |
 
   HISTOGRAM_METRIC_LOGGER = MetricLogger()
 
+  # Table metadata cache shared by all BigQueryWrapper instances in this
+  # process; access is guarded by _TABLE_CACHE_LOCK.
+  _TABLE_CACHE_MAX_SIZE = 1024
+  _TABLE_CACHE_TTL_SECONDS = 300
+  _TABLE_CACHE = _NonNoneTTLCache(
+      maxsize=_TABLE_CACHE_MAX_SIZE, ttl=_TABLE_CACHE_TTL_SECONDS)
+  _TABLE_CACHE_LOCK = threading.RLock()
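+  # Note: with a 300 s TTL, get_table() results (e.g. the table schema)
+  # may be up to five minutes stale; evict the entry from _TABLE_CACHE
+  # under _TABLE_CACHE_LOCK to force an earlier refresh.
+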
   def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
     self.gcp_bq_client = client or gcp_bigquery.Client(
@@ -788,27 +808,20 @@ def _insert_all_rows( |
           int(time.time() * 1000) - started_millis)
     return not errors, errors
 
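+  # Decorator order matters: the cache is consulted before the retry
+  # wrapper, so a hit returns without any RPC; a miss runs the retried
+  # lookup and memoizes its (non-None) result.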
+  @cachedmethod(
+      cache=lambda self: self._TABLE_CACHE,
+      lock=lambda self: self._TABLE_CACHE_LOCK,
+      key=lambda self, project_id, dataset_id, table_id: hashkey(
+          project_id, dataset_id, table_id))
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
   def get_table(self, project_id, dataset_id, table_id):
-    """Lookup a table's metadata object.
-
-    Args:
-      client: bigquery.BigqueryV2 instance
-      project_id: table lookup parameter
-      dataset_id: table lookup parameter
-      table_id: table lookup parameter
-
-    Returns:
-      bigquery.Table instance
-    Raises:
-      HttpError: if lookup failed.
-    """
+    """Lookup a table's metadata object.
+
+    Results are cached in a shared, thread-safe TTL cache, so repeated
+    lookups of the same table may return metadata that is up to
+    _TABLE_CACHE_TTL_SECONDS seconds stale.
+    """
     request = bigquery.BigqueryTablesGetRequest(
         projectId=project_id, datasetId=dataset_id, tableId=table_id)
-    response = self.client.tables.Get(request)
-    return response
+    return self.client.tables.Get(request)
 
   def _create_table(
       self,
|