Skip to content

Commit 9ffb040

Browse files
committed
Merge pull request #24 from chris-zen/master
Added partitions and replicationFactor options to createCustomTopic
2 parents 8fd4506 + 85f4773 commit 9ffb040

File tree

3 files changed

+28
-6
lines changed

3 files changed

+28
-6
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ The `EmbeddedKafka` trait provides also some utility methods to interact with th
8181
8282
def consumeFirstMessageFrom(topic: String): String
8383

84-
def createCustomTopic(topic: String, topicConfig: Map[String,String]): Unit
84+
def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit
8585
8686
## Custom producers
8787

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,15 +242,19 @@ sealed trait EmbeddedKafkaSupport {
242242
/**
243243
* Creates a topic with a custom configuration
244244
*
245-
* @param topic the topic name
246-
* @param topicConfig per topic configuration [[Map]]
247-
* @param config an implicit [[EmbeddedKafkaConfig]]
245+
* @param topic the topic name
246+
* @param topicConfig per topic configuration [[Map]]
247+
* @param partitions number of partitions [[Int]]
248+
* @param replicationFactor replication factor [[Int]]
249+
* @param config an implicit [[EmbeddedKafkaConfig]]
248250
*/
249-
def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty)(implicit config: EmbeddedKafkaConfig): Unit = {
251+
def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty,
252+
partitions: Int = 1, replicationFactor: Int = 1)(implicit config: EmbeddedKafkaConfig): Unit = {
253+
250254
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
251255
val topicProperties = topicConfig.foldLeft(new Properties){case (props, (k,v)) => props.put(k,v); props}
252256

253-
try AdminUtils.createTopic(zkUtils, topic, 1, 1, topicProperties) finally zkUtils.close()
257+
try AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties) finally zkUtils.close()
254258
}
255259

256260
}

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,24 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
121121

122122
}
123123
}
124+
125+
"create a topic with custom number of partitions" in {
126+
implicit val config = EmbeddedKafkaConfig()
127+
val topic = "test_custom_topic"
128+
129+
withRunningKafka {
130+
131+
createCustomTopic(topic, Map("cleanup.policy"->"compact"), partitions = 2)
132+
133+
val zkSessionTimeoutMs = 10000
134+
val zkConnectionTimeoutMs = 10000
135+
val zkSecurityEnabled = false
136+
137+
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
138+
try { AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata.size shouldBe 2 } finally zkUtils.close()
139+
140+
}
141+
}
124142
}
125143

126144
"the consumeFirstStringMessageFrom method" should {

0 commit comments

Comments
 (0)