Skip to content

Commit c0aa135

Browse files
committed
feat(admin): remove topics
1 parent 0c67d0d commit c0aa135

File tree

3 files changed

+104
-37
lines changed

3 files changed

+104
-37
lines changed

src/Kafka/Admin.hs

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@ module Kafka.Admin(
22
module X
33
, newKAdmin
44
, createTopic
5+
, deleteTopic
56
, closeKAdmin
67
) where
78

89
import Control.Monad
9-
import Control.Monad.Trans.Class
10-
import Control.Monad.Trans.Maybe
1110
import Control.Monad.IO.Class
1211
import Data.Text
13-
import Data.Maybe
14-
import Data.Bifunctor
1512
import Data.List.NonEmpty
1613
import qualified Data.List.NonEmpty as NEL
1714
import qualified Data.Text as T
18-
import qualified Data.Set as S
1915

2016
import Kafka.Internal.RdKafka
2117
import Kafka.Internal.Setup
@@ -53,31 +49,74 @@ createTopic kAdmin topic = liftIO $ do
5349
Right _ -> do
5450
pure $ Right $ topicName topic
5551

52+
--- DELETE TOPIC ---
53+
deleteTopic :: KAdmin
54+
-> TopicName
55+
-> IO (Either KafkaError TopicName)
56+
deleteTopic kAdmin topic = liftIO $ do
57+
let kafkaPtr = getRdKafka kAdmin
58+
queue <- newRdKafkaQueue kafkaPtr
59+
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
60+
61+
topicRes <- withOldTopic topic $ \topic' -> rdKafkaDeleteTopics kafkaPtr [topic'] opts queue
62+
case topicRes of
63+
Left err -> do
64+
pure $ Left (NEL.head err)
65+
Right _ -> do
66+
pure $ Right topic
67+
5668
withNewTopic :: NewTopic
5769
-> (RdKafkaNewTopicTPtr -> IO a)
5870
-> IO (Either (NonEmpty KafkaError) a)
5971
withNewTopic t transform = do
60-
mkNewTopicRes <- mkNewTopic t topicPtr
72+
mkNewTopicRes <- mkNewTopic t newTopicPtr
6173
case mkNewTopicRes of
6274
Left err -> do
6375
return $ Left err
6476
Right topic -> do
6577
res <- transform topic
6678
return $ Right res
6779

68-
topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
69-
topicPtr topic = do
80+
withOldTopic :: TopicName
81+
-> (RdKafkaDeleteTopicTPtr -> IO a)
82+
-> IO (Either (NonEmpty KafkaError) a)
83+
withOldTopic tName transform = do
84+
rmOldTopicRes <- rmOldTopic tName oldTopicPtr
85+
case rmOldTopicRes of
86+
Left err -> do
87+
return $ Left err
88+
Right topic -> do
89+
res <- transform topic
90+
return $ Right res
91+
92+
newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
93+
newTopicPtr topic = do
7094
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
7195
case ptrRes of
7296
Left str -> pure $ Left (KafkaError $ T.pack str)
7397
Right ptr -> pure $ Right ptr
7498

99+
oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr)
100+
oldTopicPtr tName = do
101+
res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName
102+
case res of
103+
Left str -> pure $ Left (KafkaError $ T.pack str)
104+
Right ptr -> pure $ Right ptr
105+
75106
mkNewTopic :: NewTopic
76107
-> (NewTopic -> IO (Either KafkaError a))
77108
-> IO (Either (NonEmpty KafkaError) a)
78109
mkNewTopic topic create = do
79110
res <- create topic
80111
case res of
81-
Left err -> pure $ Left (Data.List.NonEmpty.singleton err)
112+
Left err -> pure $ Left (NEL.singleton err)
82113
Right resource -> pure $ Right resource
83114

115+
rmOldTopic :: TopicName
116+
-> (TopicName -> IO (Either KafkaError a))
117+
-> IO (Either (NonEmpty KafkaError) a)
118+
rmOldTopic tName remove = do
119+
res <- remove tName
120+
case res of
121+
Left err -> pure $ Left (NEL.singleton err)
122+
Right resource -> pure $ Right resource

src/Kafka/Internal/RdKafka.chs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import Foreign.Storable (Storable(..))
1717
import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr)
1818
import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr)
1919
import Foreign.C.Error (Errno(..), getErrno)
20-
import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString)
20+
import Foreign.C.String (CString, newCString, withCString, withCAString, peekCAString, peekCString)
2121
import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong)
2222
import System.IO (Handle, stdin, stdout, stderr)
2323
import System.Posix.IO (handleToFd)
@@ -1203,6 +1203,7 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
12031203
then peekCString ptr >>= pure . Left
12041204
else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res)
12051205

