Skip to content

Commit bc33a33

Browse files
authored
Allow users to specify custom audit entries in pipeline options. (#34062)
* Allow users to specify custom audit entries in pipeline options.
* Add a sanity check on custom audit entries during pipeline option parsing.
* Fix lints and typos.
* Add a docstring.
* Change the flag to snake case.
1 parent 7d5f229 commit bc33a33

File tree

4 files changed

+158
-5
lines changed

4 files changed

+158
-5
lines changed

sdks/python/apache_beam/io/gcp/gcsio.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,21 @@ def create_storage_client(pipeline_options, use_credentials=True):
125125
from google.api_core import client_info
126126
beam_client_info = client_info.ClientInfo(
127127
user_agent="apache-beam/%s (GPN:Beam)" % beam_version.__version__)
128+
129+
extra_headers = {
130+
"x-goog-custom-audit-job": google_cloud_options.job_name
131+
if google_cloud_options.job_name else "UNKNOWN"
132+
}
133+
134+
# Note: Custom audit entries with "job" key will overwrite the default above
135+
if google_cloud_options.gcs_custom_audit_entries is not None:
136+
extra_headers.update(google_cloud_options.gcs_custom_audit_entries)
137+
128138
return storage.Client(
129139
credentials=credentials.get_google_auth_credentials(),
130140
project=google_cloud_options.project,
131141
client_info=beam_client_info,
132-
extra_headers={
133-
"x-goog-custom-audit-job": google_cloud_options.job_name
134-
if google_cloud_options.job_name else "UNKNOWN"
135-
})
142+
extra_headers=extra_headers)
136143
else:
137144
return storage.Client.create_anonymous_client()
138145

sdks/python/apache_beam/io/gcp/gcsio_test.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from apache_beam.metrics.execution import MetricsContainer
3131
from apache_beam.metrics.execution import MetricsEnvironment
3232
from apache_beam.metrics.metricbase import MetricName
33+
from apache_beam.pipeline import PipelineOptions
3334
from apache_beam.runners.worker import statesampler
3435
from apache_beam.utils import counters
3536

@@ -712,7 +713,13 @@ def test_headers(self, mock_get_service_credentials, mock_do_request):
712713
mock_get_service_credentials.return_value = _ApitoolsCredentialsAdapter(
713714
_make_credentials("test-project"))
714715

715-
gcs = gcsio.GcsIO(pipeline_options={"job_name": "test-job-name"})
716+
options = PipelineOptions([
717+
"--job_name=test-job-name",
718+
"--gcs_custom_audit_entry=user=test-user-id",
719+
"--gcs_custom_audit_entries={\"id\": \"1234\", \"status\": \"ok\"}"
720+
])
721+
722+
gcs = gcsio.GcsIO(pipeline_options=options)
716723
# no HTTP request when initializing GcsIO
717724
mock_do_request.assert_not_called()
718725

@@ -732,6 +739,9 @@ def test_headers(self, mock_get_service_credentials, mock_do_request):
732739
beam_user_agent = "apache-beam/%s (GPN:Beam)" % beam_version.__version__
733740
self.assertIn(beam_user_agent, actual_headers['User-Agent'])
734741
self.assertEqual(actual_headers['x-goog-custom-audit-job'], 'test-job-name')
742+
self.assertEqual(actual_headers['x-goog-custom-audit-user'], 'test-user-id')
743+
self.assertEqual(actual_headers['x-goog-custom-audit-id'], '1234')
744+
self.assertEqual(actual_headers['x-goog-custom-audit-status'], 'ok')
735745

736746
@mock.patch('google.cloud._http.JSONConnection._do_request')
737747
@mock.patch('apache_beam.internal.gcp.auth.get_service_credentials')

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,71 @@ def __call__(self, parser, namespace, values, option_string=None):
149149
getattr(namespace, self.dest).update(values)
150150

151151

