diff --git a/dbt/adapters/bigquery/dataset.py b/dbt/adapters/bigquery/dataset.py
index 4ecd6daa5..dc0b3e27c 100644
--- a/dbt/adapters/bigquery/dataset.py
+++ b/dbt/adapters/bigquery/dataset.py
@@ -43,3 +43,31 @@ def add_access_entry_to_dataset(dataset: Dataset, access_entry: AccessEntry) ->
         access_entries.append(access_entry)
         dataset.access_entries = access_entries
     return dataset
+
+
+def delete_access_entry_from_dataset(dataset: Dataset, access_entry: AccessEntry) -> Dataset:
+    """Remove an access entry from a dataset.
+
+    Every entry whose ``entity_id`` equals that of ``access_entry`` is dropped.
+
+    Args:
+        dataset (Dataset): the dataset to be updated
+        access_entry (AccessEntry): the access entry to be removed from the dataset
+
+    Returns:
+        Dataset: the dataset that was passed in, with the matching entries removed
+    """
+    access_entries = dataset.access_entries
+    access_entries_id = [entity.entity_id for entity in access_entries]
+
+    full_dataset_id = f"{dataset.project}.{dataset.dataset_id}"
+    if access_entry.entity_id in access_entries_id:
+        dataset.access_entries = [
+            entry for entry in access_entries if entry.entity_id != access_entry.entity_id
+        ]
+    else:
+        print(
+            f"no need to revoke the dataset access for '{access_entry.entity_id}' "
+            f"to ' dataset '{full_dataset_id}.' \n it doesn't exist"
+        )
+    return dataset
diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index dc5cf6e17..a06485bcd 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -55,7 +55,11 @@
 from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
 from dbt.adapters.bigquery.column import get_nested_column_data_types
 from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
