Skip to content

Commit edcfdc4

Browse files
tvalentynTheNeuralBitYichi ZhangKyle Weaveryeandy
authored
[BEAM-13388] Cherry-pick more changes for google cloud dlp update. (#16268)
Co-authored-by: Brian Hulette <hulettbh@gmail.com> Co-authored-by: Yichi Zhang <zyichi@google.com> Co-authored-by: Kyle Weaver <kcweaver@google.com> Co-authored-by: Andy Ye <andyye333@gmail.com>
1 parent 46e825a commit edcfdc4

File tree

3 files changed

+39
-22
lines changed

3 files changed

+39
-22
lines changed

sdks/python/apache_beam/io/gcp/pubsub_integration_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ def test_streaming_data_only(self):
210210

211211
@pytest.mark.it_postcommit
212212
def test_streaming_with_attributes(self):
213+
if self.runner_name == 'TestDataflowRunner':
214+
pytest.skip("BEAM-13218")
213215
self._test_streaming(with_attributes=True)
214216

215217

sdks/python/apache_beam/ml/gcp/cloud_dlp.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
"""
2121

2222
import logging
23+
from typing import List
2324

2425
from google.cloud import dlp_v2
2526

27+
from apache_beam import typehints
2628
from apache_beam.options.pipeline_options import GoogleCloudOptions
2729
from apache_beam.transforms import DoFn
2830
from apache_beam.transforms import ParDo
@@ -35,6 +37,8 @@
3537

3638

3739
@experimental()
40+
@typehints.with_input_types(str)
41+
@typehints.with_output_types(str)
3842
class MaskDetectedDetails(PTransform):
3943
"""Scrubs sensitive information detected in text.
4044
The ``PTransform`` returns a ``PCollection`` of ``str``
@@ -126,6 +130,8 @@ def expand(self, pcoll):
126130

127131

128132
@experimental()
133+
@typehints.with_input_types(str)
134+
@typehints.with_output_types(List[dlp_v2.types.dlp.Finding])
129135
class InspectForDetails(PTransform):
130136
"""Inspects input text for sensitive information.
131137
the ``PTransform`` returns a ``PCollection`` of
@@ -190,13 +196,13 @@ def setup(self):
190196
self.client = dlp_v2.DlpServiceClient()
191197
self.params = {
192198
'timeout': self.timeout,
193-
'parent': self.client.project_path(self.project)
194199
}
195-
self.params.update(self.config)
200+
self.parent = self.client.common_project_path(self.project)
196201

197202
def process(self, element, **kwargs):
198-
operation = self.client.deidentify_content(
199-
item={"value": element}, **self.params)
203+
request = {'item': {'value': element}, 'parent': self.parent}
204+
request.update(self.config)
205+
operation = self.client.deidentify_content(request=request, **self.params)
200206
yield operation.item.value
201207

202208

@@ -213,12 +219,12 @@ def setup(self):
213219
self.client = dlp_v2.DlpServiceClient()
214220
self.params = {
215221
'timeout': self.timeout,
216-
"parent": self.client.project_path(self.project)
217222
}
218-
self.params.update(self.config)
223+
self.parent = self.client.common_project_path(self.project)
219224

220225
def process(self, element, **kwargs):
221-
operation = self.client.inspect_content(
222-
item={"value": element}, **self.params)
226+
request = {'item': {'value': element}, 'parent': self.parent}
227+
request.update(self.config)
228+
operation = self.client.inspect_content(request=request, **self.params)
223229
hits = [x for x in operation.result.findings]
224230
yield hits

sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from apache_beam.ml.gcp.cloud_dlp import MaskDetectedDetails
3838
from apache_beam.ml.gcp.cloud_dlp import _DeidentifyFn
3939
from apache_beam.ml.gcp.cloud_dlp import _InspectFn
40+
from google.cloud.dlp_v2.types import dlp
4041
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
4142

4243
_LOGGER = logging.getLogger(__name__)
@@ -56,6 +57,9 @@ class TestDeidentifyFn(unittest.TestCase):
5657
def test_deidentify_called(self):
5758
class ClientMock(object):
5859
def deidentify_content(self, *args, **kwargs):
60+
# Check that we can marshal a valid request.
61+
dlp.DeidentifyContentRequest(kwargs['request'])
62+
5963
called = Metrics.counter('test_deidentify_text', 'called')
6064
called.inc()
6165
operation = mock.Mock()
@@ -64,27 +68,29 @@ def deidentify_content(self, *args, **kwargs):
6468
operation.item = item
6569
return operation
6670

67-
def project_path(self, *args):
71+
def common_project_path(self, *args):
6872
return 'test'
6973

7074
with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
7175
p = TestPipeline()
72-
deidentify_config = {
73-
"info_type_transformations": {
74-
"transformations": [{
75-
"primitive_transformation": {
76-
"character_mask_config": {
77-
"masking_character": '#'
76+
config = {
77+
"deidentify_config": {
78+
"info_type_transformations": {
79+
"transformations": [{
80+
"primitive_transformation": {
81+
"character_mask_config": {
82+
"masking_character": '#'
83+
}
7884
}
79-
}
80-
}]
85+
}]
86+
}
8187
}
8288
}
8389
# pylint: disable=expression-not-assigned
8490
(
8591
p
8692
| beam.Create(['mary.sue@example.com', 'john.doe@example.com'])
87-
| beam.ParDo(_DeidentifyFn(config=deidentify_config)))
93+
| beam.ParDo(_DeidentifyFn(config=config)))
8894
result = p.run()
8995
result.wait_until_finish()
9096
called = result.metrics().query()['counters'][0]
@@ -101,28 +107,31 @@ def test_exception_raised_then_no_config_provided(self):
101107

102108

103109
@unittest.skipIf(dlp_v2 is None, 'GCP dependencies are not installed')
104-
class TestDeidentifyFn(unittest.TestCase):
110+
class TestInspectFn(unittest.TestCase):
105111
def test_inspect_called(self):
106112
class ClientMock(object):
107113
def inspect_content(self, *args, **kwargs):
114+
# Check that we can marshal a valid request.
115+
dlp.InspectContentRequest(kwargs['request'])
116+
108117
called = Metrics.counter('test_inspect_text', 'called')
109118
called.inc()
110119
operation = mock.Mock()
111120
operation.result = mock.Mock()
112121
operation.result.findings = [None]
113122
return operation
114123

115-
def project_path(self, *args):
124+
def common_project_path(self, *args):
116125
return 'test'
117126

118127
with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
119128
p = TestPipeline()
120-
inspect_config = {"info_types": [{"name": "EMAIL_ADDRESS"}]}
129+
config = {"inspect_config": {"info_types": [{"name": "EMAIL_ADDRESS"}]}}
121130
# pylint: disable=expression-not-assigned
122131
(
123132
p
124133
| beam.Create(['mary.sue@example.com', 'john.doe@example.com'])
125-
| beam.ParDo(_InspectFn(config=inspect_config)))
134+
| beam.ParDo(_InspectFn(config=config)))
126135
result = p.run()
127136
result.wait_until_finish()
128137
called = result.metrics().query()['counters'][0]

0 commit comments

Comments
 (0)