152+
class _GcsCustomAuditEntriesAction(argparse.Action):
  """Argparse Action that collects GCS custom audit entries into a dict.

  Two spellings are handled by the same action and accumulate into the same
  destination dict:

  - ``--gcs_custom_audit_entry key=value`` adds a single entry, and
  - ``--gcs_custom_audit_entries '{"key": "value"}'`` adds every member of a
    JSON object.

  Each accepted entry is stored under the header name
  ``x-goog-custom-audit-<key>``.

  According to Google Cloud Storage audit logging documentation
  (https://cloud.google.com/storage/docs/audit-logging#add-custom-metadata), the
  following limitations apply:
  - keys must be 64 characters or less,
  - values 1,200 characters or less, and
  - a maximum of four custom metadata entries are permitted per request.

  Every violation, including malformed input, is reported as an
  ``argparse.ArgumentError`` so that the parser can surface it as a normal
  usage error instead of an unrelated traceback.
  """
  MAX_KEY_LENGTH = 64
  MAX_VALUE_LENGTH = 1200
  MAX_ENTRIES = 4

  def _exceed_entry_limit(self):
    """Return True if the collected entries exceed the per-request limit.

    The default ``x-goog-custom-audit-job`` entry counts toward the limit, so
    unless the user supplied their own ``job`` entry one slot is reserved
    for it.
    """
    limit = _GcsCustomAuditEntriesAction.MAX_ENTRIES
    if 'x-goog-custom-audit-job' not in self._custom_audit_entries:
      limit -= 1
    return len(self._custom_audit_entries) > limit

  def _add_entry(self, key, value):
    """Validate one key/value pair and store it under its header name.

    Args:
      key: the user-facing entry name (without the header prefix).
      value: the entry value; must be a string.

    Raises:
      argparse.ArgumentError: if the key is empty or not a string, the value
        is not a string, or either one exceeds its length limit.
    """
    if not isinstance(key, str) or not key:
      raise argparse.ArgumentError(
          None, "The key in GCS custom audit entries must be a non-empty "
          "string, got %r." % (key, ))

    # JSON input may carry numbers, booleans, lists, etc.; only strings can be
    # measured against the limit and sent as a header value.
    if not isinstance(value, str):
      raise argparse.ArgumentError(
          None,
          "The value for key '%s' in GCS custom audit entries must be a "
          "string, got %s." % (key, type(value).__name__))

    if len(key) > _GcsCustomAuditEntriesAction.MAX_KEY_LENGTH:
      raise argparse.ArgumentError(
          None,
          "The key '%s' in GCS custom audit entries exceeds the "
          "%d-character limit." %
          (key, _GcsCustomAuditEntriesAction.MAX_KEY_LENGTH))

    if len(value) > _GcsCustomAuditEntriesAction.MAX_VALUE_LENGTH:
      raise argparse.ArgumentError(
          None,
          "The value '%s' in GCS custom audit entries exceeds the "
          "%d-character limit." %
          (value, _GcsCustomAuditEntriesAction.MAX_VALUE_LENGTH))

    self._custom_audit_entries[f"x-goog-custom-audit-{key}"] = value

  def __call__(self, parser, namespace, values, option_string=None):
    """Parse one occurrence of either flag and merge it into the namespace.

    Args:
      parser: the invoking parser (unused).
      namespace: the namespace whose ``self.dest`` attribute holds the dict.
      values: the raw flag value, either ``key=value`` or a JSON object.
      option_string: the flag spelling that was used on the command line.

    Raises:
      argparse.ArgumentError: on malformed input or when any documented limit
        is exceeded.
    """
    if getattr(namespace, self.dest, None) is None:
      setattr(namespace, self.dest, {})
    self._custom_audit_entries = getattr(namespace, self.dest)

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not isinstance(values, str):
      raise argparse.ArgumentError(
          None,
          "GCS custom audit entries must be given as a string, got %s." %
          type(values).__name__)

    if option_string == '--gcs_custom_audit_entries':
      # in the format of {"key": "value"}
      try:
        sub_entries = json.loads(values)
      except ValueError as err:  # json.JSONDecodeError subclasses ValueError
        raise argparse.ArgumentError(
            None,
            "GCS custom audit entries must be a valid JSON object: %s" %
            err) from err
      if not isinstance(sub_entries, dict):
        raise argparse.ArgumentError(
            None,
            "GCS custom audit entries must be a JSON object of key/value "
            "pairs, got %s." % type(sub_entries).__name__)
      for key, value in sub_entries.items():
        self._add_entry(key, value)
    else:  # option_string == '--gcs_custom_audit_entry'
      # in the format of 'key=value'; a missing '=' yields an empty value
      key, _, value = values.partition('=')
      self._add_entry(key, value)

    if self._exceed_entry_limit():
      raise argparse.ArgumentError(
          None,
          "The maximum allowed number of GCS custom audit entries "
          "(including the default x-goog-custom-audit-job) is %d." %
          _GcsCustomAuditEntriesAction.MAX_ENTRIES)
