Skip to content

Commit 0a4db79

Browse files
committed
Add support for applying BigQuery hidden policy tags to PII fields in the airbyte table
1 parent 6a591ce commit 0a4db79

File tree

14 files changed

+391
-36
lines changed

14 files changed

+391
-36
lines changed

config/locales/en.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ en:
2828
description: |
2929
The name of the BigQuery dataset we're writing to.
3030
default: ENV['BIGQUERY_DATASET']
31+
bigquery_airbyte_dataset:
32+
description: |
33+
The name of the BigQuery airbyte dataset we're writing to.
34+
default: ENV['BIGQUERY_AIRBYTE_DATASET']
35+
bigquery_hidden_policy_tag:
36+
description: The name of the BigQuery hidden policy tag applied to PII in raw airbyte tables.
37+
default: ENV['BIGQUERY_HIDDEN_POLICY_TAG']
3138
bigquery_retries:
3239
description: |
3340
Passed directly to the retries: option on the BigQuery client

lib/dfe/analytics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
require 'dfe/analytics/azure_federated_auth'
3030
require 'dfe/analytics/api_requests'
3131
require 'dfe/analytics/airbyte_stream_config'
32+
require 'dfe/analytics/big_query_apply_policy_tags'
3233
require 'services/airbyte'
3334

3435
module DfE

lib/dfe/analytics/big_query_api.rb

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,43 +9,42 @@ class BigQueryApi
99
RETRY_INITIAL_BASE_INTERVAL = 15
1010
RETRY_MAX_INTERVAL = 60
1111
RETRY_INTERVAL_MULTIPLIER = 2
12-
13-
def self.events_client
14-
@events_client ||= begin
15-
missing_config = %i[
16-
bigquery_project_id
17-
bigquery_table_name
18-
bigquery_dataset
19-
azure_client_id
20-
azure_token_path
21-
azure_scope
22-
gcp_scope
23-
google_cloud_credentials
24-
].select { |val| DfE::Analytics.config.send(val).blank? }
25-
26-
raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?
12+
BIGQUERY_MANDATORY_CONFIG = %i[
13+
bigquery_project_id
14+
bigquery_table_name
15+
bigquery_dataset
16+
azure_client_id
17+
azure_token_path
18+
azure_scope
19+
gcp_scope
20+
google_cloud_credentials
21+
].freeze
22+
23+
def self.client
24+
@client ||= begin
25+
DfE::Analytics::Config.check_missing_config!(BIGQUERY_MANDATORY_CONFIG)
2726

2827
Google::Apis::BigqueryV2::BigqueryService.new
2928
end
3029

31-
@events_client.authorization = DfE::Analytics::AzureFederatedAuth.gcp_client_credentials
32-
@events_client
30+
@client.authorization = DfE::Analytics::AzureFederatedAuth.gcp_client_credentials
31+
@client
3332
end
3433

3534
def self.insert(events)
3635
rows = events.map { |event| { json: event } }
3736
data_request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new(rows: rows, skip_invalid_rows: true)
3837
options = Google::Apis::RequestOptions.default
3938

40-
options.authorization = events_client.authorization
39+
options.authorization = client.authorization
4140
options.retries = DfE::Analytics.config.bigquery_retries
4241
options.max_elapsed_time = ALL_RETRIES_MAX_ELASPED_TIME
4342
options.base_interval = RETRY_INITIAL_BASE_INTERVAL
4443
options.max_interval = RETRY_MAX_INTERVAL
4544
options.multiplier = RETRY_INTERVAL_MULTIPLIER
4645

