Skip to content

Commit 3db4951

Browse files
authored
Update pulsar dependency to 2.10.3 (#144)
1 parent ea34459 commit 3db4951

File tree

6 files changed

+33
-16
lines changed

6 files changed

+33
-16
lines changed

agent/src/main/java/com/datastax/oss/cdc/agent/AbstractPulsarMutationSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public void initialize(AgentConfig config) throws PulsarClientException {
9292
try {
9393
ClientBuilder clientBuilder = PulsarClient.builder()
9494
.serviceUrl(config.pulsarServiceUrl)
95+
.memoryLimit(config.pulsarMemoryLimitBytes, SizeUnit.BYTES)
9596
.enableTcpNoDelay(false);
9697

9798
if (config.pulsarServiceUrl.startsWith("pulsar+ssl://")) {
@@ -197,7 +198,6 @@ public Producer<KeyValue<byte[], MutationValue>> getProducer(final TableInfo tm)
197198
.hashingScheme(HashingScheme.Murmur3_32Hash)
198199
.blockIfQueueFull(true)
199200
.maxPendingMessages(config.pulsarMaxPendingMessages)
200-
.maxPendingMessagesAcrossPartitions(config.pulsarMaxPendingMessagesAcrossPartitions)
201201
.autoUpdatePartitions(true);
202202

203203
if (config.pulsarBatchDelayInMs > 0) {

agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -297,13 +297,13 @@ public static long getEnvAsLong(String varName, long defaultValue) {
297297
1000, "CDC_PULSAR_MAX_PENDING_MESSAGES", Setting::getEnvAsInteger,
298298
"Integer", "pulsar", 4);
299299

300-
public static final String PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS= "pulsarMaxPendingMessagesAcrossPartitions";
301-
public int pulsarMaxPendingMessagesAcrossPartitions;
302-
public static final Setting<Integer> PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING =
303-
new Setting<>(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS, Platform.PULSAR, (c, s) -> c.pulsarMaxPendingMessagesAcrossPartitions = Integer.parseInt(s), c -> c.pulsarMaxPendingMessagesAcrossPartitions,
304-
"The Pulsar maximum number of pending messages across partitions.",
305-
50000, "CDC_PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS", Setting::getEnvAsInteger,
306-
"Integer", "pulsar", 5);
300+
public static final String PULSAR_MEMORY_LIMIT_BYTES= "pulsarMemoryLimitBytes";
301+
public long pulsarMemoryLimitBytes;
302+
public static final Setting<Long> PULSAR_MEMORY_LIMIT_BYTES_SETTING =
303+
new Setting<>(PULSAR_MEMORY_LIMIT_BYTES, Platform.PULSAR, (c, s) -> c.pulsarMemoryLimitBytes = Long.parseLong(s), c -> c.pulsarMemoryLimitBytes,
304+
"Limit of client memory usage (in bytes). The 0 default means memory limit is disabled.",
305+
0L, "CDC_PULSAR_MEMORY_LIMIT_BYTES", Setting::getEnvAsLong,
306+
"Long", "pulsar", 5);
307307

308308
public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = "pulsarAuthPluginClassName";
309309
public String pulsarAuthPluginClassName;
@@ -349,9 +349,9 @@ public static long getEnvAsLong(String varName, long defaultValue) {
349349
set.add(PULSAR_BATCH_BATCH_DELAY_IN_MS_SETTING);
350350
set.add(PULSAR_KEY_BASED_BATCHER_SETTING);
351351
set.add(PULSAR_MAX_PENDING_MESSAGES_SETTING);
352-
set.add(PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING);
353352
set.add(PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING);
354353
set.add(PULSAR_AUTH_PARAMS_SETTING);
354+
set.add(PULSAR_MEMORY_LIMIT_BYTES_SETTING);
355355
settings = Collections.unmodifiableSet(set);
356356

357357
Map<String, Setting<?>> map = new HashMap<>();
@@ -382,9 +382,9 @@ public AgentConfig() {
382382
this.pulsarBatchDelayInMs = PULSAR_BATCH_BATCH_DELAY_IN_MS_SETTING.initDefault();
383383
this.pulsarKeyBasedBatcher = PULSAR_KEY_BASED_BATCHER_SETTING.initDefault();
384384
this.pulsarMaxPendingMessages = PULSAR_MAX_PENDING_MESSAGES_SETTING.initDefault();
385-
this.pulsarMaxPendingMessagesAcrossPartitions = PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS_SETTING.initDefault();
386385
this.pulsarAuthPluginClassName = PULSAR_AUTH_PLUGIN_CLASS_NAME_SETTING.initDefault();
387386
this.pulsarAuthParams = PULSAR_AUTH_PARAMS_SETTING.initDefault();
387+
this.pulsarMemoryLimitBytes = PULSAR_MEMORY_LIMIT_BYTES_SETTING.initDefault();
388388
}
389389

390390
public static void main(String[] args) {
@@ -509,7 +509,10 @@ public AgentConfig configure(Platform platform, Map<String, Object> agentParamet
509509
throw new IllegalArgumentException(String.format("Unsupported parameter '%s' for the %s platform ", key, platform));
510510
}
511511
setting.initializer.apply(this, value);
512-
} else {
512+
} else if ("pulsarMaxPendingMessagesAcrossPartitions".equals(key)) {
513+
log.warn("The 'pulsarMaxPendingMessagesAcrossPartitions' parameter is deprecated, the config will be ignored");
514+
}
515+
else {
513516
throw new RuntimeException(String.format("Unknown parameter '%s'", key));
514517
}
515518
}

agent/src/test/java/com/datastax/oss/cdc/agent/AgentParametersTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,11 @@ void assertCommonConfig(AgentConfig config) {
7171
public void testConfigurePulsar() {
7272
String agentArgs = COMMON_CONFIG +
7373
PULSAR_SERVICE_URL + "=pulsar+ssl://mypulsar:6650\\,localhost:6651\\,localhost:6652," +
74+
PULSAR_MEMORY_LIMIT_BYTES + "=64," +
7475
PULSAR_BATCH_DELAY_IN_MS + "=20," +
7576
PULSAR_KEY_BASED_BATCHER + "=true," +
7677
PULSAR_MAX_PENDING_MESSAGES + "=20," +
77-
PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS + "=200," +
78+
"pulsarMaxPendingMessagesAcrossPartitions" + "=200," + // make sure if someone is passing the deprecated parameter, AgentConfig will not fail
7879
PULSAR_AUTH_PLUGIN_CLASS_NAME + "=MyAuthPlugin," +
7980
PULSAR_AUTH_PARAMS + "=x:y\\,z:t," +
8081
SSL_ALLOW_INSECURE_CONNECTION + "=true," +
@@ -91,7 +92,7 @@ public void testConfigurePulsar() {
9192
assertEquals(20L, config.pulsarBatchDelayInMs);
9293
assertTrue(config.pulsarKeyBasedBatcher);
9394
assertEquals(20, config.pulsarMaxPendingMessages);
94-
assertEquals(200, config.pulsarMaxPendingMessagesAcrossPartitions);
95+
assertEquals(64L, config.pulsarMemoryLimitBytes);
9596

9697
// Pulsar Auth
9798
assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName);
@@ -107,9 +108,11 @@ public void testConfigurePulsarFromMap() {
107108
tenantInfo.put(SSL_ALLOW_INSECURE_CONNECTION, "true");
108109
tenantInfo.put(SSL_HOSTNAME_VERIFICATION_ENABLE, "true");
109110
tenantInfo.put(TLS_TRUST_CERTS_FILE_PATH, "/test.p12");
111+
tenantInfo.put(PULSAR_MEMORY_LIMIT_BYTES, "64");
110112

111113
AgentConfig config = AgentConfig.create(Platform.PULSAR, tenantInfo);
112114
assertEquals("pulsar+ssl://mypulsar:6650,localhost:6651,localhost:6652", config.pulsarServiceUrl);
115+
assertEquals(64L, config.pulsarMemoryLimitBytes);
113116

114117
// Pulsar Auth
115118
assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName);
@@ -129,6 +132,7 @@ public void testConfigurePulsarFromMap() {
129132
@SetEnvironmentVariable(key = "CDC_PULSAR_BATCH_DELAY_IN_MS", value = "555")
130133
@SetEnvironmentVariable(key = "CDC_PULSAR_KEY_BASED_BATCHER", value = "true")
131134
@SetEnvironmentVariable(key = "CDC_PULSAR_MAX_PENDING_MESSAGES", value = "555")
135+
@SetEnvironmentVariable(key = "CDC_PULSAR_MEMORY_LIMIT_BYTES", value = "64")
132136
public void testConfigurePulsarFromEnvVar() {
133137
AgentConfig config = AgentConfig.create(Platform.PULSAR, "");
134138
assertEquals("pulsar+ssl://mypulsar:6650,localhost:6651,localhost:6652", config.pulsarServiceUrl);
@@ -139,6 +143,7 @@ public void testConfigurePulsarFromEnvVar() {
139143
assertEquals(555L, config.pulsarBatchDelayInMs);
140144
assertEquals(true, config.pulsarKeyBasedBatcher);
141145
assertEquals(555, config.pulsarMaxPendingMessages);
146+
assertEquals(64L, config.pulsarMemoryLimitBytes);
142147

143148
// Pulsar Auth
144149
assertEquals("MyAuthPlugin", config.pulsarAuthPluginClassName);

connector/build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ dependencies {
7171

7272
testImplementation "org.testcontainers:testcontainers:${testContainersVersion}"
7373
testImplementation project(':testcontainers')
74-
testImplementation("${pulsarGroup}:pulsar-client:${pulsarVersion}")
74+
testImplementation("${pulsarGroup}:pulsar-client") {
75+
version {
76+
strictly("${pulsarVersion}")
77+
}
78+
}
7579

7680
nar "${pulsarGroup}:pulsar-io:${pulsarVersion}"
7781
}

connector/gradle.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
artifact=connector
2+
3+
# TODO: remove this to fall back to the oveerall client used in the project (2.20.3) once the following issue is fixed:
4+
# https://github.com/apache/pulsar/issues/20092
5+
pulsarVersion=2.8.3

gradle.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ cassandra4Version=4.0.4
1515
dse4Version=6.8.23
1616

1717
pulsarGroup=org.apache.pulsar
18-
pulsarVersion=2.8.3
18+
pulsarVersion=2.10.3
19+
# Used when running tests locally, CI will override those values
1920
testPulsarImage=datastax/lunastreaming
20-
testPulsarImageTag=2.8.0_1.1.42
21+
testPulsarImageTag=2.10_3.4
2122

2223
kafkaVersion=3.4.0
2324
vavrVersion=0.10.3

0 commit comments

Comments
 (0)