Skip to content

Commit 3baa954

Browse files
committed
chore: use HasKafka in favor of KAdmin
1 parent ba286d3 commit 3baa954

File tree

6 files changed

+22
-112
lines changed

6 files changed

+22
-112
lines changed

hw-kafka-client.cabal

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@ library
5555
build-tool-depends: c2hs:c2hs
5656
if impl(ghc <8.0)
5757
build-depends: semigroups
58-
exposed-modules: Kafka.Admin
59-
Kafka.Admin.AdminProperties
60-
Kafka.Admin.Types
58+
exposed-modules: Kafka.Topic
59+
Kafka.Topic.Types
6160
Kafka.Consumer
6261
Kafka.Consumer.ConsumerProperties
6362
Kafka.Consumer.Subscription

src/Kafka/Admin/AdminProperties.hs

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/Kafka/Admin.hs renamed to src/Kafka/Topic.hs

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
module Kafka.Admin(
1+
module Kafka.Topic(
22
module X
3-
, newKAdmin
43
, createTopic
54
, deleteTopic
6-
, closeKAdmin
75
) where
86

97
import Control.Monad
@@ -17,28 +15,12 @@ import Kafka.Internal.RdKafka
1715
import Kafka.Internal.Setup
1816

1917
import Kafka.Types as X
20-
import Kafka.Admin.AdminProperties as X
21-
import Kafka.Admin.Types as X
18+
import Kafka.Topic.Types as X
2219

23-
newKAdmin ::( MonadIO m )
24-
=> AdminProperties
25-
-> m (Either KafkaError KAdmin)
26-
newKAdmin properties = liftIO $ do
27-
kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties)
28-
maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf'
29-
case maybeKafka of
30-
Left err -> pure $ Left $ KafkaError err
31-
Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig
32-
33-
closeKAdmin :: KAdmin
34-
-> IO ()
35-
closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka)
3620
--- CREATE TOPIC ---
37-
createTopic :: KAdmin
38-
-> NewTopic
39-
-> IO (Either KafkaError TopicName)
40-
createTopic kAdmin topic = liftIO $ do
41-
let kafkaPtr = getRdKafka kAdmin
21+
createTopic :: HasKafka k => k -> NewTopic -> IO (Either KafkaError TopicName)
22+
createTopic k topic = do
23+
let kafkaPtr = getRdKafka k
4224
queue <- newRdKafkaQueue kafkaPtr
4325
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
4426

@@ -50,11 +32,12 @@ createTopic kAdmin topic = liftIO $ do
5032
pure $ Right $ topicName topic
5133

5234
--- DELETE TOPIC ---
53-
deleteTopic :: KAdmin
35+
deleteTopic :: HasKafka k
36+
=> k
5437
-> TopicName
5538
-> IO (Either KafkaError TopicName)
56-
deleteTopic kAdmin topic = liftIO $ do
57-
let kafkaPtr = getRdKafka kAdmin
39+
deleteTopic k topic = liftIO $ do
40+
let kafkaPtr = getRdKafka k
5841
queue <- newRdKafkaQueue kafkaPtr
5942
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
6043

src/Kafka/Admin/Types.hs renamed to src/Kafka/Topic/Types.hs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
module Kafka.Admin.Types (
2-
KAdmin(..)
3-
, PartitionCount (..)
1+
module Kafka.Topic.Types (
2+
PartitionCount (..)
43
, ReplicationFactor (..)
54
, NewTopic (..)
65
) where
@@ -10,19 +9,6 @@ import Data.Map
109
import Kafka.Types
1110
import Kafka.Internal.Setup
1211

13-
data KAdmin = KAdmin {
14-
kaKafkaPtr :: !Kafka
15-
, kaKafkaConf :: !KafkaConf
16-
}
17-
18-
instance HasKafka KAdmin where
19-
getKafka = kaKafkaPtr
20-
{-# INLINE getKafka #-}
21-
22-
instance HasKafkaConf KAdmin where
23-
getKafkaConf = kaKafkaConf
24-
{-# INLINE getKafkaConf #-}
25-
2612
newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq)
2713
newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq)
2814

tests-it/Kafka/IntegrationSpec.hs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import Kafka.Consumer
1818
import qualified Data.Text as T
1919
import Kafka.Metadata
2020
import Kafka.Producer
21-
import Kafka.Admin
21+
import Kafka.Topic
2222
import Kafka.TestEnv
2323
import Test.Hspec
2424

@@ -174,20 +174,19 @@ spec = do
174174
forM_ res $ \rcs ->
175175
forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders)
176176

