Skip to content

Commit 62a3294

Browse files
4.0 Build fixes (#102)
* Fix 4.0 builds * Fix test cases * Fix CI * Fix Checkstyles * Remove comments --------- Co-authored-by: mukesh-ctds <[email protected]>
1 parent 768812a commit 62a3294

File tree

16 files changed

+142
-107
lines changed

16 files changed

+142
-107
lines changed

ci/init_hydra_oauth_server.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ wait_for_url() {
3333
}
3434

3535
# Start hydra server
36-
docker-compose -f ci/hydra/docker-compose.yml up -d
36+
docker compose -f ci/hydra/docker-compose.yml up -d
3737

3838
# Wait until the hydra server started
3939
wait_for_url "http://localhost:4445/clients" "Waiting for Hydra admin REST to start"

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import lombok.extern.slf4j.Slf4j;
2727
import org.apache.commons.lang3.StringUtils;
2828
import org.apache.pulsar.broker.PulsarService;
29+
import org.apache.pulsar.broker.namespace.TopicExistsInfo;
2930
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
3031
import org.apache.pulsar.common.naming.TopicName;
3132
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
@@ -152,7 +153,9 @@ public CompletableFuture<Boolean> isTopicExists(final String topic) {
152153
}
153154

154155
protected CompletableFuture<Boolean> internalCheckTopicExists(TopicName topicName) {
155-
return this.pulsar.getNamespaceService().checkTopicExists(topicName);
156+
return pulsar.getNamespaceService()
157+
.checkTopicExists(topicName)
158+
.thenApplyAsync(TopicExistsInfo::isExists);
156159
}
157160

158161

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import java.net.InetSocketAddress;
1717
import java.util.concurrent.CompletableFuture;
1818
import lombok.extern.slf4j.Slf4j;
19-
import org.apache.commons.lang3.tuple.Pair;
2019
import org.apache.pulsar.broker.PulsarService;
20+
import org.apache.pulsar.client.impl.LookupTopicResult;
2121
import org.apache.pulsar.common.naming.TopicName;
2222

2323
/**
@@ -31,6 +31,7 @@ public LookupClient(final PulsarService pulsarService, final KafkaServiceConfigu
3131
}
3232

3333
public CompletableFuture<InetSocketAddress> getBrokerAddress(final TopicName topicName) {
34-
return getPulsarClient().getLookup().getBroker(topicName).thenApply(Pair::getLeft);
34+
return getPulsarClient().getLookup().getBroker(topicName)
35+
.thenApplyAsync(LookupTopicResult::getLogicalAddress);
3536
}
3637
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,9 @@ public CompletableFuture<Map<TopicPartition, Errors>> storeOffsets(
485485
filteredOffsetMetadata.forEach((tp, offsetAndMetadata) -> {
486486
CommitRecordMetadataAndOffset commitRecordMetadataAndOffset =
487487
new CommitRecordMetadataAndOffset(
488-
Optional.of(PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId())),
488+
Optional.of(PositionFactory.create(
489+
lastMessageId.getLedgerId(),
490+
lastMessageId.getEntryId())),
489491
offsetAndMetadata
490492
);
491493
if (isTxnOffsetCommit) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void updateProducerStats(final TopicPartition topicPartition,
8383
final RequestStats requestStats,
8484
final Producer producer) {
8585
final int numBytes = encodedByteBuf.readableBytes();
86-
producer.updateRates(numMessages, numBytes);
87-
producer.getTopic().incrementPublishCount(numMessages, numBytes);
86+
producer.updateRates();
87+
producer.incrementThrottleCount();
8888

8989
final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLoggerForTopicPartition(topicPartition);
9090

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
import org.apache.pulsar.common.naming.TopicName;
9191
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
9292
import org.apache.pulsar.common.util.FutureUtil;
93-
import org.jetbrains.annotations.NotNull;
9493

9594
/**
9695
* Analyze result.

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/OpFindNewestEntry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.bookkeeper.mledger.Entry;
2121
import org.apache.bookkeeper.mledger.ManagedLedgerException;
2222
import org.apache.bookkeeper.mledger.Position;
23-
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
2423
import org.apache.bookkeeper.mledger.PositionBound;
24+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
2525

2626
/**
2727
* Used to find Entry/Offset from ManagedLedger.

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ssl/SSLUtils.java

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,19 @@
2626
import java.util.Arrays;
2727
import java.util.Map;
2828
import java.util.Set;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.TimeUnit;
2932
import javax.net.ssl.SSLEngine;
33+
import lombok.SneakyThrows;
3034
import lombok.extern.slf4j.Slf4j;
3135
import org.apache.commons.lang.StringUtils;
3236
import org.apache.kafka.common.config.SslConfigs;
3337
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
3438
import org.apache.kafka.common.config.types.Password;
35-
import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
36-
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
39+
import org.apache.pulsar.common.util.PulsarSslConfiguration;
40+
import org.apache.pulsar.common.util.PulsarSslFactory;
41+
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
3742
import org.eclipse.jetty.util.ssl.SslContextFactory;
3843

3944
/**
@@ -358,76 +363,100 @@ public static SslContextFactory.Client createClientSslContextFactory(Map<String,
358363
return ssl;
359364
}
360365

361-
public static NettyServerSslContextBuilder buildNettyServerSslContextBuilder(
366+
public static PulsarSslFactory buildNettyServerSslContextBuilder(
362367
KafkaServiceConfiguration serviceConfig) {
363368
try {
364369
SslProvider sslProvider = null;
365370
if (serviceConfig.getTlsProvider() != null) {
366371
sslProvider = SslProvider.valueOf(serviceConfig.getTlsProvider());
367372
}
368373

369-
return new NettyServerSslContextBuilder(
370-
sslProvider,
371-
serviceConfig.isTlsAllowInsecureConnection(),
372-
serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(),
373-
serviceConfig.getTlsKeyFilePath(), serviceConfig.getTlsCiphers(),
374-
serviceConfig.getTlsProtocols(),
375-
serviceConfig.isTlsRequireTrustedClientCertOnConnect(),
376-
serviceConfig.getTlsCertRefreshCheckDurationSec());
374+
PulsarSslConfiguration configuration = buildSslConfiguration(serviceConfig);
375+
var sslFactory = (PulsarSslFactory) Class.forName(serviceConfig.getSslFactoryPlugin())
376+
.getConstructor().newInstance();
377+
sslFactory.initialize(configuration);
378+
sslFactory.createInternalSslContext();
379+
380+
return sslFactory;
377381
} catch (Throwable t) {
378382
throw new RuntimeException(t);
379383
}
380384
}
381385

382386

387+
public static PulsarSslConfiguration buildSslConfiguration(KafkaServiceConfiguration serviceConfig) {
388+
return PulsarSslConfiguration.builder()
389+
.tlsKeyStoreType(serviceConfig.getTlsKeyStoreType())
390+
.tlsKeyStorePath(serviceConfig.getTlsKeyStore())
391+
.tlsKeyStorePassword(serviceConfig.getTlsKeyStorePassword())
392+
.tlsTrustStoreType(serviceConfig.getTlsTrustStoreType())
393+
.tlsTrustStorePath(serviceConfig.getTlsTrustStore())
394+
.tlsTrustStorePassword(serviceConfig.getTlsTrustStorePassword())
395+
.tlsCiphers(serviceConfig.getTlsCiphers())
396+
.tlsProtocols(serviceConfig.getTlsProtocols())
397+
.tlsTrustCertsFilePath(serviceConfig.getTlsTrustCertsFilePath())
398+
.tlsCertificateFilePath(serviceConfig.getTlsCertificateFilePath())
399+
.tlsKeyFilePath(serviceConfig.getTlsKeyFilePath())
400+
.allowInsecureConnection(serviceConfig.isTlsAllowInsecureConnection())
401+
.requireTrustedClientCertOnConnect(serviceConfig.isTlsRequireTrustedClientCertOnConnect())
402+
.tlsEnabledWithKeystore(serviceConfig.isTlsEnabledWithKeyStore())
403+
.tlsCustomParams(serviceConfig.getSslFactoryPluginParams())
404+
.serverMode(true)
405+
.build();
406+
}
407+
408+
public static void refreshSslContext(PulsarSslFactory sslFactory) {
409+
try {
410+
sslFactory.update();
411+
} catch (Exception e) {
412+
log.error("Failed to refresh SSL context", e);
413+
}
414+
}
415+
383416
public static final class ServerSideTLSSupport {
384417
private final SslContextFactory.Server sslContextFactory;
385-
private final NettyServerSslContextBuilder serverSslCtxRefresher;
386-
private final boolean tlsEnabledWithKeyStore;
387-
private NettySSLContextAutoRefreshBuilder serverSSLContextAutoRefreshBuilder;
418+
private PulsarSslFactory sslFactory;
419+
private static final ScheduledExecutorService executor =
420+
Executors.newScheduledThreadPool(1);
388421

422+
@SneakyThrows
389423
public ServerSideTLSSupport(KafkaServiceConfiguration kafkaConfig) {
390-
this.tlsEnabledWithKeyStore = kafkaConfig.isTlsEnabledWithKeyStore();
391424
if (!StringUtils.isEmpty(kafkaConfig.getKopSslKeystoreLocation())) {
392425
// KOP mode
393426
sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
394-
serverSSLContextAutoRefreshBuilder = null;
395-
serverSslCtxRefresher = null;
396-
} else if (tlsEnabledWithKeyStore) { // Pulsar mode - tlsEnabledWithKeyStore=true
397-
serverSSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
398-
kafkaConfig.getTlsProvider(),
399-
kafkaConfig.getTlsKeyStoreType(),
400-
kafkaConfig.getTlsKeyStore(),
401-
kafkaConfig.getTlsKeyStorePassword(),
402-
kafkaConfig.isTlsAllowInsecureConnection(),
403-
kafkaConfig.getTlsTrustStoreType(),
404-
kafkaConfig.getTlsTrustStore(),
405-
kafkaConfig.getTlsTrustStorePassword(),
406-
kafkaConfig.isTlsRequireTrustedClientCertOnConnect(),
407-
kafkaConfig.getTlsCiphers(),
408-
kafkaConfig.getTlsProtocols(),
409-
kafkaConfig.getTlsCertRefreshCheckDurationSec());
410-
serverSslCtxRefresher = null;
411-
sslContextFactory = null;
427+
} else if (kafkaConfig.isTlsEnabledWithKeyStore()) { // Pulsar mode - tlsEnabledWithKeyStore=true
428+
429+
PulsarSslConfiguration sslConfiguration = buildSslConfiguration(kafkaConfig);
430+
this.sslFactory = (PulsarSslFactory) Class.forName(kafkaConfig.getSslFactoryPlugin())
431+
.getConstructor().newInstance();
432+
this.sslFactory.initialize(sslConfiguration);
433+
this.sslFactory.createInternalSslContext();
434+
if (kafkaConfig.getTlsCertRefreshCheckDurationSec() > 0) {
435+
executor.scheduleWithFixedDelay(() -> refreshSslContext(sslFactory),
436+
kafkaConfig.getTlsCertRefreshCheckDurationSec(),
437+
kafkaConfig.getTlsCertRefreshCheckDurationSec(),
438+
TimeUnit.SECONDS);
439+
}
440+
sslContextFactory =
441+
JettySslContextFactory.createSslContextFactory(kafkaConfig.getWebServiceTlsProvider(),
442+
this.sslFactory, kafkaConfig.isTlsRequireTrustedClientCertOnConnect(),
443+
kafkaConfig.getTlsCiphers(), kafkaConfig.getTlsProtocols());
444+
412445
} else { // Pulsar mode - tlsEnabledWithKeyStore=false
413446
sslContextFactory = null;
414-
serverSSLContextAutoRefreshBuilder = null;
415-
serverSslCtxRefresher = SSLUtils.buildNettyServerSslContextBuilder(kafkaConfig);
447+
sslFactory = SSLUtils.buildNettyServerSslContextBuilder(kafkaConfig);
416448
}
417449
}
418450

419451
public void addTlsHandler(SocketChannel ch){
420452
try {
421453
if (sslContextFactory != null) {
422454
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(createSslEngine(sslContextFactory)));
423-
} else if (serverSslCtxRefresher != null) {
424-
SslContext sslContext = serverSslCtxRefresher.get();
455+
} else {
456+
SslContext sslContext = sslFactory.getInternalNettySslContext();
425457
if (sslContext != null) {
426458
ch.pipeline().addLast(TLS_HANDLER, sslContext.newHandler(ch.alloc()));
427459
}
428-
} else if (tlsEnabledWithKeyStore && serverSSLContextAutoRefreshBuilder != null) {
429-
ch.pipeline().addLast(TLS_HANDLER,
430-
new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
431460
}
432461
} catch (Exception err) {
433462
throw new RuntimeException(err);

kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataTest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.util.concurrent.CompletableFuture;
3737
import java.util.function.Supplier;
3838
import lombok.val;
39-
import org.apache.bookkeeper.mledger.impl.PositionImpl;
39+
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
4040
import org.apache.kafka.common.TopicPartition;
4141
import org.testng.annotations.BeforeMethod;
4242
import org.testng.annotations.Test;
@@ -400,7 +400,7 @@ public void testOffsetCommit() {
400400

401401
group.onOffsetCommitAppend(
402402
partition,
403-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, commitRecordOffset)),
403+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, commitRecordOffset)),
404404
OffsetAndMetadata.apply(37)));
405405
assertEquals(group.numPendingOffsetCommits(), 0);
406406
assertEquals(group.numOffsets(), 1);
@@ -416,23 +416,23 @@ public void testOffsetIncreasingCommit() {
416416
pendingOffsetCommits.put(topicPartition, OffsetAndMetadata.apply(offset));
417417
group.prepareOffsetCommit(pendingOffsetCommits);
418418
group.onOffsetCommitAppend(topicPartition,
419-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 1000)),
419+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 1000)),
420420
OffsetAndMetadata.apply(offset)));
421421
assertEquals(group.offset(topicPartition, NAMESPACE_PREFIX).get().offset(), offset);
422422

423423
offset = 21L;
424424
pendingOffsetCommits.put(topicPartition, OffsetAndMetadata.apply(offset));
425425
group.prepareOffsetCommit(pendingOffsetCommits);
426426
group.onOffsetCommitAppend(topicPartition,
427-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 1001)),
427+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 1001)),
428428
OffsetAndMetadata.apply(offset)));
429429
assertEquals(group.offset(topicPartition, NAMESPACE_PREFIX).get().offset(), offset);
430430

431431
offset = 22L;
432432
pendingOffsetCommits.put(topicPartition, OffsetAndMetadata.apply(offset));
433433
group.prepareOffsetCommit(pendingOffsetCommits);
434434
group.onOffsetCommitAppend(topicPartition,
435-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 999)),
435+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 999)),
436436
OffsetAndMetadata.apply(offset)));
437437
assertNotEquals(group.offset(topicPartition, NAMESPACE_PREFIX).get().offset(), offset);
438438
}
@@ -475,7 +475,7 @@ public void testOffsetCommitFailureWithAnotherPending() {
475475
assertEquals(Optional.empty(), group.offset(partition, NAMESPACE_PREFIX));
476476

477477
group.onOffsetCommitAppend(partition,
478-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 3L)), secondOffset));
478+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 3L)), secondOffset));
479479
assertTrue(group.hasOffsets());
480480
assertEquals(Optional.of(secondOffset), group.offset(partition, NAMESPACE_PREFIX));
481481
}
@@ -498,12 +498,12 @@ public void testOffsetCommitWithAnotherPending() {
498498
assertTrue(group.hasOffsets());
499499

500500
group.onOffsetCommitAppend(partition,
501-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 4L)), firstOffset));
501+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 4L)), firstOffset));
502502
assertTrue(group.hasOffsets());
503503
assertEquals(Optional.of(firstOffset), group.offset(partition, NAMESPACE_PREFIX));
504504

505505
group.onOffsetCommitAppend(partition,
506-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 5L)), secondOffset));
506+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 5L)), secondOffset));
507507
assertTrue(group.hasOffsets());
508508
assertEquals(Optional.of(secondOffset), group.offset(partition, NAMESPACE_PREFIX));
509509
}
@@ -527,10 +527,10 @@ public void testConsumerBeatsTransactionalOffsetCommit() {
527527
assertTrue(group.hasOffsets());
528528

529529
group.onTxnOffsetCommitAppend(producerId, partition,
530-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 3L)),
530+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 3L)),
531531
OffsetAndMetadata.apply(37)));
532532
group.onOffsetCommitAppend(partition,
533-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 4L)), consumerOffsetCommit));
533+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 4L)), consumerOffsetCommit));
534534
assertTrue(group.hasOffsets());
535535
assertEquals(Optional.of(consumerOffsetCommit), group.offset(partition, NAMESPACE_PREFIX));
536536

@@ -562,10 +562,10 @@ public void testTransactionBeatsConsumerOffsetCommit() {
562562
assertTrue(group.hasOffsets());
563563

564564
group.onOffsetCommitAppend(
565-
partition, new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 3L)),
565+
partition, new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 3L)),
566566
consumerOffsetCommit));
567567
group.onTxnOffsetCommitAppend(producerId, partition,
568-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 4L)), txnOffsetCommit));
568+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 4L)), txnOffsetCommit));
569569
assertTrue(group.hasOffsets());
570570
// The transactional offset commit hasn't been committed yet, so we should materialize
571571
// the consumer offset commit.
@@ -597,9 +597,9 @@ public void testTransactionalCommitIsAbortedAndConsumerCommitWins() {
597597
assertTrue(group.hasOffsets());
598598

599599
group.onOffsetCommitAppend(partition,
600-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 3L)), consumerOffsetCommit));
600+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 3L)), consumerOffsetCommit));
601601
group.onTxnOffsetCommitAppend(producerId, partition,
602-
new CommitRecordMetadataAndOffset(Optional.of(new PositionImpl(1000, 4L)), txnOffsetCommit));
602+
new CommitRecordMetadataAndOffset(Optional.of(new ImmutablePositionImpl(1000, 4L)), txnOffsetCommit));
603603
assertTrue(group.hasOffsets());
604604
// The transactional offset commit hasn't been committed yet, so we should materialize the consumer
605605
// offset commit.

0 commit comments

Comments
 (0)