Skip to content

Commit 3d296c4

Browse files
author
Kyle Weaver
authored
Merge pull request #12905 from sambvfx/artifact-endpoint
[BEAM-8660] Use PortableOptions.artifact_endpoint if provided over value reported from jobserver
2 parents 88037dc + 300d38d commit 3d296c4

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

sdks/python/apache_beam/runners/portability/local_job_service_test.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,22 @@
2222
import logging
2323
import unittest
2424

25+
import mock
26+
27+
from apache_beam.options import pipeline_options
2528
from apache_beam.portability.api import beam_job_api_pb2
2629
from apache_beam.portability.api import beam_runner_api_pb2
2730
from apache_beam.runners.portability import local_job_service
2831
from apache_beam.runners.portability.portable_runner import JobServiceHandle
32+
from apache_beam.runners.portability.portable_runner import PortableRunner
2933

3034

3135
class TestJobServicePlan(JobServiceHandle):
3236
def __init__(self, job_service):
3337
self.job_service = job_service
3438
self.options = None
3539
self.timeout = None
40+
self.artifact_endpoint = None
3641

3742
def get_pipeline_options(self):
3843
return None
@@ -102,6 +107,32 @@ def test_error_messages_after_pipeline_failure(self):
102107
for m in message_results),
103108
messages_again)
104109

110+
def test_artifact_service_override(self):
111+
job_service = local_job_service.LocalJobServicer()
112+
port = job_service.start_grpc_server()
113+
114+
test_artifact_endpoint = 'testartifactendpoint:4242'
115+
116+
options = pipeline_options.PipelineOptions([
117+
'--job_endpoint',
118+
'localhost:%d' % port,
119+
'--artifact_endpoint',
120+
test_artifact_endpoint,
121+
])
122+
runner = PortableRunner()
123+
job_service_handle = runner.create_job_service(options)
124+
125+
with mock.patch.object(job_service_handle, 'stage') as mocked_stage:
126+
job_service_handle.submit(beam_runner_api_pb2.Pipeline())
127+
mocked_stage.assert_called_once_with(
128+
mock.ANY, test_artifact_endpoint, mock.ANY)
129+
130+
# Confirm the artifact_endpoint is in the options protobuf
131+
options_proto = job_service_handle.get_pipeline_options()
132+
self.assertEqual(
133+
options_proto['beam:option:artifact_endpoint:v1'],
134+
test_artifact_endpoint)
135+
105136

106137
if __name__ == '__main__':
107138
logging.getLogger().setLevel(logging.INFO)

sdks/python/apache_beam/runners/portability/portable_runner.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(self, job_service, options, retain_unknown_options=False):
9696
self.job_service = job_service
9797
self.options = options
9898
self.timeout = options.view_as(PortableOptions).job_server_timeout
99+
self.artifact_endpoint = options.view_as(PortableOptions).artifact_endpoint
99100
self._retain_unknown_options = retain_unknown_options
100101

101102
def submit(self, proto_pipeline):
@@ -105,9 +106,12 @@ def submit(self, proto_pipeline):
105106
Submit and run the pipeline defined by `proto_pipeline`.
106107
"""
107108
prepare_response = self.prepare(proto_pipeline)
109+
artifact_endpoint = (
110+
self.artifact_endpoint or
111+
prepare_response.artifact_staging_endpoint.url)
108112
self.stage(
109113
proto_pipeline,
110-
prepare_response.artifact_staging_endpoint.url,
114+
artifact_endpoint,
111115
prepare_response.staging_session_token)
112116
return self.run(prepare_response.preparation_id)
113117

0 commit comments

Comments
 (0)