-from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
+from dbt.adapters.bigquery.dataset import (
+    add_access_entry_to_dataset,
+    is_access_entry_in_dataset,
+    delete_access_entry_from_dataset,
+)
 from dbt.adapters.bigquery.python_submissions import (
     ClusterDataprocHelper,
     ServerlessDataProcHelper,
@@ -843,10 +847,49 @@ def describe_relation(
             return parser.from_bq_table(bq_table)
         return None
 
+    @available.parse_none
+    def grant_access_to(self, entity, entity_type, role, grant_target_dict, full_refresh=False):
+        """
+        Given an entity, grants access to a dataset.
+
+        An entry that already exists is left untouched, unless ``full_refresh`` is true,
+        in which case it is revoked first and then granted again.
+        """
+        conn: BigQueryConnectionManager = self.connections.get_thread_connection()
+        client = conn.handle
+        GrantTarget.validate(grant_target_dict)
+        grant_target = GrantTarget.from_dict(grant_target_dict)
+        if entity_type == "view":
+            entity = self.get_table_ref_from_relation(entity).to_api_repr()
+        with _dataset_lock:
+            dataset_ref = self.connections.dataset_ref(grant_target.project, grant_target.dataset)
+            dataset = client.get_dataset(dataset_ref)
+            access_entry = AccessEntry(role, entity_type, entity)
+            if is_access_entry_in_dataset(dataset, access_entry):
+                if not full_refresh:
+                    logger.warning(f"Access entry {access_entry} " f"already exists in dataset")
+                    return
+                # full refresh: revoke the existing entry before granting it again
+                dataset = delete_access_entry_from_dataset(dataset, access_entry)
+                # Make an API request.
+                dataset = client.update_dataset(dataset, ["access_entries"])
+                full_dataset_id = f"{dataset.project}.{dataset.dataset_id}"
+                logger.info(
+                    f"Revoked dataset access for '{access_entry.entity_id}' "
+                    f"to ' dataset '{full_dataset_id}.'"
+                )
+            dataset = add_access_entry_to_dataset(dataset, access_entry)
+            dataset = client.update_dataset(dataset, ["access_entries"])
+            full_dataset_id = f"{dataset.project}.{dataset.dataset_id}"
+            logger.info(
+                f"allowed dataset access for '{access_entry.entity_id}' "
+                f"to ' dataset '{full_dataset_id}.'"
+            )
+
     @available.parse_none
-    def grant_access_to(self, entity, entity_type, role, grant_target_dict):
+    def remove_grant_access_to(self, entity, entity_type, role, grant_target_dict):
         """
-        Given an entity, grants it access to a dataset.
+        Given an entity, removes grants associated with a dataset.
         """
         conn: BigQueryConnectionManager = self.connections.get_thread_connection()
         client = conn.handle
@@ -858,12 +901,21 @@ def grant_access_to(self, entity, entity_type, role, grant_target_dict):
             dataset_ref = self.connections.dataset_ref(grant_target.project, grant_target.dataset)
             dataset = client.get_dataset(dataset_ref)
             access_entry = AccessEntry(role, entity_type, entity)
-            # only perform update if access entry not in dataset
+            # only revoke if the access entry is present in the dataset
+            full_dataset_id = f"{dataset.project}.{dataset.dataset_id}"
             if is_access_entry_in_dataset(dataset, access_entry):
-                logger.warning(f"Access entry {access_entry} " f"already exists in dataset")
+                dataset = delete_access_entry_from_dataset(dataset, access_entry)
+                # Make an API request.
+                dataset = client.update_dataset(dataset, ["access_entries"])
+                logger.info(
+                    f"Revoked dataset access for '{access_entry.entity_id}' "
+                    f"to ' dataset '{full_dataset_id}.'"
+                )
             else:
-                dataset = add_access_entry_to_dataset(dataset, access_entry)
-                client.update_dataset(dataset, ["access_entries"])
+                logger.warning(
+                    f"Access entry {access_entry} not in the dataset {full_dataset_id} "
+                    "no need to remove it"
+                )
 
     @available.parse_none
     def get_dataset_location(self, relation):
diff --git a/dbt/include/bigquery/macros/materializations/view.sql b/dbt/include/bigquery/macros/materializations/view.sql
index fd05129f9..f783726e4 100644
--- a/dbt/include/bigquery/macros/materializations/view.sql
+++ b/dbt/include/bigquery/macros/materializations/view.sql
@@ -20,7 +20,7 @@
 
     {% if config.get('grant_access_to') %}
       {% for grant_target_dict in config.get('grant_access_to') %}
-        {% do adapter.grant_access_to(this, 'view', None, grant_target_dict) %}
+        {% do adapter.grant_access_to(this, 'view', None, grant_target_dict, should_full_refresh()) %}
       {% endfor %}
     {% endif %}
 
diff --git a/tests/functional/adapter/test_grant_access_to.py b/tests/functional/adapter/test_grant_access_to.py
index 633cebe92..cb7f86670 100644
--- a/tests/functional/adapter/test_grant_access_to.py
+++ b/tests/functional/adapter/test_grant_access_to.py
@@ -3,7 +3,7 @@
 import pytest
 
 from dbt.tests.util import run_dbt
-
+import re
 
 def select_1(dataset: str, materialized: str):
     config = f"""config(
@@ -86,6 +86,21 @@ def test_grant_access_succeeds(self, project, setup_grant_schema, teardown_grant
         assert len(results) == 2
 
 
+
+class TestAccessGrantSucceedsWithFullRefresh(TestAccessGrantSucceeds):
+    def test_grant_access_succeeds(self, project, setup_grant_schema, teardown_grant_schema,capsys):
+        # Need to run twice to validate idempotency
+        results = run_dbt(["run"])
+        assert len(results) == 2
+        time.sleep(10)
+        results = run_dbt(["run","--full-refresh"])
+        assert len(results) == 2
+        captured = capsys.readouterr()
+        assert not re.search(r"BigQuery adapter: Access entry