Skip to content

Commit eb193f5

Browse files
JoranVanBelleAlexeyRaga
authored andcommitted
fix(admin): make creation unsafe and wait for response
1 parent c02def1 commit eb193f5

File tree

3 files changed

+157
-35
lines changed

3 files changed

+157
-35
lines changed

.github/workflows/haskell.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
steps:
2424
- uses: actions/checkout@v4
2525

26-
- uses: actions/cache@v2
26+
- uses: actions/cache@v3
2727
name: Cache librdkafka
2828
with:
2929
path: .librdkafka

src/Kafka/Internal/RdKafka.chs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,8 @@ rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort
11641164
-- Topics
11651165
{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #}
11661166

1167-
1167+
data RdKafkaTopicResultT
1168+
{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT#}
11681169

11691170
data RdKafkaAdminOptionsT
11701171
{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #}
@@ -1180,6 +1181,9 @@ data RdKafkaNewTopicT
11801181
foreign import ccall unsafe "rdkafka.h &rd_kafka_AdminOptions_destroy" -- prevent memory leak
11811182
finalRdKafkaAdminOptionsDestroy :: FinalizerPtr RdKafkaAdminOptionsT
11821183

1184+
{#fun rd_kafka_NewTopic_set_config as ^
1185+
{`RdKafkaNewTopicTPtr', `String', `String'} -> `Either RdKafkaRespErrT ()' cIntToRespEither #}
1186+
11831187
newRdKAdminOptions :: RdKafkaTPtr -> RdKafkaAdminOpT -> IO RdKafkaAdminOptionsTPtr
11841188
newRdKAdminOptions kafkaPtr opt = do
11851189
res <- rdKafkaAdminOptionsNew kafkaPtr opt
@@ -1193,6 +1197,9 @@ rdKafkaNewTopicDestroy tPtr = do
11931197
foreign import ccall "&rd_kafka_NewTopic_destroy"
11941198
rdKafkaNewTopicDestroyFinalizer :: FinalizerPtr RdKafkaNewTopicT
11951199

1200+
data RdKafkaCreateTopicsResultT
1201+
{#pointer *rd_kafka_CreateTopics_result_t as RdKafkaCreateTopicsResultTPtr foreign -> RdKafkaCreateTopicsResultT #}
1202+
11961203
newRdKafkaNewTopic :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr)
11971204
newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
11981205
allocaBytes nErrorBytes $ \ptr -> do
@@ -1202,6 +1209,46 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
12021209
then peekCString ptr >>= pure . Left
12031210
else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res)
12041211

1212+
newRdKafkaNewTopicUnsafe :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr)
1213+
newRdKafkaNewTopicUnsafe topicName topicPartition topicReplicationFactor = do
1214+
allocaBytes nErrorBytes $ \ptr -> do
1215+
res <- rdKafkaNewTopicNew topicName topicPartition topicReplicationFactor ptr (fromIntegral nErrorBytes)
1216+
withForeignPtr res $ \realPtr -> do
1217+
if realPtr == nullPtr
1218+
then peekCString ptr >>= pure . Left
1219+
else pure (Right res)
1220+
1221+
rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaCreateTopicsResultTPtr)
1222+
rdKafkaEventCreateTopicsResult evtPtr =
1223+
withForeignPtr evtPtr $ \evtPtr' -> do
1224+
res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr')
1225+
if (res == nullPtr)
1226+
then pure Nothing
1227+
else Just <$> newForeignPtr_ (castPtr res)
1228+
1229+
rdKafkaCreateTopicsResultTopics :: RdKafkaCreateTopicsResultTPtr
1230+
-> IO [Either (String, RdKafkaRespErrT, String) String]
1231+
rdKafkaCreateTopicsResultTopics tRes =
1232+
withForeignPtr tRes $ \tRes' ->
1233+
alloca $ \sPtr -> do
1234+
res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr
1235+
size <- peekIntConv sPtr
1236+
arr <- peekArray size res
1237+
traverse unpackRdKafkaTopicResult arr
1238+
1239+
-- | Unpacks raw result into
1240+
-- 'Either (topicName, errorType, errorMsg) topicName'
1241+
unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT
1242+
-> IO (Either (String, RdKafkaRespErrT, String) String)
1243+
unpackRdKafkaTopicResult resPtr = do
1244+
name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString
1245+
err <- {#call rd_kafka_topic_result_error#} resPtr
1246+
case cIntToEnum err of
1247+
RdKafkaRespErrNoError -> pure $ Right name
1248+
respErr -> do
1249+
errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString
1250+
pure $ Left (name, respErr, errMsg)
1251+
12051252
--- Create topic
12061253
rdKafkaCreateTopic :: RdKafkaTPtr
12071254
-> RdKafkaNewTopicTPtr
@@ -1222,7 +1269,7 @@ data RdKafkaDeleteTopicT
12221269
{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #}
12231270

12241271
data RdKafkaDeleteTopicsResultT
1225-
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}
1272+
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicsResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}
12261273

12271274
newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr)
12281275
newRdKafkaDeleteTopic topicNameStr =
@@ -1232,6 +1279,14 @@ newRdKafkaDeleteTopic topicNameStr =
12321279
then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr
12331280
else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res
12341281

1282+
rdKafkaEventDeleteTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaDeleteTopicsResultTPtr)
1283+
rdKafkaEventDeleteTopicsResult evtPtr =
1284+
withForeignPtr evtPtr $ \evtPtr' -> do
1285+
res <- {#call rd_kafka_event_DeleteTopics_result#} (castPtr evtPtr')
1286+
if (res == nullPtr)
1287+
then pure Nothing
1288+
else Just <$> newForeignPtr_ (castPtr res)
1289+
12351290
rdKafkaDeleteTopics :: RdKafkaTPtr
12361291
-> [RdKafkaDeleteTopicTPtr]
12371292
-> RdKafkaAdminOptionsTPtr
@@ -1242,6 +1297,16 @@ rdKafkaDeleteTopics kafkaPtr topics opts queue = do
12421297
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
12431298
{#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr
12441299

1300+
rdKafkaDeleteTopicsResultTopics :: RdKafkaDeleteTopicsResultTPtr
1301+
-> IO [Either (String, RdKafkaRespErrT, String) String]
1302+
rdKafkaDeleteTopicsResultTopics tRes =
1303+
withForeignPtr tRes $ \tRes' ->
1304+
alloca $ \sPtr -> do
1305+
res <- {#call rd_kafka_DeleteTopics_result_topics#} (castPtr tRes') sPtr
1306+
size <- peekIntConv sPtr
1307+
arr <- peekArray size res
1308+
traverse unpackRdKafkaTopicResult arr
1309+
12451310
-- Marshall / Unmarshall
12461311
enumToCInt :: Enum a => a -> CInt
12471312
enumToCInt = fromIntegral . fromEnum
@@ -1255,6 +1320,12 @@ cIntConv :: (Integral a, Num b) => a -> b
12551320
cIntConv = fromIntegral
12561321
{-# INLINE cIntConv #-}
12571322

1323+
cIntToRespEither err =
1324+
case cIntToEnum err of
1325+
RdKafkaRespErrNoError -> Right ()
1326+
respErr -> Left respErr
1327+
{-# INLINE cIntToRespEither #-}
1328+
12581329
boolToCInt :: Bool -> CInt
12591330
boolToCInt True = CInt 1
12601331
boolToCInt False = CInt 0

src/Kafka/Topic.hs

Lines changed: 83 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,21 @@ module X
44
, deleteTopic
55
) where
66

7+
import Control.Exception
78
import Control.Monad.IO.Class
9+
import Control.Monad.Trans.Class
10+
import Control.Monad.Trans.Except
11+
import Control.Monad.Trans.Maybe
12+
import Data.Bifunctor
13+
import Data.Foldable
814
import Data.List.NonEmpty
9-
import qualified Data.List.NonEmpty as NEL
10-
import Data.Text
11-
import qualified Data.Text as T
12-
13-
import Kafka.Internal.RdKafka
14-
import Kafka.Internal.Setup
15+
import qualified Data.List.NonEmpty as NEL
16+
import qualified Data.Map as M
17+
import Data.Maybe
18+
import qualified Data.Set as S
19+
import qualified Data.Text as T
20+
import Kafka.Internal.RdKafka
21+
import Kafka.Internal.Setup
1522

1623
import Kafka.Topic.Types as X
1724
import Kafka.Types as X
@@ -24,11 +31,17 @@ createTopic k topic = do
2431
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny
2532

2633
topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue
34+
2735
case topicRes of
2836
Left err -> do
2937
pure $ Left (NEL.head err)
3038
Right _ -> do
31-
pure $ Right $ topicName topic
39+
res <- waitForResponse (topicName topic) rdKafkaEventCreateTopicsResult rdKafkaCreateTopicsResultTopics queue
40+
case listToMaybe res of
41+
Nothing -> pure $ Left KafkaInvalidReturnValue
42+
Just result -> pure $ case result of
43+
Left (_, e, _) -> Left e
44+
Right tName -> Right tName
3245

3346
--- DELETE TOPIC ---
3447
deleteTopic :: HasKafka k
@@ -45,19 +58,17 @@ deleteTopic k topic = liftIO $ do
4558
Left err -> do
4659
pure $ Left (NEL.head err)
4760
Right _ -> do
48-
pure $ Right topic
61+
res <- waitForResponse topic rdKafkaEventDeleteTopicsResult rdKafkaDeleteTopicsResultTopics queue
62+
case listToMaybe res of
63+
Nothing -> pure $ Left KafkaInvalidReturnValue
64+
Just result -> pure $ case result of
65+
Left (_, e, _) -> Left e
66+
Right tName -> Right tName
4967

5068
withNewTopic :: NewTopic
5169
-> (RdKafkaNewTopicTPtr -> IO a)
5270
-> IO (Either (NonEmpty KafkaError) a)
53-
withNewTopic t transform = do
54-
mkNewTopicRes <- mkNewTopic t newTopicPtr
55-
case mkNewTopicRes of
56-
Left err -> do
57-
return $ Left err
58-
Right topic -> do
59-
res <- transform topic
60-
return $ Right res
71+
withNewTopic t = withUnsafeOne t mkNewTopicUnsafe rdKafkaNewTopicDestroy
6172

6273
withOldTopic :: TopicName
6374
-> (RdKafkaDeleteTopicTPtr -> IO a)
@@ -71,28 +82,21 @@ withOldTopic tName transform = do
7182
res <- transform topic
7283
return $ Right res
7384

74-
newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
75-
newTopicPtr topic = do
76-
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
77-
case ptrRes of
78-
Left str -> pure $ Left (KafkaError $ T.pack str)
79-
Right ptr -> pure $ Right ptr
80-
8185
oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr)
8286
oldTopicPtr tName = do
83-
res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName
87+
res <- newRdKafkaDeleteTopic $ T.unpack . unTopicName $ tName
8488
case res of
8589
Left str -> pure $ Left (KafkaError $ T.pack str)
8690
Right ptr -> pure $ Right ptr
8791

88-
mkNewTopic :: NewTopic
89-
-> (NewTopic -> IO (Either KafkaError a))
90-
-> IO (Either (NonEmpty KafkaError) a)
91-
mkNewTopic topic create = do
92-
res <- create topic
93-
case res of
94-
Left err -> pure $ Left (singletonList err)
95-
Right resource -> pure $ Right resource
92+
mkNewTopicUnsafe :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
93+
mkNewTopicUnsafe topic = runExceptT $ do
94+
topic' <- withErrStr $ newRdKafkaNewTopicUnsafe (T.unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
95+
_ <- withErrKafka $ whileRight (uncurry $ rdKafkaNewTopicSetConfig undefined) (M.toList $ topicConfig topic)
96+
pure topic'
97+
where
98+
withErrStr = withExceptT (KafkaError . T.pack) . ExceptT
99+
withErrKafka = withExceptT KafkaResponseError . ExceptT
96100

97101
rmOldTopic :: TopicName
98102
-> (TopicName -> IO (Either KafkaError a))
@@ -103,5 +107,52 @@ rmOldTopic tName remove = do
103107
Left err -> pure $ Left (singletonList err)
104108
Right resource -> pure $ Right resource
105109

110+
withUnsafeOne :: a -- ^ Item to handle
111+
-> (a -> IO (Either KafkaError b)) -- ^ Create an unsafe element
112+
-> (b -> IO ()) -- ^ Destroy the unsafe element
113+
-> (b -> IO c) -- ^ Handler
114+
-> IO (Either (NonEmpty KafkaError) c)
115+
withUnsafeOne a mkOne cleanup f =
116+
bracket (mkOne a) cleanupOne processOne
117+
where
118+
cleanupOne (Right b) = cleanup b
119+
cleanupOne (Left _) = pure () -- no resource to clean if creation failed
120+
121+
processOne (Right b) = Right <$> f b
122+
processOne (Left e) = pure (Left (singletonList e))
123+
124+
whileRight :: Monad m
125+
=> (a -> m (Either e ()))
126+
-> [a]
127+
-> m (Either e ())
128+
whileRight f as = runExceptT $ traverse_ (ExceptT . f) as
129+
130+
waitForResponse :: TopicName
131+
-> (RdKafkaEventTPtr -> IO (Maybe a))
132+
-> (a -> IO [Either (String, RdKafkaRespErrT, String) String])
133+
-> RdKafkaQueueTPtr
134+
-> IO [Either (TopicName, KafkaError, String) TopicName]
135+
waitForResponse topic fromEvent toResults q =
136+
fromMaybe [] <$> runMaybeT (go [])
137+
where
138+
awaited = S.singleton topic
139+
140+
go accRes = do
141+
qRes <- MaybeT $ rdKafkaQueuePoll q 1000
142+
eRes <- MaybeT $ fromEvent qRes
143+
tRes <- lift $ toResults eRes
144+
let results = wrapTopicName <$> tRes
145+
let topics = S.fromList $ getTopicName <$> results
146+
let newRes = results <> accRes
147+
let remaining = S.difference awaited topics
148+
if S.null remaining
149+
then pure newRes
150+
else go newRes
151+
152+
getTopicName = either (\(t,_,_) -> t) id
153+
wrapTopicName = bimap (\(t,e,s) -> (TopicName (T.pack t), KafkaResponseError e, s))
154+
(TopicName . T.pack)
155+
106156
singletonList :: a -> NonEmpty a
107157
singletonList x = x :| []
158+

0 commit comments

Comments
 (0)