4746
response =
48-
events_client.insert_all_table_data(
47+
client.insert_all_table_data(
4948
DfE::Analytics.config.bigquery_project_id,
5049
DfE::Analytics.config.bigquery_dataset,
5150
DfE::Analytics.config.bigquery_table_name,
@@ -77,8 +76,46 @@ def self.error_message_for(response)
7776
"DfE::Analytics BigQuery API insert error for #{response.insert_errors.length} event(s):\n#{message}"
7877
end
7978

80-
class ConfigurationError < StandardError; end
79+
def self.apply_policy_tags(tables, policy_tag)
80+
tables.each do |table_name, column_names|
81+
begin
82+
table = client.get_table(
83+
DfE::Analytics.config.bigquery_project_id,
84+
DfE::Analytics.config.bigquery_airbyte_dataset,
85+
table_name.to_s
86+
)
87+
rescue Google::Apis::ClientError => e
88+
error_message = "DfE::Analytics Failed to retrieve table: #{table_name}: #{e.message}"
89+
Rails.logger.error(error_message)
90+
raise PolicyTagError, error_message
91+
end
92+
93+
updated_fields = table.schema.fields.map do |field|
94+
field.policy_tags = Google::Apis::BigqueryV2::TableFieldSchema::PolicyTags.new(names: [policy_tag]) if column_names.include?(field.name)
95+
field
96+
end
97+
98+
new_schema = Google::Apis::BigqueryV2::TableSchema.new(fields: updated_fields)
99+
updated_table = Google::Apis::BigqueryV2::Table.new(schema: new_schema)
100+
101+
begin
102+
client.patch_table(
103+
DfE::Analytics.config.bigquery_project_id,
104+
DfE::Analytics.config.bigquery_airbyte_dataset,
105+
table_name.to_s,
106+
updated_table,
107+
fields: 'schema'
108+
)
109+
rescue Google::Apis::ClientError => e
110+
error_message = "DfE::Analytics Failed to update table: #{table_name}: #{e.message}"
111+
Rails.logger.error(error_message)
112+
raise PolicyTagError, error_message
113+
end
114+
end
115+
end
116+
81117
class SendEventsError < StandardError; end
118+
class PolicyTagError < StandardError; end
82119
end
83120
end
84121
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
module DfE
4+
module Analytics
5+
# Applies BigQuery hidden policy tags to PII fields in the airbyte tables
6+
class BigQueryApplyPolicyTags < AnalyticsJob
7+
def self.do(delay_in_minutes: 0)
8+
if delay_in_minutes.zero?
9+
perform_later
10+
else
11+
time_to_run = Time.zone.now + delay_in_minutes.minutes
12+
13+
set(wait_until: time_to_run).perform_later
14+
end
15+
end
16+
17+
def perform
18+
unless DfE::Analytics.airbyte_enabled?
19+
Rails.logger.warn('DfE::Analytics::BigQueryApplyPolicyTags.perform called but airbyte is disabled. Please check DfE::Analytics.airbyte_enabled? before applying policy tags in BigQuery')
20+
return
21+
end
22+
23+
DfE::Analytics::BigQueryApi.apply_policy_tags(
24+
DfE::Analytics.hidden_pii,
25+
DfE::Analytics.config.bigquery_hidden_policy_tag
26+
)
27+
end
28+
end
29+
end
30+
end

lib/dfe/analytics/config.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ module Config
1111
bigquery_table_name
1212
bigquery_project_id
1313
bigquery_dataset
14+
bigquery_airbyte_dataset
1415
bigquery_api_json_key
16+
bigquery_hidden_policy_tag
1517
bigquery_retries
1618
bigquery_timeout
1719
enable_analytics
@@ -46,7 +48,9 @@ def self.configure(config)
4648
config.bigquery_table_name ||= ENV.fetch('BIGQUERY_TABLE_NAME', nil)
4749
config.bigquery_project_id ||= ENV.fetch('BIGQUERY_PROJECT_ID', nil)
4850
config.bigquery_dataset ||= ENV.fetch('BIGQUERY_DATASET', nil)
51+
config.bigquery_airbyte_dataset ||= ENV.fetch('BIGQUERY_AIRBYTE_DATASET', nil)
4952
config.bigquery_api_json_key ||= ENV.fetch('BIGQUERY_API_JSON_KEY', nil)
53+
config.bigquery_hidden_policy_tag ||= ENV.fetch('BIGQUERY_HIDDEN_POLICY_TAG', nil)
5054
config.bigquery_retries ||= 3
5155
config.bigquery_timeout ||= 120
5256
config.environment ||= ENV.fetch('RAILS_ENV', 'development')
@@ -77,6 +81,14 @@ def self.configure(config)
7781
config.azure_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_AZURE_SCOPE
7882
config.gcp_scope ||= DfE::Analytics::AzureFederatedAuth::DEFAULT_GCP_SCOPE
7983
end
84+
85+
def self.check_missing_config!(config)
86+
missing_config = config.select { |val| DfE::Analytics.config.send(val).blank? }
87+
88+
raise(ConfigurationError, "DfE::Analytics: missing required config values: #{missing_config.join(', ')}") if missing_config.any?
89+
end
90+
91+
class ConfigurationError < StandardError; end
8092
end
8193
end
8294
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace :dfe do
2+
namespace :analytics do
3+
desc 'Apply BigQuery policy tags. Optionally pass delay_in_minutes (default is 0)'
4+
task :big_query_apply_policy_tags, [:delay_in_minutes] => :environment do |_, args|
5+
delay = args[:delay_in_minutes].to_i || 0
6+
7+
puts "Calling DfE::Analytics::BigQueryApplyPolicyTags.do(delay_in_minutes: #{delay})"
8+
DfE::Analytics::BigQueryApplyPolicyTags.do(delay_in_minutes: delay)
9+
end
10+
end
11+
end

lib/dfe/analytics/testing.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def authorization; end
7474
end
7575

7676
module TestOverrides
77-
def events_client
77+
def client
7878
if DfE::Analytics::Testing.fake?
7979
StubClient.new
8080
else

lib/services/airbyte/connection_update.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def connection_update_payload
7171
supportedSyncModes: discovered_stream.dig('stream', 'supportedSyncModes'),
7272
defaultCursorField: discovered_stream.dig('stream', 'defaultCursorField'),
7373
sourceDefinedCursor: discovered_stream.dig('stream', 'sourceDefinedCursor'),
74-
sourceDefinedPrimaryKey: discovered_stream.dig('stream', 'sourceDefinedPrimaryKey'),
74+
sourceDefinedPrimaryKey: discovered_stream.dig('stream', 'sourceDefinedPrimaryKey')
7575
},
7676
config: {
7777
syncMode: discovered_stream.dig('config', 'syncMode') || SYNC_MODE,

spec/dfe/analytics/big_query_api_spec.rb

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,31 @@
1111
}
1212
end
1313

14-
let(:events_client) { double(:events_client) }
14+
let(:client) { double(:client) }
1515
let(:authorization) { double(:authorization) }
1616

1717
before(:each) do
1818
allow(DfE::Analytics.config).to receive(:azure_federated_auth).and_return(true)
1919

20-
allow(Google::Apis::BigqueryV2::BigqueryService).to receive(:new).and_return(events_client)
20+
allow(Google::Apis::BigqueryV2::BigqueryService).to receive(:new).and_return(client)
2121
allow(DfE::Analytics::AzureFederatedAuth).to receive(:gcp_client_credentials).and_return(authorization)
22-
allow(events_client).to receive(:authorization=).and_return(authorization)
23-
allow(events_client).to receive(:authorization).and_return(authorization)
22+
allow(client).to receive(:authorization=).and_return(authorization)
23+
allow(client).to receive(:authorization).and_return(authorization)
2424

2525
DfE::Analytics::Testing.webmock!
2626
end
2727

28-
describe '#events_client' do
28+
describe '#client' do
2929
it 'raises a configuration error on missing config values' do
3030
with_analytics_config(bigquery_project_id: nil) do
31-
expect { described_class.events_client }.to raise_error(DfE::Analytics::BigQueryApi::ConfigurationError)
31+
expect { described_class.client }.to raise_error(DfE::Analytics::Config::ConfigurationError)
3232
end
3333
end
3434

3535
context 'when authorization endpoint returns OK response' do
3636
it 'calls the expected big query apis' do
3737
with_analytics_config(test_dummy_config) do
38-
expect(described_class.events_client).to eq(events_client)
38+
expect(described_class.client).to eq(client)
3939
end
4040
end
4141
end
@@ -52,14 +52,14 @@
5252
let(:response) { double(:response, insert_errors: []) }
5353

5454
it 'does not log the request when event_debug disabled' do
55-
allow(events_client).to receive(:insert_all_table_data).and_return(response)
55+
allow(client).to receive(:insert_all_table_data).and_return(response)
5656
expect(Rails.logger).not_to receive(:info)
5757

5858
insert
5959
end
6060

6161
it 'calls the expected big query apis' do
62-
expect(events_client).to receive(:insert_all_table_data)
62+
expect(client).to receive(:insert_all_table_data)
6363
.with(
6464
test_dummy_config[:bigquery_project_id],
6565
test_dummy_config[:bigquery_dataset],
@@ -78,7 +78,7 @@
7878
let(:insert_error) { double(:insert_error, index: 0, errors: [error]) }
7979
let(:error) { double(:error, message: 'An error.') }
8080

81-
before { expect(events_client).to receive(:insert_all_table_data).and_return(response) }
81+
before { expect(client).to receive(:insert_all_table_data).and_return(response) }
8282

8383
it 'raises an exception' do
8484
expect { insert }.to raise_error(DfE::Analytics::BigQueryApi::SendEventsError, /An error./)
@@ -99,4 +99,77 @@
9999
end
100100
end
101101
end
102+
103+
describe '.apply_policy_tags' do
104+
let(:tables) { { users: %w[email name] } }
105+
let(:policy_tag) { 'projects/my-project/locations/eu/taxonomies/123/policyTags/abc' }
106+
107+
let(:table_schema_fields) do
108+
[
109+
double('field', name: 'email').tap { |f| allow(f).to receive(:policy_tags=) },
110+
double('field', name: 'name').tap { |f| allow(f).to receive(:policy_tags=) },
111+
double('field', name: 'created_at').tap { |f| allow(f).to receive(:policy_tags=) }
112+
]
113+
end
114+
115+
let(:schema) do
116+
instance_double(Google::Apis::BigqueryV2::TableSchema, fields: table_schema_fields)
117+
end
118+
119+
let(:table) do
120+
instance_double(Google::Apis::BigqueryV2::Table, schema: schema)
121+
end
122+
123+
before do
124+
allow(client).to receive(:get_table).with(
125+
test_dummy_config[:bigquery_project_id],
126+
'airbyte_dataset',
127+
'users'
128+
).and_return(table)
129+
130+
allow(client).to receive(:patch_table)
131+
end
132+
133+
it 'updates policy tags for matching columns only' do
134+
with_analytics_config(test_dummy_config.merge(bigquery_airbyte_dataset: 'airbyte_dataset')) do
135+
expect(client).to receive(:patch_table).with(
136+
test_dummy_config[:bigquery_project_id],
137+
'airbyte_dataset',
138+
'users',
139+
an_instance_of(Google::Apis::BigqueryV2::Table),
140+
fields: 'schema'
141+
)
142+
143+
described_class.apply_policy_tags(tables, policy_tag)
144+
145+
expect(table_schema_fields[0]).to have_received(:policy_tags=)
146+
expect(table_schema_fields[1]).to have_received(:policy_tags=)
147+
expect(table_schema_fields[2]).not_to have_received(:policy_tags=)
148+
end
149+
end
150+
151+
context 'when get_table raises a ClientError' do
152+
it 'raises a PolicyTagError and logs the error' do
153+
with_analytics_config(test_dummy_config.merge(bigquery_airbyte_dataset: 'airbyte_dataset')) do
154+
allow(client).to receive(:get_table).and_raise(Google::Apis::ClientError.new('Table not found'))
155+
expect(Rails.logger).to receive(:error).with(/Failed to retrieve table: users/)
156+
expect do
157+
described_class.apply_policy_tags(tables, policy_tag)
158+
end.to raise_error(DfE::Analytics::BigQueryApi::PolicyTagError)
159+
end
160+
end
161+
end
162+
163+
context 'when patch_table raises a ClientError' do
164+
it 'raises a PolicyTagError and logs the error' do
165+
with_analytics_config(test_dummy_config.merge(bigquery_airbyte_dataset: 'airbyte_dataset')) do
166+
allow(client).to receive(:patch_table).and_raise(Google::Apis::ClientError.new('Invalid schema'))
167+
expect(Rails.logger).to receive(:error).with(/Failed to update table: users/)
168+
expect do
169+
described_class.apply_policy_tags(tables, policy_tag)
170+
end.to raise_error(DfE::Analytics::BigQueryApi::PolicyTagError)
171+
end
172+
end
173+
end
174+
end
102175
end

0 commit comments

Comments
 (0)