215+
216+
152217
class PipelineOptions(HasDisplayData):
153218
"""This class and subclasses are used as containers for command line options.
154219
@@ -953,6 +1018,16 @@ def _add_argparse_args(cls, parser):
9531018
action='store_true',
9541019
help='Use blob generation when mutating blobs in GCSIO to '
9551020
'mitigate race conditions at the cost of more HTTP requests.')
1021+
parser.add_argument(
1022+
'--gcs_custom_audit_entry',
1023+
'--gcs_custom_audit_entries',
1024+
dest='gcs_custom_audit_entries',
1025+
action=_GcsCustomAuditEntriesAction,
1026+
default=None,
1027+
help='Custom information to be attached to audit logs. '
1028+
'Entries are key value pairs separated by = '
1029+
'(e.g. --gcs_custom_audit_entry key=value) or a JSON string '
1030+
'(e.g. --gcs_custom_audit_entries=\'{ "user": "test", "id": "12" }\').')
9561031

9571032
def _create_default_gcs_bucket(self):
9581033
try:

sdks/python/apache_beam/options/pipeline_options_test.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,67 @@ def test_options_store_false_with_different_dest(self):
723723
"the dest and the flag name to the map "
724724
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")
725725

726+
def test_gcs_custom_audit_entries(self):
  """Both flag spellings accumulate into one dict keyed by header name."""
  argv = [
      '--gcs_custom_audit_entry=user=test-user',
      '--gcs_custom_audit_entry=work=test-work',
      '--gcs_custom_audit_entries={"job":"test-job", "id":"1234"}',
  ]
  expected_headers = {
      'x-goog-custom-audit-user': 'test-user',
      'x-goog-custom-audit-work': 'test-work',
      'x-goog-custom-audit-job': 'test-job',
      'x-goog-custom-audit-id': '1234',
  }

  gcp_options = PipelineOptions(argv).view_as(GoogleCloudOptions)

  self.assertDictEqual(gcp_options.gcs_custom_audit_entries, expected_headers)
741+
742+
@mock.patch('apache_beam.options.pipeline_options._BeamArgumentParser.error')
def test_gcs_custom_audit_entries_with_errors(self, mock_error):
  """Each limit violation is reported through the parser's error hook."""
  long_key = 'a' * 65
  long_value = 'b' * 1201
  limit_pattern = 'The maximum allowed number of GCS custom audit entries .*'

  # (command-line arguments, regex the reported error message must match)
  scenarios = [
      (
          [f'--gcs_custom_audit_entry={long_key}=1'],
          'The key .* exceeds the 64-character limit.',
      ),
      (
          [f'--gcs_custom_audit_entry=key={long_value}'],
          'The value .* exceeds the 1200-character limit.',
      ),
      (
          # Five entries, one of them the user-supplied "job".
          [
              '--gcs_custom_audit_entry=a=1',
              '--gcs_custom_audit_entry=b=2',
              '--gcs_custom_audit_entry=c=3',
              '--gcs_custom_audit_entry=d=4',
              '--gcs_custom_audit_entry=job=test-job',
          ],
          limit_pattern,
      ),
      (
          # Four entries and no "job" entry among them.
          [
              '--gcs_custom_audit_entry=a=1',
              '--gcs_custom_audit_entry=b=2',
              '--gcs_custom_audit_entry=c=3',
              '--gcs_custom_audit_entry=d=4',
          ],
          limit_pattern,
      ),
  ]

  for position, (argv, message_pattern) in enumerate(scenarios):
    if position > 0:
      # Forget the call recorded by the previous scenario.
      mock_error.reset_mock()

    options = PipelineOptions(argv)
    _ = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
    self.assertRegex(mock_error.call_args[0][0], message_pattern)
786+
726787
def _check_errors(self, options, validator, expected):
727788
if has_gcsio:
728789
with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled',

0 commit comments

Comments
 (0)