Skip to content

Commit 0697e54

Browse files
authored
Kerberos Authentication for KafkaIO (#36099)
* Add the FileAwareFactoryFn and the KerberosConsumerFactoryFn classes to support consumer factories which pull files from GCS.
* Revert "Add the FileAwareFactoryFn and the KerberosConsumerFactoryFn classes to support consumer factories which pull files from GCS." This reverts commit f8f69d9.
* Add tests for the file-aware factory fn.
* Add changes to the build and integration files for manual testing. Be sure to remove these later as they cannot stay.
* Migrate to a new module such that Kafka remains GCP-agnostic.
* Clean up classes for PR review.
* Move the existing module files to the extensions repo. This module will contain the factory functions to be utilized by users and the cross-language expansion service.
* Modify the base class to use the GCS client instead of GCS FileSystems. This is a more lightweight dependency for the expansion service.
* Remove merge conflict code.
* Generalize the classes such that no external storage system is strictly specified.
* Add tests to fill gaps in coverage.
* Add Javadoc.
* Change logging to not include potential secret values.
1 parent 86bc452 commit 0697e54

File tree

8 files changed

+983
-0
lines changed

8 files changed

+983
-0
lines changed

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ tasks.register("javaioPreCommit") {
353353
dependsOn(":sdks:java:io:jms:build")
354354
dependsOn(":sdks:java:io:kafka:build")
355355
dependsOn(":sdks:java:io:kafka:upgrade:build")
356+
dependsOn(":sdks:java:io:kafka:file-aware-factories:build")
356357
dependsOn(":sdks:java:io:kudu:build")
357358
dependsOn(":sdks:java:io:mongodb:build")
358359
dependsOn(":sdks:java:io:mqtt:build")
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
plugins { id 'org.apache.beam.module' }
applyJavaNature(
  automaticModuleName: 'org.apache.beam.sdk.extensions.kafka.factories',
  // Must be a real boolean: a non-empty string such as 'False' is truthy under
  // Groovy's boolean coercion and would leave publishing switched on.
  publish: false
)

description = "Apache Beam :: SDKs :: Java :: Extensions :: Kafka :: Factories"
ext.summary = "Library to instantiate kafka clients with files from GCS or SecretManager."

dependencies {
  // ------------------------- CORE DEPENDENCIES -------------------------
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  // Declared as 'provided' rather than 'implementation'; a concrete kafka-clients
  // version is only pinned for the tests below.
  provided library.java.kafka_clients
  implementation 'com.google.cloud:google-cloud-secretmanager:2.72.0'
  implementation library.java.slf4j_api
  implementation library.java.vendored_guava_32_1_2_jre
  implementation project(path: ":sdks:java:extensions:google-cloud-platform-core")
  // Tells the dependency analysis that this project dependency is intentionally
  // declared even though no class from it is referenced directly.
  permitUnusedDeclared project(path: ":sdks:java:extensions:google-cloud-platform-core")
  // ------------------------- TEST DEPENDENCIES -------------------------
  testImplementation 'org.apache.kafka:kafka-clients:3.9.0'
  testImplementation library.java.junit
  testImplementation library.java.mockito_core
  testRuntimeOnly library.java.mockito_inline
  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
}
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.extensions.kafka.factories;
19+
20+
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
21+
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
22+
import com.google.cloud.secretmanager.v1.SecretVersionName;
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.nio.channels.FileChannel;
26+
import java.nio.channels.ReadableByteChannel;
27+
import java.nio.charset.StandardCharsets;
28+
import java.nio.file.Files;
29+
import java.nio.file.Path;
30+
import java.nio.file.Paths;
31+
import java.nio.file.StandardOpenOption;
32+
import java.util.HashMap;
33+
import java.util.HashSet;
34+
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.ConcurrentHashMap;
37+
import java.util.regex.Matcher;
38+
import java.util.regex.Pattern;
39+
import org.apache.beam.sdk.io.FileSystems;
40+
import org.apache.beam.sdk.transforms.SerializableFunction;
41+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
44+
45+
/**
46+
* An abstract {@link SerializableFunction} that serves as a base class for factories that need to
47+
* process a configuration map to handle external resources like files and secrets.
48+
*
49+
* <p>This class is designed to be extended by concrete factory implementations (e.g., for creating
50+
* Kafka consumers). It automates the process of detecting special URI strings within the
51+
* configuration values and transforming them before passing the processed configuration to the
52+
* subclass.
53+
*
54+
* <h3>Supported Patterns:</h3>
55+
*
56+
* <ul>
57+
* <li><b>External File Paths:</b> It recognizes paths prefixed with schemes like {@code gs://} or
58+
* {@code s3://} that are supported by the Beam {@link FileSystems} API. It downloads these
59+
* files to a local temporary directory (under {@code /tmp/<factory-type>/...}) and replaces
60+
* the original path in the configuration with the new local file path.
61+
* <li><b>Secret Manager Values:</b> It recognizes strings prefixed with {@code secretValue:}. It
62+
* interprets the rest of the string as a Google Secret Manager secret version name (e.g.,
63+
* "projects/p/secrets/s/versions/v"), fetches the secret payload, and replaces the original
64+
* {@code secretValue:...} identifier with the plain-text secret.
65+
* </ul>
66+
*
67+
* <h3>Usage:</h3>
68+
*
69+
* <p>A subclass must implement the {@link #createObject(Map)} method, which receives the fully
70+
* processed configuration map with all paths localized and secrets resolved. Subclasses can also
71+
* override {@link #downloadAndProcessExtraFiles()} to handle specific preliminary file downloads
72+
* (e.g., a krb5.conf file) before the main configuration processing begins.
73+
*
74+
* @param <T> The type of object this factory creates.
75+
*/
76+
public abstract class FileAwareFactoryFn<T>
77+
implements SerializableFunction<Map<String, Object>, T> {
78+
79+
public static final String SECRET_VALUE_PREFIX = "secretValue:";
80+
public static final String DIRECTORY_PREFIX = "/tmp";
81+
private static final Pattern PATH_PATTERN =
82+
Pattern.compile("([a-zA-Z0-9]+://[^\"]+)|(secretValue:[^\"]+)|(secretFile:[^\"]+)");
83+
84+
private static final Map<String, byte[]> secretCache = new ConcurrentHashMap<>();
85+
86+
private final String factoryType;
87+
private static final Logger LOG = LoggerFactory.getLogger(FileAwareFactoryFn.class);
88+
89+
public FileAwareFactoryFn(String factoryType) {
90+
Preconditions.checkNotNull(factoryType);
91+
this.factoryType = factoryType;
92+
}
93+
94+
protected abstract T createObject(Map<String, Object> config);
95+
96+
@Override
97+
public T apply(Map<String, Object> config) {
98+
if (config == null) {
99+
return createObject(config);
100+
}
101+
102+
Map<String, Object> processedConfig = new HashMap<>(config);
103+
104+
String key = "";
105+
Object value = null;
106+
try {
107+
downloadAndProcessExtraFiles();
108+
109+
for (Map.Entry<String, Object> e : config.entrySet()) {
110+
try {
111+
key = e.getKey();
112+
value = e.getValue();
113+
if (value instanceof String) {
114+
String originalValue = (String) value;
115+
Matcher matcher = PATH_PATTERN.matcher(originalValue);
116+
StringBuffer sb = new StringBuffer();
117+
118+
while (matcher.find()) {
119+
String externalPath = matcher.group(1);
120+
String secretValue = matcher.group(2);
121+
String secretFile = matcher.group(3);
122+
123+
if (externalPath != null) {
124+
try {
125+
String tmpPath = replacePathWithLocal(externalPath);
126+
String localPath = downloadExternalFile(externalPath, tmpPath);
127+
matcher.appendReplacement(sb, Matcher.quoteReplacement(localPath));
128+
LOG.info("Downloaded {} to {}", externalPath, localPath);
129+
} catch (IOException io) {
130+
throw new IOException("Failed to download file : " + externalPath, io);
131+
}
132+
} else if (secretValue != null) {
133+
try {
134+
String secretId = secretValue.substring(SECRET_VALUE_PREFIX.length());
135+
String processedSecret =
136+
processSecret(originalValue, secretId, getSecretWithCache(secretId));
137+
138+
matcher.appendReplacement(sb, Matcher.quoteReplacement(processedSecret));
139+
} catch (IllegalArgumentException ia) {
140+
throw new IllegalArgumentException("Failed to get secret.", ia);
141+
}
142+
} else if (secretFile != null) {
143+
throw new UnsupportedOperationException("Not yet implemented.");
144+
}
145+
}
146+
matcher.appendTail(sb);
147+
String processedValue = sb.toString();
148+
processedConfig.put(key, processedValue);
149+
}
150+
} catch (IOException ex) {
151+
throw new RuntimeException("Failed trying to process value for key " + key + ".", ex);
152+
}
153+
}
154+
} catch (IOException e) {
155+
throw new RuntimeException("Failed trying to process extra files.", e);
156+
}
157+
158+
return createObject(processedConfig);
159+
}
160+
161+
/**
162+
* A function to download files from their specified external storage path and copy them to the
163+
* provided local filepath. The local filepath is provided by the replacePathWithLocal.
164+
*
165+
* @param externalFilePath
166+
* @param outputFileString
167+
* @return
168+
* @throws IOException
169+
*/
170+
protected static synchronized String downloadExternalFile(
171+
String externalFilePath, String outputFileString) throws IOException {
172+
// create the file only if it doesn't exist
173+
if (new File(outputFileString).exists()) {
174+
return outputFileString;
175+
}
176+
Path outputFilePath = Paths.get(outputFileString);
177+
Path parentDir = outputFilePath.getParent();
178+
if (parentDir != null) {
179+
Files.createDirectories(parentDir);
180+
}
181+
LOG.info("Staging external file [{}] to [{}]", externalFilePath, outputFileString);
182+
Set<StandardOpenOption> options = new HashSet<>(2);
183+
options.add(StandardOpenOption.CREATE);
184+
options.add(StandardOpenOption.WRITE);
185+
186+
// Copy the external file into a local file and will throw an I/O exception in case file not
187+
// found.
188+
try (ReadableByteChannel readerChannel =
189+
FileSystems.open(FileSystems.matchSingleFileSpec(externalFilePath).resourceId())) {
190+
try (FileChannel writeChannel = FileChannel.open(outputFilePath, options)) {
191+
writeChannel.transferFrom(readerChannel, 0, Long.MAX_VALUE);
192+
}
193+
}
194+
return outputFileString;
195+
}
196+
197+
protected byte[] getSecretWithCache(String secretId) {
198+
return secretCache.computeIfAbsent(secretId, this::getSecret);
199+
}
200+
201+
/**
202+
* A helper method to create a new string with the external paths replaced with their local path
203+
* and subdirectory based on the factory type in the /tmp directory. For example, the kerberos
204+
* factory type will replace the file paths with /tmp/kerberos/file.path
205+
*
206+
* @param externalPath
207+
* @return a string with all instances of external paths converted to the local paths where the
208+
* files sit.
209+
*/
210+
private String replacePathWithLocal(String externalPath) throws IOException {
211+
String externalBucketPrefixIdentifier = "://";
212+
int externalBucketPrefixIndex = externalPath.lastIndexOf(externalBucketPrefixIdentifier);
213+
if (externalBucketPrefixIndex == -1) {
214+
// if we don't find a known bucket prefix then we will error early.
215+
throw new RuntimeException(
216+
"The provided external bucket could not be matched to a known source.");
217+
}
218+
219+
int prefixLength = externalBucketPrefixIndex + externalBucketPrefixIdentifier.length();
220+
return DIRECTORY_PREFIX + "/" + factoryType + "/" + externalPath.substring(prefixLength);
221+
}
222+
223+
/**
224+
* @throws IOException A hook for subclasses to download and process specific files before the
225+
* main configuration is handled. For example, the kerberos factory can use this to download a
226+
* krb5.conf and set a system property.
227+
*/
228+
protected void downloadAndProcessExtraFiles() throws IOException {
229+
// Default implementation should do nothing.
230+
}
231+
232+
protected String getBaseDirectory() {
233+
return DIRECTORY_PREFIX;
234+
}
235+
236+
protected byte[] getSecret(String secretVersion) {
237+
SecretVersionName secretVersionName;
238+
if (SecretVersionName.isParsableFrom(secretVersion)) {
239+
secretVersionName = SecretVersionName.parse(secretVersion);
240+
} else {
241+
throw new IllegalArgumentException(
242+
"Provided Secret must be in the form"
243+
+ " projects/{project}/secrets/{secret}/versions/{secret_version}");
244+
}
245+
try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
246+
AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
247+
return response.getPayload().getData().toByteArray();
248+
} catch (IOException e) {
249+
throw new RuntimeException(e);
250+
}
251+
}
252+
253+
protected String processSecret(String originalValue, String secretId, byte[] secretValue) {
254+
// By Default, this will return the secret value directly. This function can be overridden by
255+
// derived classes.
256+
return new String(secretValue, StandardCharsets.UTF_8);
257+
}
258+
}

0 commit comments

Comments
 (0)