@@ -17,7 +17,7 @@ import Foreign.Storable (Storable(..))
1717import Foreign.Ptr (Ptr , FunPtr , castPtr , nullPtr )
1818import Foreign.ForeignPtr (FinalizerPtr , addForeignPtrFinalizer , newForeignPtr_ , withForeignPtr , ForeignPtr , newForeignPtr )
1919import Foreign.C.Error (Errno (.. ), getErrno )
20- import Foreign.C.String (CString , newCString , withCAString , peekCAString , peekCString )
20+ import Foreign.C.String (CString , newCString , withCString , withCAString , peekCAString , peekCString )
2121import Foreign.C.Types (CFile , CInt (.. ), CSize , CChar , CLong )
2222import System.IO (Handle , stdin , stdout , stderr )
2323import System.Posix.IO (handleToFd )
@@ -1203,6 +1203,7 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
12031203 then peekCString ptr >>= pure . Left
12041204 else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res)
12051205
1206+ --- Create topic
12061207rdKafkaCreateTopic :: RdKafkaTPtr
12071208 -> RdKafkaNewTopicTPtr
12081209 -> RdKafkaAdminOptionsTPtr
@@ -1214,34 +1215,33 @@ rdKafkaCreateTopic kafkaPtr topic opts queue = do
12141215 withForeignPtrsArrayLen topics $ \ tLen tPtr -> do
12151216 {# call rd_kafka_CreateTopics# } kPtr tPtr (fromIntegral tLen) oPtr qPtr
12161217
1217- rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaTopicResultTPtr )
1218- rdKafkaEventCreateTopicsResult evtPtr =
1219- withForeignPtr evtPtr $ \ evtPtr' -> do
1220- res <- {# call rd_kafka_event_CreateTopics_result# } (castPtr evtPtr')
1218+ --- Delete topic
1219+ foreign import ccall unsafe " rdkafka.h &rd_kafka_DeleteTopic_destroy"
1220+ rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT
1221+
1222+ data RdKafkaDeleteTopicT
1223+ {# pointer * rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT # }
1224+
1225+ data RdKafkaDeleteTopicsResultT
1226+ {# pointer * rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT # }
1227+
1228+ newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr )
1229+ newRdKafkaDeleteTopic topicNameStr =
1230+ withCString topicNameStr $ \ topicNameStrPtr -> do
1231+ res <- {# call rd_kafka_DeleteTopic_new# } topicNameStrPtr
12211232 if (res == nullPtr)
1222- then pure Nothing
1223- else Just <$> newForeignPtr_ (castPtr res)
1224-
1225- rdKafkaCreateTopicsResultTopics :: RdKafkaTopicResultTPtr
1226- -> IO [Either (String , RdKafkaRespErrT , String ) String ]
1227- rdKafkaCreateTopicsResultTopics tRes =
1228- withForeignPtr tRes $ \ tRes' ->
1229- alloca $ \ sPtr -> do
1230- res <- {# call rd_kafka_CreateTopics_result_topics# } (castPtr tRes') sPtr
1231- size <- peekIntConv sPtr
1232- array <- peekArray size res
1233- traverse unpackRdKafkaTopicResult array
1234-
1235- unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT
1236- -> IO (Either (String , RdKafkaRespErrT , String ) String )
1237- unpackRdKafkaTopicResult resPtr = do
1238- name <- {# call rd_kafka_topic_result_name# } resPtr >>= peekCString
1239- err <- {# call rd_kafka_topic_result_error# } resPtr
1240- case cIntToEnum err of
1241- respErr -> do
1242- errMsg <- {# call rd_kafka_topic_result_error_string# } resPtr >>= peekCString
1243- pure $ Left (name, respErr, errMsg)
1244- RdKafkaRespErrNoError -> pure $ Right name
1233+ then return $ Left $ " Something went wrong while deleting topic " ++ topicNameStr
1234+ else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res
1235+
1236+ rdKafkaDeleteTopics :: RdKafkaTPtr
1237+ -> [RdKafkaDeleteTopicTPtr ]
1238+ -> RdKafkaAdminOptionsTPtr
1239+ -> RdKafkaQueueTPtr
1240+ -> IO ()
1241+ rdKafkaDeleteTopics kafkaPtr topics opts queue = do
1242+ withForeignPtrs kafkaPtr opts queue $ \ kPtr oPtr qPtr ->
1243+ withForeignPtrsArrayLen topics $ \ tLen tPtr -> do
1244+ {# call rd_kafka_DeleteTopics# } kPtr tPtr (fromIntegral tLen) oPtr qPtr
12451245
12461246-- Marshall / Unmarshall
12471247enumToCInt :: Enum a => a -> CInt
0 commit comments