28 changes: 28 additions & 0 deletions providers/google/docs/operators/cloud/bigquery.rst
@@ -518,6 +518,34 @@ Also you can use deferrable mode in this operator if you would like to free up t
    :start-after: [START howto_sensor_bigquery_table_partition_async]
    :end-before: [END howto_sensor_bigquery_table_partition_async]

Check that the BigQuery Table Streaming Buffer is empty
""""""""""""""""""""""""""""""""""""""""""""""""""""""""

To check that the streaming buffer of a BigQuery table is empty, you can use
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryStreamingBufferEmptySensor`.
This sensor is useful in ETL pipelines to ensure that recently streamed data has been fully
processed before downstream tasks continue.

.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_bigquery_streaming_buffer_empty]
    :end-before: [END howto_sensor_bigquery_streaming_buffer_empty]
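
For illustration, a minimal stand-alone instantiation could look like the following sketch
(the project, dataset, and table identifiers are placeholders):

.. code-block:: python

    from airflow.providers.google.cloud.sensors.bigquery import BigQueryStreamingBufferEmptySensor

    wait_for_streaming_buffer = BigQueryStreamingBufferEmptySensor(
        task_id="wait_for_streaming_buffer",
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
        poke_interval=60,
    )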

You can also use deferrable mode in this sensor if you would like to free up the worker slots while the sensor is running.

.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_bigquery_streaming_buffer_empty_defered]
    :end-before: [END howto_sensor_bigquery_streaming_buffer_empty_defered]

.. exampleinclude:: /../../google/tests/system/google/cloud/bigquery/example_bigquery_sensors.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_bigquery_streaming_buffer_empty_async]
    :end-before: [END howto_sensor_bigquery_streaming_buffer_empty_async]
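
For illustration, enabling deferrable mode only requires passing ``deferrable=True`` to the sensor
(the identifiers below are placeholders):

.. code-block:: python

    wait_for_streaming_buffer_deferrable = BigQueryStreamingBufferEmptySensor(
        task_id="wait_for_streaming_buffer_deferrable",
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
        deferrable=True,
    )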

Reference
^^^^^^^^^

@@ -28,6 +28,7 @@
from airflow.providers.common.compat.sdk import AirflowException, BaseSensorOperator, conf
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.triggers.bigquery import (
    BigQueryStreamingBufferEmptyTrigger,
    BigQueryTableExistenceTrigger,
    BigQueryTablePartitionExistenceTrigger,
)
@@ -256,3 +257,133 @@ def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None

        message = "No event received in trigger callback"
        raise AirflowException(message)


class BigQueryStreamingBufferEmptySensor(BaseSensorOperator):
"""
Sensor for checking whether the streaming buffer in a BigQuery table is empty.

The BigQueryStreamingBufferEmptySensor waits for the streaming buffer in a specified
BigQuery table to be empty before proceeding. It can be used in ETL pipelines to ensure
that recent streamed data has been processed before continuing downstream tasks.

:param project_id: The Google Cloud project ID where the BigQuery table resides.
:param dataset_id: The ID of the dataset containing the BigQuery table.
:param table_id: The ID of the BigQuery table to monitor.
:param gcp_conn_id: The Airflow connection ID for GCP. Defaults to "google_cloud_default".
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param deferrable: Run sensor in deferrable mode. Defaults to False.
"""

    template_fields: Sequence[str] = (
        "project_id",
        "dataset_id",
        "table_id",
        "impersonation_chain",
    )

    ui_color = "#f0eee4"

    def __init__(
        self,
        *,
        project_id: str,
        dataset_id: str,
        table_id: str,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        if deferrable and "poke_interval" not in kwargs:
            # TODO: Remove once deprecated
            if "polling_interval" in kwargs:
                kwargs["poke_interval"] = kwargs["polling_interval"]
                warnings.warn(
                    "Argument `polling_interval` is deprecated and will be removed "
                    "in a future release. Please use `poke_interval` instead.",
                    AirflowProviderDeprecationWarning,
                    stacklevel=2,
                )
            else:
                kwargs["poke_interval"] = 30

        super().__init__(**kwargs)

        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.deferrable = deferrable

    def execute(self, context: Context) -> None:
        """Airflow runs this method on the worker and defers using the trigger if deferrable."""
        if not self.deferrable:
            super().execute(context)
        else:
            if not self.poke(context=context):
                self.defer(
                    timeout=timedelta(seconds=self.timeout),
                    trigger=BigQueryStreamingBufferEmptyTrigger(
                        project_id=self.project_id,
                        dataset_id=self.dataset_id,
                        table_id=self.table_id,
                        poll_interval=self.poke_interval,
                        gcp_conn_id=self.gcp_conn_id,
                        hook_params={
                            "impersonation_chain": self.impersonation_chain,
                        },
                    ),
                    method_name="execute_complete",
                )

    def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
        """
        Act as a callback for when the trigger fires - returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes execution was successful.
        """
        table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
        self.log.info("Checking streaming buffer state for table: %s", table_uri)
        if event:
            if event["status"] == "success":
                return event["message"]
            raise AirflowException(event["message"])

        message = "No event received in trigger callback"
        raise AirflowException(message)

    def poke(self, context: Context) -> bool:
        """
        Check if the BigQuery streaming buffer is empty for the specified table.

        This method periodically checks the status of the BigQuery table's streaming buffer
        to determine if it is empty. It is useful for ensuring that recent streamed data
        has been fully processed before continuing with downstream tasks.
        """
        table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
        self.log.info("Checking streaming buffer state for table: %s", table_uri)

        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        try:
            client = hook.get_client(project_id=self.project_id)
            table_ref = f"{self.project_id}.{self.dataset_id}.{self.table_id}"
            table = client.get_table(table_ref)
            # ``streaming_buffer`` is None when the tables.get response carries no streamingBuffer section.
            return table.streaming_buffer is None
        except Exception as err:
            # Match case-insensitively: the client error message is e.g. "404 ...: Not found: Table ...".
            if "not found" in str(err).lower():
                raise AirflowException(
                    f"Table {self.project_id}.{self.dataset_id}.{self.table_id} not found"
                ) from err
            raise
@@ -806,3 +806,158 @@ async def _partition_exists(self, hook: BigQueryAsyncHook, job_id: str | None, p
        if records:
            records = [row[0] for row in records]
            return self.partition_id in records


class BigQueryStreamingBufferEmptyTrigger(BaseTrigger):
"""
Trigger that periodically checks if a BigQuery table's streaming buffer is empty.

This trigger continuously polls a BigQuery table to determine if its streaming buffer
has been fully processed and is now empty. It's particularly useful before running
DML operations (UPDATE, DELETE, MERGE) on tables populated via streaming inserts.

:param project_id: The Google Cloud Project in which to look for the table.
:param dataset_id: The dataset ID of the table to check.
:param table_id: The table ID of the table to check.
:param gcp_conn_id: Reference to Google Cloud connection ID.
:param hook_params: Additional parameters for hook initialization.
:param poll_interval: Polling period in seconds to check the streaming buffer status.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account.
"""

    def __init__(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        gcp_conn_id: str,
        hook_params: dict[str, Any],
        poll_interval: float = 30.0,
        impersonation_chain: str | Sequence[str] | None = None,
    ):
        super().__init__()
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.gcp_conn_id = gcp_conn_id
        self.poll_interval = poll_interval
        self.hook_params = hook_params
        self.impersonation_chain = impersonation_chain

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize BigQueryStreamingBufferEmptyTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryStreamingBufferEmptyTrigger",
            {
                "project_id": self.project_id,
                "dataset_id": self.dataset_id,
                "table_id": self.table_id,
                "gcp_conn_id": self.gcp_conn_id,
                "poll_interval": self.poll_interval,
                "hook_params": self.hook_params,
                "impersonation_chain": self.impersonation_chain,
            },
        )

    def _get_async_hook(self) -> BigQueryTableAsyncHook:
        """Get the async hook for BigQuery table operations."""
        return BigQueryTableAsyncHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Continuously check if the streaming buffer is empty.

        Yields a TriggerEvent when the streaming buffer becomes empty or if an error occurs.
        """
        try:
            hook = self._get_async_hook()
            while True:
                self.log.info(
                    "Checking streaming buffer for table %s.%s.%s",
                    self.project_id,
                    self.dataset_id,
                    self.table_id,
                )

                is_buffer_empty = await self._is_streaming_buffer_empty(
                    hook=hook,
                    project_id=self.project_id,
                    dataset_id=self.dataset_id,
                    table_id=self.table_id,
                )

                if is_buffer_empty:
                    table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
                    message = f"Streaming buffer is empty for table: {table_uri}"
                    self.log.info(message)
                    yield TriggerEvent(
                        {
                            "status": "success",
                            "message": message,
                        }
                    )
                    return
                else:
                    self.log.info(
                        "Streaming buffer still has data. Sleeping for %s seconds.",
                        self.poll_interval,
                    )
                    await asyncio.sleep(self.poll_interval)

        except Exception as e:
            self.log.exception(
                "Exception occurred while checking streaming buffer for table %s.%s.%s",
                self.project_id,
                self.dataset_id,
                self.table_id,
            )
            yield TriggerEvent({"status": "error", "message": str(e)})

    async def _is_streaming_buffer_empty(
        self,
        hook: BigQueryTableAsyncHook,
        project_id: str,
        dataset_id: str,
        table_id: str,
    ) -> bool:
        """
        Check if the streaming buffer is empty for the specified table.

        :param hook: BigQueryTableAsyncHook instance for async operations.
        :param project_id: The Google Cloud Project ID.
        :param dataset_id: The dataset ID containing the table.
        :param table_id: The table ID to check.
        :return: True if streaming buffer is empty or doesn't exist, False otherwise.
        """
        async with ClientSession() as session:
            try:
                client = await hook.get_table_client(
                    dataset=dataset_id,
                    table_id=table_id,
                    project_id=project_id,
                    session=session,
                )
                response = await client.get()

                if not response:
                    # Table doesn't exist
                    raise AirflowException(f"Table {project_id}.{dataset_id}.{table_id} does not exist")

                # Check if streamingBuffer exists in the response
                streaming_buffer = response.get("streamingBuffer")
                return streaming_buffer is None

            except ClientResponseError as err:
                if err.status == 404:
                    raise AirflowException(f"Table {project_id}.{dataset_id}.{table_id} not found") from err
                raise