Kerberos Authentication for KafkaIO #36099
Conversation
Summary of Changes
Hello @fozzie15, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the Apache Beam KafkaIO connector by introducing a robust and secure mechanism for configuring Kafka clients. It allows users to leverage external services like Google Cloud Storage and Secret Manager for managing sensitive configuration files and credentials, such as Kerberos keytabs and krb5.conf. This abstraction simplifies the process of setting up complex authentication methods like Kerberos, making KafkaIO more versatile and enterprise-ready.
Highlights
- New Kafka Client Factory Module: Introduces a new module, `sdks:java:io:kafka:file-aware-factories`, to provide flexible Kafka client creation.
- GCS and Secret Manager Integration: Enables Kafka client configurations to reference files from Google Cloud Storage (GCS) and secrets from Google Secret Manager, which are then downloaded/resolved locally.
- Kerberos Authentication Support: Provides a specific implementation, `KerberosConsumerFactoryFn`, to facilitate Kerberos authentication for KafkaIO by handling `krb5.conf` and keytab files securely.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment …
Force-pushed from 6034093 to ab4e147.
Assigning reviewers: R: @ahmedabu98 for label java.

Note: If you would like to opt out of this review, comment …. Available commands: ….

The PR bot will only process comments in the main thread (not review comments).
I'll keep the comments I provided offline short. Now that I'm seeing how this would be used based on your tests, I've got another suggestion that may help to make this more generic and easier to configure in cross-language pipelines. :)

I think these additions should be under …. The functionality is very much Dataflow and GCP centric, and I agree that this is very much a Dataflow problem because it provides no way to mount shared file systems or secrets. Still, it may not be possible to add volume mounts in the execution environment of other runners, so this can be useful outside of Dataflow and GCP.

If I understood correctly, these consumer factory functions process the Kafka client configuration and replace occurrences of a few template strings based on a prefix, right? Variable substitution with custom substitution functions is supported by Kafka through …. The template variables are provided as ….
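To make the substitution idea above concrete, here is a rough, self-contained sketch of what `${provider:path:key}`-style template substitution looks like. This is illustrative only: the regex, class name, and in-memory lookup are simplified stand-ins written for this example, not Kafka's actual parser or `ConfigProvider` API.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch of ${provider:path:key}-style template substitution. */
public class TemplateSubstitutionSketch {

  // Non-greedy provider and optional path segments, greedy key (simplified).
  private static final Pattern VAR =
      Pattern.compile("\\$\\{([^}]*?):(?:([^}]*?):)?([^}]*)\\}");

  /** Replaces each ${provider:path:key} with the value looked up under "path:key". */
  public static String substitute(String value, Map<String, Map<String, String>> providers) {
    Matcher m = VAR.matcher(value);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      String provider = m.group(1);
      String path = m.group(2) == null ? "" : m.group(2);
      String key = m.group(3);
      Map<String, String> data = providers.getOrDefault(provider, Map.of());
      // Unresolved variables are left untouched, mirroring a lenient resolver.
      String resolved = data.getOrDefault(path + ":" + key, m.group(0));
      m.appendReplacement(sb, Matcher.quoteReplacement(resolved));
    }
    m.appendTail(sb);
    return sb.toString();
  }

  public static void main(String[] args) {
    // A GCS-style spec such as gs://bucket/object splits into path "gs"
    // and key "//bucket/object", which re-join into the full file spec.
    Map<String, Map<String, String>> providers =
        Map.of("fs", Map.of("gs://my-bucket/krb5.conf", "/tmp/krb5-local.conf"));
    // prints "krb5=/tmp/krb5-local.conf"
    System.out.println(substitute("krb5=${fs:gs://my-bucket/krb5.conf}", providers));
  }
}
```

Note the round-trip: a `gs://bucket/object` spec survives the `path:key` split because the path segment captures the scheme and the key keeps the rest, so a provider can reconstruct the full spec as `scheme + ":" + key`.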
Here's a sample for the `FileSystemConfigProvider`:

```java
package org.apache.beam.sdk.extensions.kafka.config;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.ConfigProvider;

public final class FileSystemConfigProvider implements ConfigProvider {

  static class Config extends AbstractConfig {
    static final String TTL_MS_CONFIG = "ttl.ms";
    static final ConfigDef CONFIG =
        new ConfigDef()
            .define(
                TTL_MS_CONFIG,
                ConfigDef.Type.LONG,
                0L,
                ConfigDef.Range.atLeast(0L),
                ConfigDef.Importance.LOW,
                "TTL in milliseconds for temporary files.");

    Config(Map<?, ?> props) {
      super(CONFIG, props);
    }
  }

  private static final LoadingCache<List<? extends Object>, LoadingCache<String, String>> CACHES =
      CacheBuilder.newBuilder()
          .build(
              new CacheLoader<List<? extends Object>, LoadingCache<String, String>>() {
                @Override
                public LoadingCache<String, String> load(List<? extends Object> key)
                    throws IOException {
                  final long ttlMillis = (long) key.get(0);
                  final String scheme = (String) key.get(1);
                  final Path tempDirectory = Files.createTempDirectory("beam-");
                  tempDirectory.toFile().deleteOnExit();
                  return (ttlMillis > 0
                          ? CacheBuilder.newBuilder()
                              .refreshAfterWrite(ttlMillis, TimeUnit.MILLISECONDS)
                          : CacheBuilder.newBuilder())
                      .build(
                          new CacheLoader<String, String>() {
                            @Override
                            public String load(String key) throws IOException {
                              final MatchResult.Metadata meta =
                                  FileSystems.matchSingleFileSpec(scheme + ":" + key);
                              final Path tempFile =
                                  Files.createTempFile(tempDirectory, "", ".tmp");
                              tempFile.toFile().deleteOnExit();
                              try (ReadableByteChannel src =
                                      FileSystems.open(meta.resourceId());
                                  WritableByteChannel dst =
                                      Files.newByteChannel(
                                          tempFile,
                                          StandardOpenOption.WRITE,
                                          StandardOpenOption.TRUNCATE_EXISTING)) {
                                ByteStreams.copy(src, dst);
                              }
                              return tempFile.toString();
                            }

                            @Override
                            public Map<String, String> loadAll(Iterable<? extends String> keys)
                                throws IOException {
                              final List<String> specs =
                                  Streams.stream(keys)
                                      .map(e -> scheme + ":" + e)
                                      .collect(Collectors.toList());
                              final List<MatchResult> matches =
                                  FileSystems.match(specs, EmptyMatchTreatment.ALLOW);
                              final HashMap<String, String> result = new HashMap<>(specs.size());
                              final Iterator<String> specIter = specs.iterator();
                              final Iterator<MatchResult> matchIter = matches.iterator();
                              while (specIter.hasNext() && matchIter.hasNext()) {
                                final String spec = specIter.next();
                                final MatchResult match = matchIter.next();
                                for (MatchResult.Metadata meta : match.metadata()) {
                                  try {
                                    final Path tempFile =
                                        Files.createTempFile(tempDirectory, "", ".tmp");
                                    tempFile.toFile().deleteOnExit();
                                    try (ReadableByteChannel src =
                                            FileSystems.open(meta.resourceId());
                                        WritableByteChannel dst =
                                            Files.newByteChannel(
                                                tempFile,
                                                StandardOpenOption.WRITE,
                                                StandardOpenOption.TRUNCATE_EXISTING)) {
                                      ByteStreams.copy(src, dst);
                                    }
                                    result.put(
                                        spec.substring(scheme.length() + 1),
                                        tempFile.toString());
                                  } catch (IOException ex) {
                                    continue;
                                  }
                                }
                              }
                              return result;
                            }

                            @Override
                            public ListenableFuture<String> reload(String key, String oldValue)
                                throws IOException {
                              final ResourceId resourceId =
                                  FileSystems.matchSingleFileSpec(scheme + ":" + key)
                                      .resourceId();
                              try (ReadableByteChannel src = FileSystems.open(resourceId);
                                  WritableByteChannel dst =
                                      Files.newByteChannel(
                                          Paths.get(oldValue),
                                          StandardOpenOption.WRITE,
                                          StandardOpenOption.TRUNCATE_EXISTING)) {
                                ByteStreams.copy(src, dst);
                              }
                              return Futures.immediateFuture(oldValue);
                            }
                          });
                }
              });

  private long ttlMillis = 0L;

  @Override
  public void configure(Map<String, ?> configs) {
    final Config cfg = new Config(configs);
    ttlMillis = cfg.getLong(Config.TTL_MS_CONFIG);
  }

  @Override
  public void close() throws IOException {}

  @Override
  public ConfigData get(String path) {
    return new ConfigData(Collections.singletonMap(path, path));
  }

  @Override
  public ConfigData get(String path, Set<String> keys) {
    try {
      return new ConfigData(CACHES.get(Arrays.asList(ttlMillis, path)).getAll(keys));
    } catch (Exception ex) {
      return new ConfigData(Collections.emptyMap());
    }
  }
}
```

Note that the cache entries reload lazily, so a stale local copy may be served until a read after the TTL triggers a refresh.

Usage: …, which copies the referenced objects to a temporary directory during substitution and then resolves to the local temporary file path. Example: …
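For illustration, wiring such a provider into a Kafka client configuration might look like the following. The alias `filesystem`, the bucket, the TTL value, and the file names are made-up placeholders; the `config.providers` registration and `config.providers.<name>.param.` prefix are Kafka's standard `ConfigProvider` mechanism.

```properties
# Register the custom ConfigProvider under an alias (hypothetical wiring).
config.providers=filesystem
config.providers.filesystem.class=org.apache.beam.sdk.extensions.kafka.config.FileSystemConfigProvider
config.providers.filesystem.param.ttl.ms=60000

# References resolve to local temporary copies during substitution,
# e.g. a keytab stored in GCS (placeholder path):
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  keyTab="${filesystem:gs://my-bucket/kafka.keytab}" \
  principal="kafka-client@EXAMPLE.COM";
```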
Force-pushed from ab4e147 to 5b618c0.
…to support consumer factories which pull files from GCS.
…classes to support consumer factories which pull files from GCS." This reverts commit f8f69d9.
… sure to remove these later as they cannot stay.
...ctories/src/main/java/org/apache/beam/sdk/extensions/kafka/factories/FileAwareFactoryFn.java (several review comments, now resolved)
Force-pushed from 349a4aa to dcd7d49.
talatuyarer left a comment:
@fozzie15 PR looks good to me. I saw there are a few missing tests you can cover:
- Missing Krb5Conf or keytab: test behavior when the krb5.conf/keytab download fails
- Invalid path for krb5.conf
- Malformed JAAS config

Thank you for the code changes. BTW, are you going to create a separate PR for KafkaTable?
Reminder, please take a look at this PR: @ahmedabu98 @sjvanrossum
johnjcasey left a comment:
Based on the scope of this PR, it looks like a user would provide the `KerberosConsumerFactoryFn` to KafkaIO as a parameter, right?
Have we / can we test that this works in a pipeline as expected?
Yes, this has been tested and it works as intended. The issue with adding an IT is the same as for other GCP ITs (`WithGCPApplicationDefaultCreds`, for example), so I made sure manual tests worked. We can discuss these ITs in more detail offline.
This PR allows users to use a custom ConsumerFactoryFn to create their Kafka clients with specified configs from GCS and Secret Manager. The first specific implementation is for Kerberos, which will allow Dataflow jobs to be configured to authenticate to Kafka clusters via Kerberos.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.