1206+
--- Create topic
12061207
rdKafkaCreateTopic :: RdKafkaTPtr
12071208
-> RdKafkaNewTopicTPtr
12081209
-> RdKafkaAdminOptionsTPtr
@@ -1214,34 +1215,33 @@ rdKafkaCreateTopic kafkaPtr topic opts queue = do
12141215
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
12151216
{#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr
12161217

1217-
rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaTopicResultTPtr)
1218-
rdKafkaEventCreateTopicsResult evtPtr =
1219-
withForeignPtr evtPtr $ \evtPtr' -> do
1220-
res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr')
1218+
--- Delete topic
1219+
foreign import ccall unsafe "rdkafka.h &rd_kafka_DeleteTopic_destroy"
1220+
rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT
1221+
1222+
data RdKafkaDeleteTopicT
1223+
{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #}
1224+
1225+
data RdKafkaDeleteTopicsResultT
1226+
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}
1227+
1228+
newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr)
1229+
newRdKafkaDeleteTopic topicNameStr =
1230+
withCString topicNameStr $ \topicNameStrPtr -> do
1231+
res <- {#call rd_kafka_DeleteTopic_new#} topicNameStrPtr
12211232
if (res == nullPtr)
1222-
then pure Nothing
1223-
else Just <$> newForeignPtr_ (castPtr res)
1224-
1225-
rdKafkaCreateTopicsResultTopics :: RdKafkaTopicResultTPtr
1226-
-> IO [Either (String, RdKafkaRespErrT, String) String]
1227-
rdKafkaCreateTopicsResultTopics tRes =
1228-
withForeignPtr tRes $ \tRes' ->
1229-
alloca $ \sPtr -> do
1230-
res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr
1231-
size <- peekIntConv sPtr
1232-
array <- peekArray size res
1233-
traverse unpackRdKafkaTopicResult array
1234-
1235-
unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT
1236-
-> IO (Either (String, RdKafkaRespErrT, String) String)
1237-
unpackRdKafkaTopicResult resPtr = do
1238-
name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString
1239-
err <- {#call rd_kafka_topic_result_error#} resPtr
1240-
case cIntToEnum err of
1241-
respErr -> do
1242-
errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString
1243-
pure $ Left (name, respErr, errMsg)
1244-
RdKafkaRespErrNoError -> pure $ Right name
1233+
then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr
1234+
else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res
1235+
1236+
rdKafkaDeleteTopics :: RdKafkaTPtr
1237+
-> [RdKafkaDeleteTopicTPtr]
1238+
-> RdKafkaAdminOptionsTPtr
1239+
-> RdKafkaQueueTPtr
1240+
-> IO ()
1241+
rdKafkaDeleteTopics kafkaPtr topics opts queue = do
1242+
withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr ->
1243+
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
1244+
{#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr
12451245

12461246
-- Marshall / Unmarshall
12471247
enumToCInt :: Enum a => a -> CInt

tests-it/Kafka/IntegrationSpec.hs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,41 @@ spec = do
177177
describe "Kafka.Admin.Spec" $ do
178178
let topicName = addRandomChars "admin.topic.created." 5
179179

180+
topicsMVar <- runIO newEmptyMVar
181+
180182
specWithAdmin "Create topic" $ do
181183

182184
it "should create a new topic" $ \(admin :: KAdmin) -> do
183185
tName <- topicName
184186
let newTopic = mkNewTopic (TopicName ( T.pack(tName) ))
185187
result <- createTopic admin newTopic
186188
result `shouldSatisfy` isRight
189+
190+
specWithConsumer "Read all topics" consumerProps $ do
191+
192+
it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do
193+
res <- allTopicsMetadata consumer (Timeout 1000)
194+
res `shouldSatisfy` isRight
195+
let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) }
196+
let res' = fmap filterUserTopics res
197+
length . kmBrokers <$> res' `shouldBe` Right 1
198+
199+
let topics = either (const []) (map tmTopicName . kmTopics) res'
200+
putMVar topicsMVar topics
201+
202+
let topicsLen = either (const 0) (length . kmTopics) res'
203+
let hasTopic = either (const False) (any (\t -> tmTopicName t == testTopic) . kmTopics) res'
204+
205+
topicsLen `shouldSatisfy` (>0)
206+
hasTopic `shouldBe` True
207+
208+
specWithAdmin "Remove topics" $ do
209+
210+
it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do
211+
topics <- takeMVar topicsMVar
212+
forM_ topics $ \topic -> do
213+
result <- deleteTopic admin topic
214+
result `shouldSatisfy` isRight
187215
----------------------------------------------------------------------------------------------------------------
188216

189217
data ReadState = Skip | Read

0 commit comments

Comments
 (0)