Skip to content

Commit b2e1238

Browse files
authored
feat(pipeline_options): add support for custom maven repository url (#36390)
* feat(pipeline_options): add support for custom maven repository url Add --maven_repository_url flag to SetupOptions to allow specifying custom Maven repository Modify JavaJarServer, JavaJarExpansionService and BeamJarExpansionService to use custom repository Update _resolve_expansion_service to pass maven_repository_url from pipeline options Add unit tests for maven_repository_url functionality * comments * retvert sdks/python/apache_beam/utils/subprocess_server.py * need to support maven_repository_url for path_to_beam_jar * use MAVEN_STAGING_REPOSITORY for rc
1 parent 08c96f2 commit b2e1238

File tree

5 files changed

+307
-19
lines changed

5 files changed

+307
-19
lines changed

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,6 +1741,13 @@ def _add_argparse_args(cls, parser):
17411741
help=(
17421742
'A user agent string describing the pipeline to external services. '
17431743
'The format should follow RFC2616.'))
1744+
parser.add_argument(
1745+
'--maven_repository_url',
1746+
default=None,
1747+
help=(
1748+
'Custom Maven repository URL to use for downloading JAR files. '
1749+
'If not specified, the default Maven Central repository will be '
1750+
'used.'))
17441751

17451752
def validate(self, validator):
17461753
errors = []

sdks/python/apache_beam/transforms/external.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,15 +1032,21 @@ class JavaJarExpansionService(object):
10321032
append_args: arguments to be provided when starting up the
10331033
expansion service using the jar file. These arguments will be appended to
10341034
the default arguments.
1035-
user_agent: the user agent to use when downloading the jar.
1035+
user_agent: HTTP user agent string used when downloading jars via
1036+
`JavaJarServer.local_jar`, including the main jar and any classpath
1037+
dependencies.
1038+
maven_repository_url: Maven repository base URL to resolve artifacts when
1039+
classpath entries or jars are specified as Maven coordinates
1040+
(`group:artifact:version`). Defaults to Maven Central if not provided.
10361041
"""
10371042
def __init__(
10381043
self,
10391044
path_to_jar,
10401045
extra_args=None,
10411046
classpath=None,
10421047
append_args=None,
1043-
user_agent=None):
1048+
user_agent=None,
1049+
maven_repository_url=None):
10441050
if extra_args and append_args:
10451051
raise ValueError('Only one of extra_args or append_args may be provided')
10461052
self.path_to_jar = path_to_jar
@@ -1049,12 +1055,13 @@ def __init__(
10491055
self._service_count = 0
10501056
self._append_args = append_args or []
10511057
self._user_agent = user_agent
1058+
self._maven_repository_url = maven_repository_url
10521059

10531060
def is_existing_service(self):
10541061
return subprocess_server.is_service_endpoint(self.path_to_jar)
10551062

10561063
@staticmethod
1057-
def _expand_jars(jar, user_agent=None):
1064+
def _expand_jars(jar, user_agent=None, maven_repository_url=None):
10581065
if glob.glob(jar):
10591066
return glob.glob(jar)
10601067
elif isinstance(jar, str) and (jar.startswith('http://') or
@@ -1073,15 +1080,21 @@ def _expand_jars(jar, user_agent=None):
10731080
return [jar]
10741081
path = subprocess_server.JavaJarServer.local_jar(
10751082
subprocess_server.JavaJarServer.path_to_maven_jar(
1076-
artifact_id, group_id, version),
1083+
artifact_id,
1084+
group_id,
1085+
version,
1086+
repository=(
1087+
maven_repository_url or
1088+
subprocess_server.JavaJarServer.MAVEN_CENTRAL_REPOSITORY)),
10771089
user_agent=user_agent)
10781090
return [path]
10791091

10801092
def _default_args(self):
10811093
"""Default arguments to be used by `JavaJarExpansionService`."""
10821094

10831095
to_stage = ','.join([self.path_to_jar] + sum((
1084-
JavaJarExpansionService._expand_jars(jar, self._user_agent)
1096+
JavaJarExpansionService._expand_jars(
1097+
jar, self._user_agent, self._maven_repository_url)
10851098
for jar in self._classpath or []), []))
10861099
args = ['{{PORT}}', f'--filesToStage={to_stage}']
10871100
# TODO(robertwb): See if it's possible to scope this per pipeline.
@@ -1110,7 +1123,8 @@ def __enter__(self):
11101123
subprocess_server.JavaJarServer.local_jar(path)
11111124
for jar in self._classpath
11121125
for path in JavaJarExpansionService._expand_jars(
1113-
jar, user_agent=self._user_agent)
1126+
jar, user_agent=self._user_agent,
1127+
maven_repository_url=self._maven_repository_url)
11141128
]
11151129
self._service_provider = subprocess_server.JavaJarServer(
11161130
ExpansionAndArtifactRetrievalStub,
@@ -1146,6 +1160,11 @@ class BeamJarExpansionService(JavaJarExpansionService):
11461160
append_args: arguments to be provided when starting up the
11471161
expansion service using the jar file. These arguments will be appended to
11481162
the default arguments.
1163+
user_agent: HTTP user agent string used when downloading the Beam jar and
1164+
any classpath dependencies.
1165+
maven_repository_url: Maven repository base URL to resolve the Beam jar
1166+
for the provided Gradle target. Defaults to Maven Central if not
1167+
provided.
11491168
"""
11501169
def __init__(
11511170
self,
@@ -1154,16 +1173,20 @@ def __init__(
11541173
gradle_appendix=None,
11551174
classpath=None,
11561175
append_args=None,
1157-
user_agent=None):
1176+
user_agent=None,
1177+
maven_repository_url=None):
11581178
path_to_jar = subprocess_server.JavaJarServer.path_to_beam_jar(
1159-
gradle_target, gradle_appendix)
1179+
gradle_target,
1180+
gradle_appendix,
1181+
maven_repository_url=maven_repository_url)
11601182
self.gradle_target = gradle_target
11611183
super().__init__(
11621184
path_to_jar,
11631185
extra_args,
11641186
classpath=classpath,
11651187
append_args=append_args,
1166-
user_agent=user_agent)
1188+
user_agent=user_agent,
1189+
maven_repository_url=maven_repository_url)
11671190

