@@ -156,10 +156,6 @@ public static File randomPartitionLogDir(File parentDir) {
156
156
throw new RuntimeException ("Failed to create directory after 1000 attempts" );
157
157
}
158
158
159
- public static Properties createBrokerConfig (int nodeId , int port ) {
160
- return new BrokerConfigBuilder (nodeId ).withPort (port ).build ();
161
- }
162
-
163
159
public static MemoryRecords singletonRecords (byte [] value , byte [] key ) {
164
160
return singletonRecords (value , key , Compression .NONE , RecordBatch .NO_TIMESTAMP , RecordBatch .CURRENT_MAGIC_VALUE );
165
161
}
@@ -239,109 +235,4 @@ public static MemoryRecords records(List<SimpleRecord> records,
239
235
}
240
236
return builder .build ();
241
237
}
242
-
243
- public static class BrokerConfigBuilder {
244
- private final int nodeId ;
245
- private boolean enableControlledShutdown = true ;
246
- private boolean enableDeleteTopic = true ;
247
- private int port = -1 ;
248
- private Optional <SecurityProtocol > interBrokerSecurityProtocol = Optional .empty ();
249
- private Optional <File > trustStoreFile = Optional .empty ();
250
- private Optional <Properties > saslProperties = Optional .empty ();
251
- private boolean enablePlaintext = true ;
252
- private boolean enableSaslPlaintext = false ;
253
- private int saslPlaintextPort = -1 ;
254
- private boolean enableSsl = false ;
255
- private int sslPort = -1 ;
256
- private boolean enableSaslSsl = false ;
257
- private int saslSslPort = -1 ;
258
- private Optional <String > rack = Optional .empty ();
259
- private int logDirCount = 1 ;
260
- private int numPartitions = 1 ;
261
- private short defaultReplicationFactor = 1 ;
262
- private boolean enableFetchFromFollower = false ;
263
-
264
- public BrokerConfigBuilder (int nodeId ) {
265
- this .nodeId = nodeId ;
266
- }
267
-
268
- public BrokerConfigBuilder withPort (int port ) {
269
- this .enablePlaintext = true ;
270
- this .port = port ;
271
- return this ;
272
- }
273
-
274
- public BrokerConfigBuilder withSsl (int port , File trustStoreFile ) {
275
- this .enableSsl = true ;
276
- this .sslPort = port ;
277
- this .trustStoreFile = Optional .of (trustStoreFile );
278
- return this ;
279
- }
280
-
281
- public Properties build () {
282
- List <Map .Entry <SecurityProtocol , Integer >> protocolAndPorts = new ArrayList <>();
283
-
284
- if (enablePlaintext || (interBrokerSecurityProtocol .isPresent () && interBrokerSecurityProtocol .get () == SecurityProtocol .PLAINTEXT ))
285
- protocolAndPorts .add (new AbstractMap .SimpleEntry <>(SecurityProtocol .PLAINTEXT , port ));
286
- if (enableSsl || (interBrokerSecurityProtocol .isPresent () && interBrokerSecurityProtocol .get () == SecurityProtocol .SSL ))
287
- protocolAndPorts .add (new AbstractMap .SimpleEntry <>(SecurityProtocol .SSL , sslPort ));
288
- if (enableSaslPlaintext || (interBrokerSecurityProtocol .isPresent () && interBrokerSecurityProtocol .get () == SecurityProtocol .SASL_PLAINTEXT ))
289
- protocolAndPorts .add (new AbstractMap .SimpleEntry <>(SecurityProtocol .SASL_PLAINTEXT , saslPlaintextPort ));
290
- if (enableSaslSsl || (interBrokerSecurityProtocol .isPresent () && interBrokerSecurityProtocol .get () == SecurityProtocol .SASL_SSL ))
291
- protocolAndPorts .add (new AbstractMap .SimpleEntry <>(SecurityProtocol .SASL_SSL , saslSslPort ));
292
-
293
- String listeners = protocolAndPorts .stream ()
294
- .map (entry -> String .format ("%s://localhost:%d" , entry .getKey ().name (), entry .getValue ()))
295
- .collect (Collectors .joining ("," ));
296
-
297
- Properties props = new Properties ();
298
- props .put (ServerConfigs .UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG , "true" );
299
- props .put (ServerConfigs .UNSTABLE_API_VERSIONS_ENABLE_CONFIG , "true" );
300
- props .put (KRaftConfigs .SERVER_MAX_STARTUP_TIME_MS_CONFIG , String .valueOf (TimeUnit .MINUTES .toMillis (10 )));
301
- props .put (KRaftConfigs .NODE_ID_CONFIG , String .valueOf (nodeId ));
302
- props .put (ServerConfigs .BROKER_ID_CONFIG , String .valueOf (nodeId ));
303
- props .put (SocketServerConfigs .ADVERTISED_LISTENERS_CONFIG , listeners );
304
- props .put (SocketServerConfigs .LISTENERS_CONFIG , listeners );
305
- props .put (KRaftConfigs .CONTROLLER_LISTENER_NAMES_CONFIG , "CONTROLLER" );
306
-
307
- String securityProtocolMap = protocolAndPorts .stream ()
308
- .map (entry -> String .format ("%s:%s" , entry .getKey ().name (), entry .getKey ().name ()))
309
- .collect (Collectors .joining ("," )) + ",CONTROLLER:PLAINTEXT" ;
310
- props .put (SocketServerConfigs .LISTENER_SECURITY_PROTOCOL_MAP_CONFIG , securityProtocolMap );
311
-
312
- if (logDirCount > 1 ) {
313
- String logDirs = IntStream .range (0 , logDirCount )
314
- .mapToObj (i -> tempDirectory ().getAbsolutePath ())
315
- .collect (Collectors .joining ("," ));
316
- props .put (ServerLogConfigs .LOG_DIRS_CONFIG , logDirs );
317
- } else {
318
- props .put (ServerLogConfigs .LOG_DIR_CONFIG , tempDirectory ().getAbsolutePath ());
319
- }
320
-
321
- props .put (KRaftConfigs .PROCESS_ROLES_CONFIG , "broker" );
322
- props .put (QuorumConfig .QUORUM_VOTERS_CONFIG , "1000@localhost:0" );
323
- props .put (ServerConfigs .CONTROLLED_SHUTDOWN_ENABLE_CONFIG , String .valueOf (enableControlledShutdown ));
324
- props .put (ServerConfigs .DELETE_TOPIC_ENABLE_CONFIG , String .valueOf (enableDeleteTopic ));
325
-
326
- rack .ifPresent (r -> props .put (ServerConfigs .BROKER_RACK_CONFIG , r ));
327
-
328
- try {
329
- } catch (Exception e ) {
330
- throw new RuntimeException (e );
331
- }
332
-
333
- interBrokerSecurityProtocol .ifPresent (protocol ->
334
- props .put (ReplicationConfigs .INTER_BROKER_SECURITY_PROTOCOL_CONFIG , protocol .name ()));
335
-
336
- props .put (ServerLogConfigs .NUM_PARTITIONS_CONFIG , String .valueOf (numPartitions ));
337
- props .put (ReplicationConfigs .DEFAULT_REPLICATION_FACTOR_CONFIG , String .valueOf (defaultReplicationFactor ));
338
-
339
- if (enableFetchFromFollower ) {
340
- props .put (ServerConfigs .BROKER_RACK_CONFIG , String .valueOf (nodeId ));
341
- props .put (ReplicationConfigs .REPLICA_SELECTOR_CLASS_CONFIG , "org.apache.kafka.common.replica.RackAwareReplicaSelector" );
342
- }
343
-
344
- return props ;
345
- }
346
- }
347
238
}
0 commit comments