177-
describe "Kafka.Admin.Spec" $ do
177+
describe "Kafka.Topic.Spec" $ do
178178
let topicName = addRandomChars "admin.topic.created." 5
179179

180180
topicsMVar <- runIO newEmptyMVar
181181

182-
specWithAdmin "Create topic" $ do
183-
184-
it "should create a new topic" $ \(admin :: KAdmin) -> do
182+
specWithConsumer "Read all topics" consumerProps $ do
183+
184+
it "should create a topic" $ \(consumer :: KafkaConsumer) -> do
185185
tName <- topicName
186186
let newTopic = mkNewTopic (TopicName ( T.pack(tName) ))
187-
result <- createTopic admin newTopic
187+
result <- createTopic consumer newTopic
188188
result `shouldSatisfy` isRight
189189

190-
specWithConsumer "Read all topics" consumerProps $ do
191190

192191
it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do
193192
res <- allTopicsMetadata consumer (Timeout 1000)
@@ -205,13 +204,12 @@ spec = do
205204
topicsLen `shouldSatisfy` (>0)
206205
hasTopic `shouldBe` True
207206

208-
specWithAdmin "Remove topics" $ do
209-
210-
it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do
207+
it "should delete all the topics currently existing" $ \(consumer :: KafkaConsumer) -> do
211208
topics <- takeMVar topicsMVar
212209
forM_ topics $ \topic -> do
213-
result <- deleteTopic admin topic
210+
result <- deleteTopic consumer topic
214211
result `shouldSatisfy` isRight
212+
215213
----------------------------------------------------------------------------------------------------------------
216214

217215
data ReadState = Skip | Read

tests-it/Kafka/TestEnv.hs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import qualified System.Random as Rnd
1515
import Control.Concurrent
1616
import Kafka.Consumer as C
1717
import Kafka.Producer as P
18-
import Kafka.Admin as A
1918

2019
import Test.Hspec
2120

@@ -58,9 +57,6 @@ producerProps = P.brokersList [brokerAddress]
5857
<> P.setCallback (logCallback (\l s1 s2 -> print $ "[Producer] " <> show l <> ": " <> s1 <> ", " <> s2))
5958
<> P.setCallback (errorCallback (\e r -> print $ "[Producer] " <> show e <> ": " <> r))
6059

61-
adminProperties :: AdminProperties
62-
adminProperties = A.brokers [brokerAddress]
63-
6460
testSubscription :: TopicName -> Subscription
6561
testSubscription t = topics [t]
6662
<> offsetReset Earliest
@@ -80,9 +76,6 @@ mkConsumerWith props = do
8076
(RebalanceAssign _) -> putMVar var True
8177
_ -> pure ()
8278

83-
mkAdmin :: IO KAdmin
84-
mkAdmin = newKAdmin adminProperties >>= \(Right k) -> pure k
85-
8679
specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec
8780
specWithConsumer s p f =
8881
beforeAll (mkConsumerWith p)
@@ -97,9 +90,3 @@ specWithKafka s p f =
9790
beforeAll ((,) <$> mkConsumerWith p <*> mkProducer)
9891
$ afterAll (\(consumer, producer) -> void $ closeProducer producer >> closeConsumer consumer)
9992
$ describe s f
100-
101-
specWithAdmin :: String -> SpecWith KAdmin -> Spec
102-
specWithAdmin s f =
103-
beforeAll mkAdmin
104-
$ afterAll (void . closeKAdmin)
105-
$ describe s f

0 commit comments

Comments
 (0)