11681191

11691192
def _maybe_use_transform_service(provided_service=None, options=None):

sdks/python/apache_beam/transforms/managed.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,24 @@ def __init__(
118118
f"An unsupported source was specified: '{source}'. Please specify "
119119
f"one of the following sources: {list(self._READ_TRANSFORMS.keys())}")
120120

121-
self._expansion_service = _resolve_expansion_service(
122-
source, identifier, expansion_service)
121+
# Store parameters for deferred expansion service creation
122+
self._identifier = identifier
123+
self._provided_expansion_service = expansion_service
123124
self._underlying_identifier = identifier
124125
self._yaml_config = yaml.dump(config)
125126
self._config_url = config_url
126127

127128
def expand(self, input):
129+
# Create expansion service with access to pipeline options
130+
expansion_service = _resolve_expansion_service(
131+
self._source,
132+
self._identifier,
133+
self._provided_expansion_service,
134+
pipeline_options=input.pipeline._options)
135+
128136
return input | SchemaAwareExternalTransform(
129137
identifier=MANAGED_SCHEMA_TRANSFORM_IDENTIFIER,
130-
expansion_service=self._expansion_service,
138+
expansion_service=expansion_service,
131139
rearrange_based_on_discovery=True,
132140
transform_identifier=self._underlying_identifier,
133141
config=self._yaml_config,
@@ -162,16 +170,24 @@ def __init__(
162170
f"An unsupported sink was specified: '{sink}'. Please specify "
163171
f"one of the following sinks: {list(self._WRITE_TRANSFORMS.keys())}")
164172

165-
self._expansion_service = _resolve_expansion_service(
166-
sink, identifier, expansion_service)
173+
# Store parameters for deferred expansion service creation
174+
self._identifier = identifier
175+
self._provided_expansion_service = expansion_service
167176
self._underlying_identifier = identifier
168177
self._yaml_config = yaml.dump(config)
169178
self._config_url = config_url
170179

171180
def expand(self, input):
181+
# Create expansion service with access to pipeline options
182+
expansion_service = _resolve_expansion_service(
183+
self._sink,
184+
self._identifier,
185+
self._provided_expansion_service,
186+
pipeline_options=input.pipeline._options)
187+
172188
return input | SchemaAwareExternalTransform(
173189
identifier=MANAGED_SCHEMA_TRANSFORM_IDENTIFIER,
174-
expansion_service=self._expansion_service,
190+
expansion_service=expansion_service,
175191
rearrange_based_on_discovery=True,
176192
transform_identifier=self._underlying_identifier,
177193
config=self._yaml_config,
@@ -182,7 +198,10 @@ def default_label(self) -> str:
182198

183199

184200
def _resolve_expansion_service(
185-
transform_name: str, identifier: str, expansion_service):
201+
transform_name: str,
202+
identifier: str,
203+
expansion_service,
204+
pipeline_options=None):
186205
if expansion_service:
187206
return expansion_service
188207

@@ -193,4 +212,18 @@ def _resolve_expansion_service(
193212
raise ValueError(
194213
"No expansion service was specified and could not find a "
195214
f"default expansion service for {transform_name}: '{identifier}'.")
196-
return BeamJarExpansionService(gradle_target)
215+
216+
# Extract maven_repository_url and user_agent from pipeline options if
217+
# available
218+
maven_repository_url = None
219+
user_agent = None
220+
if pipeline_options:
221+
from apache_beam.options import pipeline_options as po
222+
setup_options = pipeline_options.view_as(po.SetupOptions)
223+
maven_repository_url = setup_options.maven_repository_url
224+
user_agent = setup_options.user_agent
225+
226+
return BeamJarExpansionService(
227+
gradle_target,
228+
maven_repository_url=maven_repository_url,
229+
user_agent=user_agent)

0 commit comments

Comments
 (0)