Skip to content

Commit 932a777

Browse files
committed
feat(admin): create topics
1 parent 17451f5 commit 932a777

File tree

10 files changed

+465
-42
lines changed

10 files changed

+465
-42
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ To be able to run tests locally, `$KAFKA_TEST_BROKER` environment variable is ex
222222

223223
`$KAFKA_TEST_BROKER` should contain an IP address of an accessible Kafka broker that will be used to run integration tests against.
224224

225-
With [Docker Compose](./docker-compose.yml) this variable is used to configure Kafka broker to listen on this address:
225+
With [Docker Compose](./docker-compose.yml) this variable is used to configure a Kafka broker with a UI on localhost:8080 to listen on this address:
226226

227227
```
228228
$ docker-compose up

docker-compose.yml

Lines changed: 117 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,121 @@
1-
version: "3.8"
2-
1+
version: '3.7'
32
services:
4-
zookeeper:
5-
image: confluentinc/cp-zookeeper
6-
hostname: zookeeper
7-
ports:
8-
- 2182:2181
9-
environment:
10-
SERVICE_NAME: zookeeper
11-
ZOOKEEPER_CLIENT_PORT: 2181
12-
13-
kafka:
14-
image: confluentinc/cp-kafka:latest
15-
hostname: localhost
3+
# Redpanda cluster
4+
redpanda-1:
5+
image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
6+
container_name: redpanda-1
7+
command:
8+
- redpanda
9+
- start
10+
- --smp
11+
- '1'
12+
- --reserve-memory
13+
- 0M
14+
- --overprovisioned
15+
- --node-id
16+
- '1'
17+
- --kafka-addr
18+
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
19+
- --advertise-kafka-addr
20+
- PLAINTEXT://redpanda-1:29092,OUTSIDE://localhost:9092
21+
- --pandaproxy-addr
22+
- PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082
23+
- --advertise-pandaproxy-addr
24+
- PLAINTEXT://redpanda-1:28082,OUTSIDE://localhost:8082
25+
- --rpc-addr
26+
- 0.0.0.0:33145
27+
- --advertise-rpc-addr
28+
- redpanda-1:33145
1629
ports:
30+
# - 8081:8081
31+
- 8082:8082
1732
- 9092:9092
18-
links:
19-
- zookeeper:zookeeper
33+
- 9644:9644
34+
- 28082:28082
35+
- 29092:29092
36+
37+
# Want a two node Redpanda cluster? Uncomment this block :)
38+
# redpanda-2:
39+
# image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
40+
# container_name: redpanda-2
41+
# command:
42+
# - redpanda
43+
# - start
44+
# - --smp
45+
# - '1'
46+
# - --reserve-memory
47+
# - 0M
48+
# - --overprovisioned
49+
# - --node-id
50+
# - '2'
51+
# - --seeds
52+
# - redpanda-1:33145
53+
# - --kafka-addr
54+
# - PLAINTEXT://0.0.0.0:29093,OUTSIDE://0.0.0.0:9093
55+
# - --advertise-kafka-addr
56+
# - PLAINTEXT://redpanda-2:29093,OUTSIDE://localhost:9093
57+
# - --pandaproxy-addr
58+
# - PLAINTEXT://0.0.0.0:28083,OUTSIDE://0.0.0.0:8083
59+
# - --advertise-pandaproxy-addr
60+
# - PLAINTEXT://redpanda-2:28083,OUTSIDE://localhost:8083
61+
# - --rpc-addr
62+
# - 0.0.0.0:33146
63+
# - --advertise-rpc-addr
64+
# - redpanda-2:33146
65+
# ports:
66+
# - 8083:8083
67+
# - 9093:9093
68+
69+
# redpanda-3:
70+
# image: docker.redpanda.com/redpandadata/redpanda:v23.1.1
71+
# container_name: redpanda-3
72+
# command:
73+
# - redpanda
74+
# - start
75+
# - --smp
76+
# - '2'
77+
# - --reserve-memory
78+
# - 0M
79+
# - --overprovisioned
80+
# - --node-id
81+
# - '3'
82+
# - --seeds
83+
# - redpanda-1:33145
84+
# - --kafka-addr
85+
# - PLAINTEXT://0.0.0.0:29094,OUTSIDE://0.0.0.0:9094
86+
# - --advertise-kafka-addr
87+
# - PLAINTEXT://redpanda-3:29094,OUTSIDE://localhost:9094
88+
# - --pandaproxy-addr
89+
# - PLAINTEXT://0.0.0.0:28084,OUTSIDE://0.0.0.0:8084
90+
# - --advertise-pandaproxy-addr
91+
# - PLAINTEXT://redpanda-3:28084,OUTSIDE://localhost:8084
92+
# - --rpc-addr
93+
# - 0.0.0.0:33147
94+
# - --advertise-rpc-addr
95+
# - redpanda-3:33147
96+
# ports:
97+
# - 8084:8084
98+
# - 9094:9094
99+
100+
redpanda-console:
101+
image: docker.redpanda.com/redpandadata/console:v2.2.2
102+
container_name: redpanda-console
103+
entrypoint: /bin/sh
104+
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
20105
environment:
21-
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
22-
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://$KAFKA_TEST_BROKER:9092"
23-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
24-
KAFKA_CREATE_TOPICS:
106+
CONFIG_FILEPATH: /tmp/config.yml
107+
CONSOLE_CONFIG_FILE: |
108+
kafka:
109+
brokers: ["redpanda-1:29092"]
110+
schemaRegistry:
111+
enabled: false
112+
redpanda:
113+
adminApi:
114+
enabled: true
115+
urls: ["http://redpanda-1:9644"]
116+
connect:
117+
enabled: false
118+
ports:
119+
- 8080:8080
120+
depends_on:
121+
- redpanda-1

hw-kafka-client.cabal

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ library
5555
build-tool-depends: c2hs:c2hs
5656
if impl(ghc <8.0)
5757
build-depends: semigroups
58-
exposed-modules: Kafka.Consumer
58+
exposed-modules: Kafka.Admin
59+
Kafka.Admin.AdminProperties
60+
Kafka.Admin.Types
61+
Kafka.Consumer
5962
Kafka.Consumer.ConsumerProperties
6063
Kafka.Consumer.Subscription
6164
Kafka.Consumer.Types

src/Kafka/Admin.hs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
module Kafka.Admin(
2+
module X
3+
, newKAdmin
4+
, createTopic
5+
, closeKAdmin
6+
) where
7+
8+
import Control.Monad
9+
import Control.Monad.Trans.Class
10+
import Control.Monad.Trans.Maybe
11+
import Control.Monad.IO.Class
12+
import Data.Text
13+
import Data.Maybe
14+
import Data.Bifunctor
15+
import Data.List.NonEmpty
16+
import qualified Data.List.NonEmpty as NEL
17+
import qualified Data.Text as T
18+
import qualified Data.Set as S
19+
20+
import Kafka.Internal.RdKafka
21+
import Kafka.Internal.Setup
22+
23+
import Kafka.Types as X
24+
import Kafka.Admin.AdminProperties as X
25+
import Kafka.Admin.Types as X
26+
27+
newKAdmin ::( MonadIO m )
28+
=> AdminProperties
29+
-> m (Either KafkaError KAdmin)
30+
newKAdmin properties = liftIO $ do
31+
kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties)
32+
maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf'
33+
case maybeKafka of
34+
Left err -> pure $ Left $ KafkaError err
35+
Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig
36+
37+
closeKAdmin :: KAdmin
38+
-> IO ()
39+
closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka)
40+
--- CREATE TOPIC ---
41+
createTopic :: KAdmin
42+
-> NewTopic
43+
-> IO (Either KafkaError TopicName)
44+
createTopic kAdmin topic = liftIO $ do
45+
let kafkaPtr = getRdKafka kAdmin
46+
queue <- newRdKafkaQueue kafkaPtr
47+
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
48+
49+
topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue
50+
case topicRes of
51+
Left err -> do
52+
pure $ Left (NEL.head err)
53+
Right _ -> do
54+
pure $ Right $ topicName topic
55+
56+
withNewTopic :: NewTopic
57+
-> (RdKafkaNewTopicTPtr -> IO a)
58+
-> IO (Either (NonEmpty KafkaError) a)
59+
withNewTopic t transform = do
60+
mkNewTopicRes <- mkNewTopic t topicPtr
61+
case mkNewTopicRes of
62+
Left err -> do
63+
return $ Left err
64+
Right topic -> do
65+
res <- transform topic
66+
return $ Right res
67+
68+
topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
69+
topicPtr topic = do
70+
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
71+
case ptrRes of
72+
Left str -> pure $ Left (KafkaError $ T.pack str)
73+
Right ptr -> pure $ Right ptr
74+
75+
mkNewTopic :: NewTopic
76+
-> (NewTopic -> IO (Either KafkaError a))
77+
-> IO (Either (NonEmpty KafkaError) a)
78+
mkNewTopic topic create = do
79+
res <- create topic
80+
case res of
81+
Left err -> pure $ Left (Data.List.NonEmpty.singleton err)
82+
Right resource -> pure $ Right resource
83+

src/Kafka/Admin/AdminProperties.hs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{-# LANGUAGE OverloadedStrings #-}
2+
3+
module Kafka.Admin.AdminProperties where
4+
5+
import Data.Map
6+
import qualified Data.Map as M
7+
import Data.Text
8+
9+
import Kafka.Types
10+
11+
newtype AdminProperties = AdminProperties {
12+
adminProps :: Map Text Text
13+
}
14+
15+
instance Semigroup AdminProperties where
16+
( AdminProperties props1 ) <> ( AdminProperties props2 ) =
17+
AdminProperties ( props2 `union` props1 )
18+
{-# INLINE (<>) #-}
19+
20+
instance Monoid AdminProperties where
21+
mempty = AdminProperties {
22+
adminProps = M.empty
23+
}
24+
{-# INLINE mempty #-}
25+
mappend = (<>)
26+
{-# INLINE mappend #-}
27+
28+
brokers :: [BrokerAddress] -> AdminProperties
29+
brokers b =
30+
let b' = intercalate "," ((\( BrokerAddress i ) -> i ) <$> b )
31+
in extraProps $ fromList [("bootstrap.servers", b')]
32+
33+
clientId :: ClientId -> AdminProperties
34+
clientId (ClientId cid) =
35+
extraProps $ M.fromList [("client.id", cid)]
36+
37+
timeOut :: Timeout -> AdminProperties
38+
timeOut (Timeout to) =
39+
let to' = ( pack $ show to )
40+
in extraProps $ fromList [("request.timeout.ms", to')]
41+
42+
extraProps :: Map Text Text -> AdminProperties
43+
extraProps m = mempty { adminProps = m }

src/Kafka/Admin/Types.hs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
module Kafka.Admin.Types (
2+
KAdmin(..)
3+
, PartitionCount (..)
4+
, ReplicationFactor (..)
5+
, NewTopic (..)
6+
) where
7+
8+
import Data.Map
9+
10+
import Kafka.Types
11+
import Kafka.Internal.Setup
12+
13+
data KAdmin = KAdmin {
14+
kaKafkaPtr :: !Kafka
15+
, kaKafkaConf :: !KafkaConf
16+
}
17+
18+
instance HasKafka KAdmin where
19+
getKafka = kaKafkaPtr
20+
{-# INLINE getKafka #-}
21+
22+
instance HasKafkaConf KAdmin where
23+
getKafkaConf = kaKafkaConf
24+
{-# INLINE getKafkaConf #-}
25+
26+
newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq)
27+
newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq)
28+
29+
data NewTopic = NewTopic {
30+
topicName :: TopicName
31+
, topicPartitionCount :: PartitionCount
32+
, topicReplicationFactor :: ReplicationFactor
33+
, topicConfig :: Map String String
34+
} deriving (Show)

0 commit comments

Comments
 (0)