From 55641d533c9675f47d2d7757937a991c4468870d Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 24 Mar 2022 15:34:14 +0100 Subject: [PATCH 01/36] Add stats callback to kafka workers --- nri-kafka/src/Kafka/Worker.hs | 1 + nri-kafka/src/Kafka/Worker/Internal.hs | 26 ++++++++++++++++++++------ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/nri-kafka/src/Kafka/Worker.hs b/nri-kafka/src/Kafka/Worker.hs index df9f0465..42740109 100644 --- a/nri-kafka/src/Kafka/Worker.hs +++ b/nri-kafka/src/Kafka/Worker.hs @@ -27,6 +27,7 @@ module Kafka.Worker Internal.PartitionOffset (..), Partition.SeekCmd (..), Internal.CommitToKafkaAsWell (..), + Internal.StatsCallback, ) where diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index ab7467d5..9f68e176 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -9,6 +9,7 @@ import qualified Control.Concurrent.STM as STM import qualified Control.Concurrent.STM.TVar as TVar import qualified Control.Exception.Safe as Exception import qualified Data.Aeson as Aeson +import Data.ByteString (ByteString) import qualified Data.UUID import qualified Data.UUID.V4 import qualified Dict @@ -69,6 +70,8 @@ data PartitionOffset = PartitionOffset offset :: Int } +type StatsCallback = (ByteString -> Task Text ()) + -- | Create a subscription for a topic. -- -- > main :: IO () @@ -177,9 +180,9 @@ data OffsetSource where OffsetSource -- | Starts the kafka worker handling messages. -process :: Settings.Settings -> Text -> TopicSubscription -> Prelude.IO () -process settings groupIdText topicSubscriptions = do - processWithoutShutdownEnsurance settings (Consumer.ConsumerGroupId groupIdText) topicSubscriptions +process :: Settings.Settings -> Text -> TopicSubscription -> Maybe StatsCallback -> Prelude.IO () +process settings groupIdText topicSubscriptions maybeStatsCallback = do + processWithoutShutdownEnsurance settings (Consumer.ConsumerGroupId groupIdText) topicSubscriptions maybeStatsCallback -- Start an ensurance policy to make sure we exit in 5 seconds. We've seen -- cases where our graceful shutdown seems to hang, resulting in a worker -- that's not doing anything. We should try to fix those failures, but for the @@ -198,14 +201,14 @@ process settings groupIdText topicSubscriptions = do -- | Like `process`, but doesn't exit the current process by itself. This risks -- leaving zombie processes when used in production but is safer in tests, where -- the worker shares the OS process with other test code and the test runner. 
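-- (A caller-side sketch of the new argument, not part of the patch: `settings`
-- and `subscription` are assumed to be in scope, and the no-op callback body is
-- purely illustrative. At this point in the series the callback receives
-- librdkafka's statistics JSON as a raw ByteString.
--
-- > main :: Prelude.IO ()
-- > main =
-- >   process settings "example-group" subscription
-- >     (Just (\_rawStatsJson -> Task.succeed ()))
-- )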
-processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Prelude.IO () -processWithoutShutdownEnsurance settings groupId topicSubscriptions = do +processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Maybe StatsCallback -> Prelude.IO () +processWithoutShutdownEnsurance settings groupId topicSubscriptions maybeStatsCallback = do let TopicSubscription {onMessage, topic, offsetSource, commitToKafkaAsWell} = topicSubscriptions state <- initState onQuitSignal (Stopping.stopTakingRequests (stopping state) "Received stop signal") Conduit.withAcquire (Observability.handler (Settings.observability settings)) <| \observabilityHandler -> do Exception.bracketWithError - (createConsumer settings groupId observabilityHandler offsetSource commitToKafkaAsWell onMessage topic state) + (createConsumer settings groupId observabilityHandler offsetSource commitToKafkaAsWell onMessage maybeStatsCallback topic state) (cleanUp observabilityHandler (rebalanceInfo state) (stopping state)) (runThreads settings state) @@ -232,6 +235,7 @@ createConsumer :: OffsetSource -> CommitToKafkaAsWell -> Partition.MessageCallback -> + Maybe StatsCallback -> Kafka.Topic -> State -> Prelude.IO Consumer.KafkaConsumer @@ -247,6 +251,7 @@ createConsumer offsetSource commitToKafkaAsWell callback + maybeStatsCallback topic state = do let rebalance = @@ -269,6 +274,15 @@ createConsumer ( Dict.fromList [("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs))] ) + ++ case maybeStatsCallback of + Nothing -> Prelude.mempty + Just statsCallback -> + Consumer.setCallback + ( Consumer.statsCallback <| \content -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback content) + Prelude.pure () + ) let subscription' = Consumer.topics [Consumer.TopicName (Kafka.unTopic topic)] ++ Consumer.offsetReset Consumer.Earliest From b9f8eacd2f587a8e71483b51e0fec9a774e01663 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 24 Mar 2022 15:50:22 +0100 Subject: [PATCH 02/36] Add stats callback to kafka --- nri-kafka/src/Kafka.hs | 20 +++++++++++++++----- nri-kafka/src/Kafka/Internal.hs | 3 +++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 7ea45f11..f932c15c 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -11,6 +11,7 @@ module Kafka Settings.Settings, Settings.decoder, handler, + Internal.StatsCallback, -- * Creating messages Internal.Msg, @@ -155,9 +156,9 @@ key msg = Maybe.map Internal.unKey (Internal.key msg) -- | Function for creating a Kafka handler. -- -- See 'Kafka.Settings' for potential customizations. 
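-- (A producer-side usage sketch, not part of the patch: the `settings` value is
-- assumed, and the callback discards the raw stats payload. `Conduit.withAcquire`
-- is the same acquisition helper the worker code already uses.
--
-- > Conduit.withAcquire (handler settings (Just (\_rawStatsJson -> Task.succeed ()))) <| \_kafkaHandler ->
-- >   Prelude.pure ()
-- )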
-handler :: Settings.Settings -> Conduit.Acquire Internal.Handler -handler settings = do - producer <- Conduit.mkAcquire (mkProducer settings) Producer.closeProducer +handler :: Settings.Settings -> Maybe Internal.StatsCallback -> Conduit.Acquire Internal.Handler +handler settings maybeStatsCallback = do + producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer _ <- Conduit.mkAcquire (startPollEventLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate)) liftIO (mkHandler settings producer) @@ -215,8 +216,8 @@ doSTM doAnything stm = |> map Ok |> Platform.doAnything doAnything -mkProducer :: Settings.Settings -> Prelude.IO Producer.KafkaProducer -mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages} = do +mkProducer :: Settings.Settings -> Maybe Internal.StatsCallback -> Prelude.IO Producer.KafkaProducer +mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages} maybeStatsCallback = do let properties = Producer.brokersList brokerAddresses ++ Producer.sendTimeout deliveryTimeout @@ -235,6 +236,15 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout ("acks", "all") ] ) + ++ case maybeStatsCallback of + Nothing -> Prelude.mempty + Just statsCallback -> + Producer.setCallback + ( Producer.statsCallback <| \content -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback content) + Prelude.pure () + ) eitherProducer <- Producer.newProducer properties case eitherProducer of Prelude.Left err -> diff --git a/nri-kafka/src/Kafka/Internal.hs b/nri-kafka/src/Kafka/Internal.hs index 2eebce92..c545904d 100644 --- a/nri-kafka/src/Kafka/Internal.hs +++ b/nri-kafka/src/Kafka/Internal.hs @@ -4,6 +4,7 @@ module Kafka.Internal where import qualified Control.Exception.Safe as Exception import qualified Data.Aeson as Aeson +import Data.ByteString (ByteString) import qualified Kafka.Producer as Producer import qualified Prelude @@ -81,3 +82,5 @@ instance Aeson.ToJSON MetaData instance Aeson.FromJSON MetaData newtype Offset = Offset Int + +type StatsCallback = (ByteString -> Task Text ()) From 37a47ba1d3e1e5fbe69fe573099bce5c172d5c59 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 24 Mar 2022 16:01:28 +0100 Subject: [PATCH 03/36] Add type for stats --- nri-kafka/src/Kafka/Worker/Internal.hs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 9f68e176..fe5d575e 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -70,7 +70,31 @@ data PartitionOffset = PartitionOffset offset :: Int } -type StatsCallback = (ByteString -> Task Text ()) +type StatsCallback = (Stats -> Task Text ()) + +data Stats = Stats {rtt :: Rtt} + +data Rtt = Rtt + { min :: Int, + max :: Int, + avg :: Int, + sum :: Int, + stddev :: Int, + p50 :: Int, + p75 :: Int, + p90 :: Int, + p95 :: Int, + p99 :: Int, + p99_99 :: Int, + outofrange :: Int, + hdrsize :: Int, + cnt :: Int + } + +-- TODO add aeson instances +-- TODO move to shared location to use in both worker and producer +-- TODO add more metrics +-- TODO expose set of all available metrics -- | Create a subscription for a topic. 
-- From 6b9665120ab9a2ec1f68aa3ecf061f22e8178a98 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 24 Mar 2022 16:05:39 +0100 Subject: [PATCH 04/36] Decode stats in consumer callback --- nri-kafka/src/Kafka/Worker/Internal.hs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index fe5d575e..f9594dac 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -9,7 +9,6 @@ import qualified Control.Concurrent.STM as STM import qualified Control.Concurrent.STM.TVar as TVar import qualified Control.Exception.Safe as Exception import qualified Data.Aeson as Aeson -import Data.ByteString (ByteString) import qualified Data.UUID import qualified Data.UUID.V4 import qualified Dict @@ -72,7 +71,10 @@ data PartitionOffset = PartitionOffset type StatsCallback = (Stats -> Task Text ()) -data Stats = Stats {rtt :: Rtt} +newtype Stats = Stats {rtt :: Rtt} + deriving (Generic) + +instance Aeson.FromJSON Stats data Rtt = Rtt { min :: Int, @@ -90,6 +92,9 @@ data Rtt = Rtt hdrsize :: Int, cnt :: Int } + deriving (Generic) + +instance Aeson.FromJSON Rtt -- TODO add aeson instances -- TODO move to shared location to use in both worker and producer @@ -304,8 +309,12 @@ createConsumer Consumer.setCallback ( Consumer.statsCallback <| \content -> do log <- Platform.silentHandler - _ <- Task.attempt log (statsCallback content) - Prelude.pure () + case Aeson.decodeStrict content of + Nothing -> + Prelude.pure () + Just stats -> do + _ <- Task.attempt log (statsCallback stats) + Prelude.pure () ) let subscription' = Consumer.topics [Consumer.TopicName (Kafka.unTopic topic)] From cdf7626024e4456638be5c363b387fcbd3bce84b Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 24 Mar 2022 16:10:52 +0100 Subject: [PATCH 05/36] Decode stats --- nri-kafka/src/Kafka.hs | 23 ++++--- nri-kafka/src/Kafka/Internal.hs | 3 - nri-kafka/src/Kafka/Stats.hs | 84 ++++++++++++++++++++++++++ nri-kafka/src/Kafka/Worker.hs | 8 ++- nri-kafka/src/Kafka/Worker/Internal.hs | 43 ++----------- 5 files changed, 113 insertions(+), 48 deletions(-) create mode 100644 nri-kafka/src/Kafka/Stats.hs diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index f932c15c..e1dc8f41 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -11,7 +11,12 @@ module Kafka Settings.Settings, Settings.decoder, handler, - Internal.StatsCallback, + + -- * Stats + Stats.StatsCallback, + Stats.Stats (..), + Stats.Rtt (..), + Stats.allStats, -- * Creating messages Internal.Msg, @@ -44,6 +49,7 @@ import qualified Dict import qualified Kafka.Internal as Internal import qualified Kafka.Producer as Producer import qualified Kafka.Settings as Settings +import qualified Kafka.Stats as Stats import qualified Platform import qualified Prelude @@ -156,7 +162,7 @@ key msg = Maybe.map Internal.unKey (Internal.key msg) -- | Function for creating a Kafka handler. -- -- See 'Kafka.Settings' for potential customizations. 
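-- (A sketch of a typed callback under the `Stats.StatsCallback` alias this patch
-- introduces, not part of the patch itself: the reporting side is stubbed out with
-- `Task.succeed`, and a real callback would read fields such as `Stats.brokers`
-- off the decoded record and ship them to a metrics sink.
--
-- > reportStats :: Stats.StatsCallback
-- > reportStats stats =
-- >   let _brokerCount = Dict.size (Stats.brokers stats)
-- >   in Task.succeed ()
-- )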
-handler :: Settings.Settings -> Maybe Internal.StatsCallback -> Conduit.Acquire Internal.Handler +handler :: Settings.Settings -> Maybe Stats.StatsCallback -> Conduit.Acquire Internal.Handler handler settings maybeStatsCallback = do producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer _ <- Conduit.mkAcquire (startPollEventLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate)) @@ -216,7 +222,7 @@ doSTM doAnything stm = |> map Ok |> Platform.doAnything doAnything -mkProducer :: Settings.Settings -> Maybe Internal.StatsCallback -> Prelude.IO Producer.KafkaProducer +mkProducer :: Settings.Settings -> Maybe Stats.StatsCallback -> Prelude.IO Producer.KafkaProducer mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages} maybeStatsCallback = do let properties = Producer.brokersList brokerAddresses @@ -240,10 +246,13 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout Nothing -> Prelude.mempty Just statsCallback -> Producer.setCallback - ( Producer.statsCallback <| \content -> do - log <- Platform.silentHandler - _ <- Task.attempt log (statsCallback content) - Prelude.pure () + ( Producer.statsCallback <| \content -> + case Aeson.decodeStrict content of + Nothing -> Prelude.pure () + Just stats -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback stats) + Prelude.pure () ) eitherProducer <- Producer.newProducer properties case eitherProducer of diff --git a/nri-kafka/src/Kafka/Internal.hs b/nri-kafka/src/Kafka/Internal.hs index c545904d..2eebce92 100644 --- a/nri-kafka/src/Kafka/Internal.hs +++ b/nri-kafka/src/Kafka/Internal.hs @@ -4,7 +4,6 @@ module Kafka.Internal where import qualified Control.Exception.Safe as Exception import qualified Data.Aeson as Aeson -import Data.ByteString (ByteString) import qualified Kafka.Producer as Producer import qualified Prelude @@ -82,5 +81,3 @@ instance Aeson.ToJSON MetaData instance Aeson.FromJSON MetaData newtype Offset = Offset Int - -type StatsCallback = (ByteString -> Task Text ()) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs new file mode 100644 index 00000000..52e63b5b --- /dev/null +++ b/nri-kafka/src/Kafka/Stats.hs @@ -0,0 +1,84 @@ +module Kafka.Stats where + +import qualified Data.Aeson as Aeson +import Dict (Dict) +import Set (Set) +import qualified Set + +type StatsCallback = (Stats -> Task Text ()) + +-- | See https://github.com/edenhill/librdkafka/blob/0261c86228e910cc84c4c7ab74e563c121f50696/STATISTICS.md +data Stats = Stats + { name :: Text, + client_id :: Text, + type_ :: Text, + ts :: Int, + time :: Int, + brokers :: Dict Text Broker + } + deriving (Generic) + +instance Aeson.FromJSON Stats + +data Broker = Broker + { brokerName :: Text, + brokerRtt :: Rtt + } + deriving (Generic) + +instance Aeson.FromJSON Broker where + parseJSON = Aeson.genericParseJSON (removePrefix "broker") + +removePrefix :: Text -> Aeson.Options +removePrefix prefix = + Aeson.defaultOptions + { Aeson.fieldLabelModifier = + Aeson.camelTo2 '_' << Text.toList + << (if Text.isEmpty prefix then identity else Text.replace prefix "") + << Text.fromList + } + +data Rtt = Rtt + { min :: Int, + max :: Int, + avg :: Int, + sum :: Int, + stddev :: Int, + p50 :: Int, + p75 :: Int, + p90 :: Int, + p95 :: Int, + p99 :: Int, + p99_99 :: Int, + outofrange :: Int, + hdrsize :: Int, + cnt :: Int + } + deriving (Generic) + +instance Aeson.FromJSON Rtt + +allStats :: 
Set Text +allStats = + Set.fromList + [ "name", + "client_id", + "type", + "ts", + "time", + "brokers.name", + "brokers.rtt.min", + "brokers.rtt.max", + "brokers.rtt.avg", + "brokers.rtt.sum", + "brokers.rtt.stddev", + "brokers.rtt.p50", + "brokers.rtt.p75", + "brokers.rtt.p90", + "brokers.rtt.p95", + "brokers.rtt.p99", + "brokers.rtt.p99_99", + "brokers.rtt.outofrange", + "brokers.rtt.hdrsize", + "brokers.rtt.cnt" + ] diff --git a/nri-kafka/src/Kafka/Worker.hs b/nri-kafka/src/Kafka/Worker.hs index 42740109..daf36ce5 100644 --- a/nri-kafka/src/Kafka/Worker.hs +++ b/nri-kafka/src/Kafka/Worker.hs @@ -27,10 +27,16 @@ module Kafka.Worker Internal.PartitionOffset (..), Partition.SeekCmd (..), Internal.CommitToKafkaAsWell (..), - Internal.StatsCallback, + + -- * Stats + Stats.StatsCallback, + Stats.Stats (..), + Stats.Rtt (..), + Stats.allStats, ) where +import qualified Kafka.Stats as Stats import qualified Kafka.Worker.Internal as Internal import qualified Kafka.Worker.Partition as Partition import qualified Kafka.Worker.Settings as Settings diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index f9594dac..145fb95e 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -16,6 +16,7 @@ import qualified GHC.Clock import qualified Kafka.Consumer as Consumer import qualified Kafka.Internal as Kafka import qualified Kafka.Metadata +import qualified Kafka.Stats as Stats import qualified Kafka.Worker.Analytics as Analytics import qualified Kafka.Worker.Fetcher as Fetcher import qualified Kafka.Worker.Partition as Partition @@ -69,38 +70,6 @@ data PartitionOffset = PartitionOffset offset :: Int } -type StatsCallback = (Stats -> Task Text ()) - -newtype Stats = Stats {rtt :: Rtt} - deriving (Generic) - -instance Aeson.FromJSON Stats - -data Rtt = Rtt - { min :: Int, - max :: Int, - avg :: Int, - sum :: Int, - stddev :: Int, - p50 :: Int, - p75 :: Int, - p90 :: Int, - p95 :: Int, - p99 :: Int, - p99_99 :: Int, - outofrange :: Int, - hdrsize :: Int, - cnt :: Int - } - deriving (Generic) - -instance Aeson.FromJSON Rtt - --- TODO add aeson instances --- TODO move to shared location to use in both worker and producer --- TODO add more metrics --- TODO expose set of all available metrics - -- | Create a subscription for a topic. -- -- > main :: IO () @@ -209,7 +178,7 @@ data OffsetSource where OffsetSource -- | Starts the kafka worker handling messages. -process :: Settings.Settings -> Text -> TopicSubscription -> Maybe StatsCallback -> Prelude.IO () +process :: Settings.Settings -> Text -> TopicSubscription -> Maybe Stats.StatsCallback -> Prelude.IO () process settings groupIdText topicSubscriptions maybeStatsCallback = do processWithoutShutdownEnsurance settings (Consumer.ConsumerGroupId groupIdText) topicSubscriptions maybeStatsCallback -- Start an ensurance policy to make sure we exit in 5 seconds. We've seen @@ -230,7 +199,7 @@ process settings groupIdText topicSubscriptions maybeStatsCallback = do -- | Like `process`, but doesn't exit the current process by itself. This risks -- leaving zombie processes when used in production but is safer in tests, where -- the worker shares the OS process with other test code and the test runner. 
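-- (A note on the generic decoding in the new Kafka.Stats module above, since the
-- field-label mapping in `removePrefix` is easy to misread. My reading of the
-- composition, shown schematically:
--
-- > "brokerRtt"  --Text.replace "broker" ""-->  "Rtt"  --Aeson.camelTo2 '_'-->  "rtt"
--
-- so the record field `brokerRtt :: Rtt` is decoded from the JSON key "rtt", the
-- key librdkafka uses in STATISTICS.md.
-- )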
-processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Maybe StatsCallback -> Prelude.IO () +processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Maybe Stats.StatsCallback -> Prelude.IO () processWithoutShutdownEnsurance settings groupId topicSubscriptions maybeStatsCallback = do let TopicSubscription {onMessage, topic, offsetSource, commitToKafkaAsWell} = topicSubscriptions state <- initState @@ -264,7 +233,7 @@ createConsumer :: OffsetSource -> CommitToKafkaAsWell -> Partition.MessageCallback -> - Maybe StatsCallback -> + Maybe Stats.StatsCallback -> Kafka.Topic -> State -> Prelude.IO Consumer.KafkaConsumer @@ -307,12 +276,12 @@ createConsumer Nothing -> Prelude.mempty Just statsCallback -> Consumer.setCallback - ( Consumer.statsCallback <| \content -> do - log <- Platform.silentHandler + ( Consumer.statsCallback <| \content -> case Aeson.decodeStrict content of Nothing -> Prelude.pure () Just stats -> do + log <- Platform.silentHandler _ <- Task.attempt log (statsCallback stats) Prelude.pure () ) From 3e65ec6e79144755b3ebbbce72b48dc4bff830e4 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 24 Mar 2022 17:20:43 +0100 Subject: [PATCH 06/36] Expose stats module --- nri-kafka/package.yaml | 1 + nri-kafka/src/Kafka.hs | 6 ------ nri-kafka/src/Kafka/Stats.hs | 13 ++++++++++++- nri-kafka/src/Kafka/Worker.hs | 7 ------- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/nri-kafka/package.yaml b/nri-kafka/package.yaml index 51a9861a..64800520 100644 --- a/nri-kafka/package.yaml +++ b/nri-kafka/package.yaml @@ -34,6 +34,7 @@ library: exposed-modules: - Kafka - Kafka.Worker + - Kafka.Stats - Kafka.Test source-dirs: src tests: diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index e1dc8f41..21e54ea5 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -12,12 +12,6 @@ module Kafka Settings.decoder, handler, - -- * Stats - Stats.StatsCallback, - Stats.Stats (..), - Stats.Rtt (..), - Stats.allStats, - -- * Creating messages Internal.Msg, emptyMsg, diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index 52e63b5b..f4f29eba 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -1,4 +1,15 @@ -module Kafka.Stats where +-- | Kafka is a module for _writing_ to Kafka +-- +-- See Kafka.Worker for the basic building blocks of a CLI app that will poll & +-- process kafka messages +module Kafka.Stats + ( StatsCallback, + Stats (..), + Broker (..), + Rtt (..), + allStats, + ) +where import qualified Data.Aeson as Aeson import Dict (Dict) diff --git a/nri-kafka/src/Kafka/Worker.hs b/nri-kafka/src/Kafka/Worker.hs index daf36ce5..df9f0465 100644 --- a/nri-kafka/src/Kafka/Worker.hs +++ b/nri-kafka/src/Kafka/Worker.hs @@ -27,16 +27,9 @@ module Kafka.Worker Internal.PartitionOffset (..), Partition.SeekCmd (..), Internal.CommitToKafkaAsWell (..), - - -- * Stats - Stats.StatsCallback, - Stats.Stats (..), - Stats.Rtt (..), - Stats.allStats, ) where -import qualified Kafka.Stats as Stats import qualified Kafka.Worker.Internal as Internal import qualified Kafka.Worker.Partition as Partition import qualified Kafka.Worker.Settings as Settings From 9974886c3d236be0c98121d1d5111705fe04c67d Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 31 Mar 2022 11:36:31 -0300 Subject: [PATCH 07/36] Let us debug these things --- nri-kafka/src/Kafka/Stats.hs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index f4f29eba..b8a54e8b 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -27,7 +27,7 @@ data Stats = Stats time :: Int, brokers :: Dict Text Broker } - deriving (Generic) + deriving (Generic, Show) instance Aeson.FromJSON Stats @@ -35,7 +35,7 @@ data Broker = Broker { brokerName :: Text, brokerRtt :: Rtt } - deriving (Generic) + deriving (Generic, Show) instance Aeson.FromJSON Broker where parseJSON = Aeson.genericParseJSON (removePrefix "broker") @@ -65,7 +65,7 @@ data Rtt = Rtt hdrsize :: Int, cnt :: Int } - deriving (Generic) + deriving (Generic, Show) instance Aeson.FromJSON Rtt From 336c48dd4f1fb8b6e26ec8043673034a6b679db7 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 31 Mar 2022 11:54:30 -0300 Subject: [PATCH 08/36] Tell kafka to report back with stats every 1s --- nri-kafka/src/Kafka.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 21e54ea5..00d5cbd4 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -233,7 +233,8 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout -- Enable idempotent producers -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference ("enable.idempotence", "true"), - ("acks", "all") + ("acks", "all"), + ("statistics.interval.ms", "1000") ] ) ++ case maybeStatsCallback of From 36b05d6c9fa4ecd94ae3c4a37d1413e5c9bf968e Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 31 Mar 2022 11:54:43 -0300 Subject: [PATCH 09/36] Crash on debug failures --- nri-kafka/src/Kafka.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 00d5cbd4..0d9729e1 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -243,7 +243,8 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout Producer.setCallback ( Producer.statsCallback <| \content -> case Aeson.decodeStrict content of - Nothing -> Prelude.pure () + Nothing -> let _ = Debug.log "kafka stats content" content + in Debug.todo "decoding broke" Just stats -> do log <- Platform.silentHandler _ <- Task.attempt log (statsCallback stats) Prelude.pure () From 1668eb528c3a17635abc44e3f96faa5a5a27f5a1 Mon Sep 17 00:00:00 2001 From: Caique Figueiredo Date: Thu, 7 Apr 2022 11:49:06 -0300 Subject: [PATCH 10/36] Show things --- nri-kafka/src/Kafka.hs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 0d9729e1..e42b6f02 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -243,8 +243,7 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout Producer.setCallback ( Producer.statsCallback <| \content -> case Aeson.decodeStrict content of - Nothing -> let _ = Debug.log "kafka stats content" content - in Debug.todo "decoding broke" + Nothing -> Debug.todo <| Data.Text.Encoding.decodeUtf8 content Just stats -> do log <- Platform.silentHandler _ <- Task.attempt log (statsCallback stats) Prelude.pure () From a8b12580409c4e9b7867dccaeb936058166505c2 Mon Sep 17 00:00:00 2001 From: Caique Figueiredo Date: Thu, 7 Apr 2022 12:07:11 -0300 Subject: [PATCH 11/36] WIP expose dict metrics --- nri-kafka/src/Kafka/Stats.hs | 51 +++++++----------------------------- 1 file changed, 9 insertions(+), 42 deletions(-) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs 
index b8a54e8b..7a217137 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -1,4 +1,5 @@ -- | Kafka is a module for _writing_ to Kafka +-- See https://github.com/edenhill/librdkafka/blob/0261c86228e910cc84c4c7ab74e563c121f50696/STATISTICS.md -- -- See Kafka.Worker for the basic building blocks of a CLI app that will poll & -- process kafka messages @@ -18,28 +19,6 @@ import qualified Set type StatsCallback = (Stats -> Task Text ()) --- | See https://github.com/edenhill/librdkafka/blob/0261c86228e910cc84c4c7ab74e563c121f50696/STATISTICS.md -data Stats = Stats - { name :: Text, - client_id :: Text, - type_ :: Text, - ts :: Int, - time :: Int, - brokers :: Dict Text Broker - } - deriving (Generic, Show) - -instance Aeson.FromJSON Stats - -data Broker = Broker - { brokerName :: Text, - brokerRtt :: Rtt - } - deriving (Generic, Show) - -instance Aeson.FromJSON Broker where - parseJSON = Aeson.genericParseJSON (removePrefix "broker") - removePrefix :: Text -> Aeson.Options removePrefix prefix = Aeson.defaultOptions @@ -49,26 +28,6 @@ removePrefix prefix = << Text.fromList } -data Rtt = Rtt - { min :: Int, - max :: Int, - avg :: Int, - sum :: Int, - stddev :: Int, - p50 :: Int, - p75 :: Int, - p90 :: Int, - p95 :: Int, - p99 :: Int, - p99_99 :: Int, - outofrange :: Int, - hdrsize :: Int, - cnt :: Int - } - deriving (Generic, Show) - -instance Aeson.FromJSON Rtt - allStats :: Set Text allStats = Set.fromList @@ -93,3 +52,11 @@ allStats = "brokers.rtt.hdrsize", "brokers.rtt.cnt" ] + +-- | Currently suggested structure +data Value = IntValue Int | TextValue Text | BoolValue Bool +type Stats = Dict Key Value +data Key = Key (List Text) + +-- | Type exposed for clients +-- TODO statsDict :: Dict Text Aeson.Value From 1014811bec3380cea3aea56835cf8c91f18f1a87 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 11:15:33 +0200 Subject: [PATCH 12/36] Fix test helpers --- nri-kafka/test/Helpers.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/nri-kafka/test/Helpers.hs b/nri-kafka/test/Helpers.hs index f29baee7..0212a460 100644 --- a/nri-kafka/test/Helpers.hs +++ b/nri-kafka/test/Helpers.hs @@ -73,6 +73,7 @@ spawnWorker handler' topic callback = |> Platform.doAnything (doAnything handler') ) ) + Nothing |> Async.race_ (returnWhenTerminating handler') |> Async.async Async.link async From 1014811bec3380cea3aea56835cf8c91f18f1a87 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 11:16:32 +0200 Subject: [PATCH 13/36] Add function to decode json into a flat dict --- nri-kafka/nri-kafka.cabal | 9 ++++ nri-kafka/package.yaml | 2 + nri-kafka/src/Data/Aeson/Extra.hs | 52 ++++++++++++++++++++ nri-kafka/test/Main.hs | 4 +- nri-kafka/test/Spec/Data/Aeson/Extra.hs | 65 +++++++++++++++++++++++++ 5 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 nri-kafka/src/Data/Aeson/Extra.hs create mode 100644 nri-kafka/test/Spec/Data/Aeson/Extra.hs diff --git a/nri-kafka/nri-kafka.cabal b/nri-kafka/nri-kafka.cabal index edc1ce16..01fdfe56 100644 --- a/nri-kafka/nri-kafka.cabal +++ b/nri-kafka/nri-kafka.cabal @@ -31,8 +31,10 @@ library exposed-modules: Kafka Kafka.Worker Kafka.Stats Kafka.Test other-modules: + Data.Aeson.Extra Kafka.Internal Kafka.Settings Kafka.Settings.Internal @@ -78,17 +80,21 @@ library , text >=1.2.3.1 && <2.1 , time >=1.8.0.2 && <2 , unix >=2.7.2.2 && <2.8.0.0 + , unordered-containers >=0.2.14.0 && <0.3 , uuid >=1.3.0 && <1.4 + , vector >=0.12.3.1 && <0.13 default-language: Haskell2010 test-suite tests type: exitcode-stdio-1.0 main-is: 
Main.hs other-modules: Data.Aeson.Extra Kafka Kafka.Internal Kafka.Settings Kafka.Settings.Internal Kafka.Stats Kafka.Test Kafka.Worker Kafka.Worker.Analytics Kafka.Worker.Fetcher Kafka.Worker.Partition Kafka.Worker.Settings Kafka.Worker.Stopping Helpers Spec.Data.Aeson.Extra Spec.Kafka.Worker.Integration Spec.Kafka.Worker.Partition Paths_nri_kafka @@ -137,5 +144,7 @@ test-suite tests , text >=1.2.3.1 && <2.1 , time >=1.8.0.2 && <2 , unix >=2.7.2.2 && <2.8.0.0 + , unordered-containers >=0.2.14.0 && <0.3 , uuid >=1.3.0 && <1.4 + , vector >=0.12.3.1 && <0.13 default-language: Haskell2010 diff --git a/nri-kafka/package.yaml b/nri-kafka/package.yaml index 64800520..4eb278b4 100644 --- a/nri-kafka/package.yaml +++ b/nri-kafka/package.yaml @@ -30,6 +30,8 @@ dependencies: - time >= 1.8.0.2 && < 2 - unix >= 2.7.2.2 && < 2.8.0.0 - uuid >=1.3.0 && < 1.4 + - unordered-containers >= 0.2.14.0 && < 0.3 + - vector >= 0.12.3.1 && < 0.13 library: exposed-modules: - Kafka diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs new file mode 100644 index 00000000..c5366399 --- /dev/null +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -0,0 +1,52 @@ +module Data.Aeson.Extra (decodeIntoFlatDict) where + +import qualified Data.Aeson as Aeson +import Data.ByteString.Lazy (ByteString) +import qualified Data.HashMap.Strict as HM +import qualified Data.Vector as Vector +import Dict (Dict) +import qualified Dict +import Prelude (Either (Left, Right)) +import qualified Prelude + +newtype Error = Error Text + deriving (Show, Eq) + +-- | Decodes JSON into a flat dict. +-- +-- > decodeIntoFlatDict "{\"a\": {\"b\": 1}}" +-- > ==> Ok (Dict.fromList [("a.b", 1)]) +-- +-- > decodeIntoFlatDict "{\"a\": [1,2,3]}" +-- > ==> Ok (Dict.fromList [("a[0]", 1), ("a[1]", 2), ("a[2]", 3)]) +decodeIntoFlatDict :: ByteString -> Result Error (Dict Text Aeson.Value) +decodeIntoFlatDict content = + case Aeson.eitherDecode' content of + Left err -> Err (Error (Text.fromList err)) + Right value -> + case value of + Aeson.Object obj -> + obj + |> HM.foldlWithKey' (objectToDict identity) Dict.empty + |> Ok + Aeson.Array arr -> Ok (arrayToDict identity Dict.empty arr) + _ -> Err (Error "We can only parse top-level objects or arrays") + +objectToDict :: (Text -> Text) -> Dict Text Aeson.Value -> Text -> Aeson.Value -> Dict Text Aeson.Value +objectToDict toKey acc key val = + case val of + Aeson.Object obj -> + HM.foldlWithKey' (objectToDict (\subKey -> toKey (key ++ "." 
++ subKey))) acc obj + Aeson.Array arr -> arrayToDict (\subKey -> toKey (key ++ subKey)) acc arr + _ -> Dict.insert (toKey key) val acc + +arrayToDict :: (Text -> Text) -> Dict Text Aeson.Value -> Aeson.Array -> Dict Text Aeson.Value +arrayToDict toKey = + Vector.ifoldl + ( \acc index item -> + objectToDict + (\index' -> toKey ("[" ++ index' ++ "]")) + acc + (Text.fromInt (Prelude.fromIntegral index)) + item + ) diff --git a/nri-kafka/test/Main.hs b/nri-kafka/test/Main.hs index 8ec4991a..88e7afa8 100644 --- a/nri-kafka/test/Main.hs +++ b/nri-kafka/test/Main.hs @@ -1,5 +1,6 @@ module Main (main) where +import qualified Spec.Data.Aeson.Extra import qualified Spec.Kafka.Worker.Integration import qualified Spec.Kafka.Worker.Partition import qualified Test @@ -13,5 +14,6 @@ tests = Test.describe "lib/kafka" [ Spec.Kafka.Worker.Integration.tests, - Spec.Kafka.Worker.Partition.tests + Spec.Kafka.Worker.Partition.tests, + Spec.Data.Aeson.Extra.tests ] diff --git a/nri-kafka/test/Spec/Data/Aeson/Extra.hs b/nri-kafka/test/Spec/Data/Aeson/Extra.hs new file mode 100644 index 00000000..39dade47 --- /dev/null +++ b/nri-kafka/test/Spec/Data/Aeson/Extra.hs @@ -0,0 +1,65 @@ +module Spec.Data.Aeson.Extra (tests) where + +import qualified Data.Aeson as Aeson +import Data.Aeson.Extra (decodeIntoFlatDict) +import qualified Dict +import qualified Expect +import Test + +tests :: Test +tests = + describe + "decodeIntoFlatDict" + [ test "simple object" <| \() -> + "{\"foo\": 1}" + |> decodeIntoFlatDict + |> Expect.equal (Ok (Dict.fromList [("foo", Aeson.Number 1)])), + test "with nested object" <| \() -> + "{\"foo\": 1, \"bar\": { \"moo\": \"cow\" }}" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ("foo", Aeson.Number 1), + ("bar.moo", Aeson.String "cow") + ] + ) + ), + test "with more nesting object" <| \() -> + "{\"foo\": 1, \"bar\": { \"moo\": \"cow\", \"hello\": { \"world\": true }}}" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ("foo", Aeson.Number 1), + ("bar.moo", Aeson.String "cow"), + ("bar.hello.world", Aeson.Bool True) + ] + ) + ), + test "with nested arrays" <| \() -> + "{\"foo\": 1, \"bar\": [ 1, 2, 3 ]}" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ("foo", Aeson.Number 1), + ("bar[0]", Aeson.Number 1), + ("bar[1]", Aeson.Number 2), + ("bar[2]", Aeson.Number 3) + ] + ) + ), + test "with top-level array" <| \() -> + "[1, 2, 3 ]" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ("[0]", Aeson.Number 1), + ("[1]", Aeson.Number 2), + ("[2]", Aeson.Number 3) + ] + ) + ) + ] From 9b7bee1b5657f608de4c1b70e0e8e1aafaed0b3a Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 11:25:31 +0200 Subject: [PATCH 14/36] Decode stats into flat dict --- nri-kafka/src/Data/Aeson/Extra.hs | 13 +++--- nri-kafka/src/Kafka.hs | 11 ++--- nri-kafka/src/Kafka/Stats.hs | 58 ++++---------------------- nri-kafka/src/Kafka/Worker/Internal.hs | 12 ++---- 4 files changed, 20 insertions(+), 74 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index c5366399..cb844133 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -1,7 +1,7 @@ module Data.Aeson.Extra (decodeIntoFlatDict) where import qualified Data.Aeson as Aeson -import Data.ByteString.Lazy (ByteString) +import Data.ByteString (ByteString) import qualified Data.HashMap.Strict as HM import qualified Data.Vector as Vector import Dict (Dict) @@ -9,9 +9,6 @@ import qualified Dict import Prelude 
(Either (Left, Right)) import qualified Prelude -newtype Error = Error Text - deriving (Show, Eq) - -- | Decodes JSON into a flat dict. -- -- > decodeIntoFlatDict "{\"a\": {\"b\": 1}}" @@ -19,10 +16,10 @@ newtype Error = Error Text -- -- > decodeIntoFlatDict "{\"a\": [1,2,3]}" -- > ==> Ok (Dict.fromList [("a[0]", 1), ("a[1]", 2), ("a[2]", 3)]) -decodeIntoFlatDict :: ByteString -> Result Error (Dict Text Aeson.Value) +decodeIntoFlatDict :: ByteString -> Result Text (Dict Text Aeson.Value) decodeIntoFlatDict content = - case Aeson.eitherDecode' content of - Left err -> Err (Error (Text.fromList err)) + case Aeson.eitherDecodeStrict content of + Left err -> Err (Text.fromList err) Right value -> case value of Aeson.Object obj -> @@ -30,7 +27,7 @@ decodeIntoFlatDict content = |> HM.foldlWithKey' (objectToDict identity) Dict.empty |> Ok Aeson.Array arr -> Ok (arrayToDict identity Dict.empty arr) - _ -> Err (Error "We can only parse top-level objects or arrays") + _ -> Err "We can only parse top-level objects or arrays" diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index e42b6f02..c20dda7c 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -241,13 +241,10 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout Nothing -> Prelude.mempty Just statsCallback -> Producer.setCallback - ( Producer.statsCallback <| \content -> - case Aeson.decodeStrict content of - Nothing -> Debug.todo <| Data.Text.Encoding.decodeUtf8 content - Just stats -> do - log <- Platform.silentHandler - _ <- Task.attempt log (statsCallback stats) - Prelude.pure () + ( Producer.statsCallback <| \content -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback (Stats.decode content)) + Prelude.pure () ) eitherProducer <- Producer.newProducer properties case eitherProducer of Prelude.Left err -> diff --git a/nri-kafka/src/Kafka/Internal.hs b/nri-kafka/src/Kafka/Internal.hs index 2eebce92..c545904d 100644 --- a/nri-kafka/src/Kafka/Internal.hs +++ b/nri-kafka/src/Kafka/Internal.hs @@ -4,6 +4,7 @@ module Kafka.Internal where import qualified Control.Exception.Safe as Exception import qualified Data.Aeson as Aeson -import Data.ByteString (ByteString) import qualified Kafka.Producer as Producer import qualified Prelude @@ -81,3 +82,5 @@ instance Aeson.ToJSON MetaData instance Aeson.FromJSON MetaData newtype Offset = Offset Int - -type StatsCallback = (ByteString -> Task Text ()) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index 7a217137..4af1c83c 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -3,60 +3,16 @@ -- -- See Kafka.Worker for the basic building blocks of a CLI app that will poll & -- process kafka messages -module Kafka.Stats - ( StatsCallback, - Stats (..), - Broker (..), - Rtt (..), - allStats, - ) -where +module Kafka.Stats (StatsCallback, Stats, decode) where import qualified Data.Aeson as Aeson +import qualified Data.Aeson.Extra as Aeson.Extra +import Data.ByteString (ByteString) import Dict (Dict) -import Set (Set) -import qualified Set -type StatsCallback = (Stats -> Task Text ()) +type Stats = Dict Text Aeson.Value -removePrefix :: Text -> Aeson.Options -removePrefix prefix = - Aeson.defaultOptions - { Aeson.fieldLabelModifier = - Aeson.camelTo2 '_' << Text.toList - << (if Text.isEmpty prefix then identity else Text.replace prefix "") - << Text.fromList - } +type StatsCallback = (Result Text Stats -> Task Text ()) -allStats :: Set Text -allStats = - Set.fromList - [ "name", - "client_id", - "type", - "ts", - "time", - "brokers.name", - "brokers.rtt.min", - "brokers.rtt.max", - "brokers.rtt.avg", - "brokers.rtt.sum", - "brokers.rtt.stddev", - "brokers.rtt.p50", - "brokers.rtt.p75", - "brokers.rtt.p90", - "brokers.rtt.p95", - "brokers.rtt.p99", - "brokers.rtt.p99_99", - "brokers.rtt.outofrange", - "brokers.rtt.hdrsize", - "brokers.rtt.cnt" - ] + +decode :: ByteString -> Result Text Stats +decode raw = Aeson.Extra.decodeIntoFlatDict raw --- | Currently suggested structure -data Value = IntValue Int | TextValue Text | BoolValue Bool -type 
Stats = Dict Key Value -data Key = Key (List Text) - --- | Type exposed for clients --- TODO statsDict :: Dict Text Aeson.Value diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 145fb95e..c6262c1e 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -276,14 +276,10 @@ createConsumer Nothing -> Prelude.mempty Just statsCallback -> Consumer.setCallback - ( Consumer.statsCallback <| \content -> - case Aeson.decodeStrict content of - Nothing -> - Prelude.pure () - Just stats -> do - log <- Platform.silentHandler - _ <- Task.attempt log (statsCallback stats) - Prelude.pure () + ( Consumer.statsCallback <| \content -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback (Stats.decode content)) + Prelude.pure () ) let subscription' = Consumer.topics [Consumer.TopicName (Kafka.unTopic topic)] From c1b1ee719a0034e6f472fc6eb8c1b604e9321cb6 Mon Sep 17 00:00:00 2001 From: ilias Date: Thu, 14 Apr 2022 12:10:45 +0200 Subject: [PATCH 15/36] Keep track of the Path, so not really a flat dict --- nri-kafka/src/Data/Aeson/Extra.hs | 31 ++++++++++++++----------- nri-kafka/src/Kafka/Stats.hs | 2 +- nri-kafka/test/Spec/Data/Aeson/Extra.hs | 28 +++++++++++----------- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index cb844133..77d301e2 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -1,4 +1,4 @@ -module Data.Aeson.Extra (decodeIntoFlatDict) where +module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..)) where import qualified Data.Aeson as Aeson import Data.ByteString (ByteString) @@ -9,6 +9,11 @@ import qualified Dict import Prelude (Either (Left, Right)) import qualified Prelude +data Segment = Key Text | Index Int + deriving (Ord, Eq, Show) + +type Path = List Segment + -- | Decodes JSON into a flat dict. -- -- > decodeIntoFlatDict "{\"a\": {\"b\": 1}}" -- -- > decodeIntoFlatDict "{\"a\": [1,2,3]}" -- > ==> Ok (Dict.fromList [("a[0]", 1), ("a[1]", 2), ("a[2]", 3)]) -decodeIntoFlatDict :: ByteString -> Result Text (Dict Text Aeson.Value) +decodeIntoFlatDict :: ByteString -> Result Text (Dict Path Aeson.Value) decodeIntoFlatDict content = case Aeson.eitherDecodeStrict content of Left err -> Err (Text.fromList err) @@ -24,26 +29,26 @@ decodeIntoFlatDict content = case value of Aeson.Object obj -> obj - |> HM.foldlWithKey' (objectToDict identity) Dict.empty + |> HM.foldlWithKey' (\acc' k -> objectToDict [] acc' (Key k)) Dict.empty |> Ok - Aeson.Array arr -> Ok (arrayToDict identity Dict.empty arr) + Aeson.Array arr -> Ok (arrayToDict [] Dict.empty arr) _ -> Err "We can only parse top-level objects or arrays" -objectToDict :: (Text -> Text) -> Dict Text Aeson.Value -> Text -> Aeson.Value -> Dict Text Aeson.Value -objectToDict toKey acc key val = +objectToDict :: Path -> Dict Path Aeson.Value -> Segment -> Aeson.Value -> Dict Path Aeson.Value +objectToDict path acc segment val = case val of Aeson.Object obj -> - HM.foldlWithKey' (objectToDict (\subKey -> toKey (key ++ "." 
++ subKey))) acc obj - Aeson.Array arr -> arrayToDict (\subKey -> toKey (key ++ subKey)) acc arr - _ -> Dict.insert (toKey key) val acc + HM.foldlWithKey' (\acc' k -> objectToDict (segment : path) acc' (Key k)) acc obj + Aeson.Array arr -> arrayToDict (segment : path) acc arr + _ -> Dict.insert (List.reverse (segment : path)) val acc -arrayToDict :: (Text -> Text) -> Dict Text Aeson.Value -> Aeson.Array -> Dict Text Aeson.Value -arrayToDict toKey = +arrayToDict :: Path -> Dict Path Aeson.Value -> Aeson.Array -> Dict Path Aeson.Value +arrayToDict path = Vector.ifoldl ( \acc index item -> objectToDict - (\index' -> toKey ("[" ++ index' ++ "]")) + path acc - (Text.fromInt (Prelude.fromIntegral index)) + (Index <| Prelude.fromIntegral index) item ) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index 4af1c83c..adab844c 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -10,7 +10,7 @@ import qualified Data.Aeson.Extra as Aeson.Extra import Data.ByteString (ByteString) import Dict (Dict) -type Stats = Dict Text Aeson.Value +type Stats = Dict Aeson.Extra.Path Aeson.Value type StatsCallback = (Result Text Stats -> Task Text ()) diff --git a/nri-kafka/test/Spec/Data/Aeson/Extra.hs b/nri-kafka/test/Spec/Data/Aeson/Extra.hs index 39dade47..63737b42 100644 --- a/nri-kafka/test/Spec/Data/Aeson/Extra.hs +++ b/nri-kafka/test/Spec/Data/Aeson/Extra.hs @@ -1,7 +1,7 @@ module Spec.Data.Aeson.Extra (tests) where import qualified Data.Aeson as Aeson -import Data.Aeson.Extra (decodeIntoFlatDict) +import Data.Aeson.Extra (Segment (..), decodeIntoFlatDict) import qualified Dict import qualified Expect import Test @@ -13,15 +13,15 @@ tests = [ test "simple object" <| \() -> "{\"foo\": 1}" |> decodeIntoFlatDict - |> Expect.equal (Ok (Dict.fromList [("foo", Aeson.Number 1)])), + |> Expect.equal (Ok (Dict.fromList [([Key "foo"], Aeson.Number 1)])), test "with nested object" <| \() -> "{\"foo\": 1, \"bar\": { \"moo\": \"cow\" }}" |> decodeIntoFlatDict |> Expect.equal ( Ok ( Dict.fromList - [ ("foo", Aeson.Number 1), - ("bar.moo", Aeson.String "cow") + [ ([Key "foo"], Aeson.Number 1), + ([Key "bar", Key "moo"], Aeson.String "cow") ] ) ), @@ -31,9 +31,9 @@ tests = |> Expect.equal ( Ok ( Dict.fromList - [ ("foo", Aeson.Number 1), - ("bar.moo", Aeson.String "cow"), - ("bar.hello.world", Aeson.Bool True) + [ ([Key "foo"], Aeson.Number 1), + ([Key "bar", Key "moo"], Aeson.String "cow"), + ([Key "bar", Key "hello", Key "world"], Aeson.Bool True) ] ) ), @@ -43,10 +43,10 @@ tests = |> Expect.equal ( Ok ( Dict.fromList - [ ("foo", Aeson.Number 1), - ("bar[0]", Aeson.Number 1), - ("bar[1]", Aeson.Number 2), - ("bar[2]", Aeson.Number 3) + [ ([Key "foo"], Aeson.Number 1), + ([Key "bar", Index 0], Aeson.Number 1), + ([Key "bar", Index 1], Aeson.Number 2), + ([Key "bar", Index 2], Aeson.Number 3) ] ) ), @@ -56,9 +56,9 @@ tests = |> Expect.equal ( Ok ( Dict.fromList - [ ("[0]", Aeson.Number 1), - ("[1]", Aeson.Number 2), - ("[2]", Aeson.Number 3) + [ ([Index 0], Aeson.Number 1), + ([Index 1], Aeson.Number 2), + ([Index 2], Aeson.Number 3) ] ) ) From 3c0c5a1ade94c3cf163e12a74016fe1c10811c02 Mon Sep 17 00:00:00 2001 From: ilias Date: Thu, 14 Apr 2022 12:41:52 +0200 Subject: [PATCH 16/36] Add pathToText helper --- nri-kafka/src/Data/Aeson/Extra.hs | 26 ++++++++++++++++++++- nri-kafka/test/Spec/Data/Aeson/Extra.hs | 31 ++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index 
77d301e2..af81c424 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -1,4 +1,4 @@ -module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..)) where +module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..), pathToText) where import qualified Data.Aeson as Aeson import Data.ByteString (ByteString) @@ -14,6 +14,30 @@ data Segment = Key Text | Index Int type Path = List Segment +-- | Turns a path into a json-path like text +-- +-- > pathToText [Key "foo", Index 0, Key "bar"] +-- > ==> "foo[0].bar" +pathToText :: Path -> Text +pathToText path = + case path of + [] -> "" + segment : [] -> segmentToText segment + segment : next : rest -> + segmentToText segment ++ separator next ++ pathToText (next : rest) + +segmentToText :: Segment -> Text +segmentToText (Key k) = + k + |> Text.replace "." "\\." + |> Text.replace "[" "\\[" + |> Text.replace "]" "\\]" +segmentToText (Index idx) = "[" ++ Text.fromInt idx ++ "]" + +separator :: Segment -> Text +separator (Key _) = "." +separator (Index _) = "" + -- | Decodes JSON into a flat dict. -- -- > decodeIntoFlatDict "{\"a\": {\"b|": 1}}" diff --git a/nri-kafka/test/Spec/Data/Aeson/Extra.hs b/nri-kafka/test/Spec/Data/Aeson/Extra.hs index 63737b42..2ce24d04 100644 --- a/nri-kafka/test/Spec/Data/Aeson/Extra.hs +++ b/nri-kafka/test/Spec/Data/Aeson/Extra.hs @@ -1,13 +1,21 @@ module Spec.Data.Aeson.Extra (tests) where import qualified Data.Aeson as Aeson -import Data.Aeson.Extra (Segment (..), decodeIntoFlatDict) +import Data.Aeson.Extra (Segment (..), decodeIntoFlatDict, pathToText) import qualified Dict import qualified Expect import Test tests :: Test tests = + describe + "Data.Aeson.Extra" + [ decodeIntoFlatDictTest, + pathToTextTest + ] + +decodeIntoFlatDictTest :: Test +decodeIntoFlatDictTest = describe "decodeIntoFlatDict" [ test "simple object" <| \() -> @@ -63,3 +71,24 @@ tests = ) ) ] + +pathToTextTest :: Test +pathToTextTest = + describe + "pathToText" + [ test "keys get separated" <| \() -> + pathToText [Key "foo", Key "bar"] + |> Expect.equal "foo.bar", + test "indexes have brackets" <| \() -> + pathToText [Index 0] + |> Expect.equal "[0]", + test "indexes within a key" <| \() -> + pathToText [Key "foo", Index 0, Index 1] + |> Expect.equal "foo[0][1]", + test "keys within an index" <| \() -> + pathToText [Index 0, Key "foo", Key "bar"] + |> Expect.equal "[0].foo.bar", + test "keys get escaped" <| \() -> + pathToText [Key "foo.bar[0]"] + |> Expect.equal "foo\\.bar\\[0\\]" + ] From a0004d1792612bf322afe3af374010d63ee61974 Mon Sep 17 00:00:00 2001 From: ilias Date: Thu, 14 Apr 2022 12:44:04 +0200 Subject: [PATCH 17/36] Support top-level values --- nri-kafka/src/Data/Aeson/Extra.hs | 2 +- nri-kafka/test/Spec/Data/Aeson/Extra.hs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index af81c424..3345baf5 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -56,7 +56,7 @@ decodeIntoFlatDict content = |> HM.foldlWithKey' (\acc' k -> objectToDict [] acc' (Key k)) Dict.empty |> Ok Aeson.Array arr -> Ok (arrayToDict [] Dict.empty arr) - _ -> Err "We can only parse top-level objects or arrays" + _ -> Ok (Dict.singleton [] value) objectToDict :: Path -> Dict Path Aeson.Value -> Segment -> Aeson.Value -> Dict Path Aeson.Value objectToDict path acc segment val = diff --git a/nri-kafka/test/Spec/Data/Aeson/Extra.hs b/nri-kafka/test/Spec/Data/Aeson/Extra.hs index 2ce24d04..9ba496a7 
100644 --- a/nri-kafka/test/Spec/Data/Aeson/Extra.hs +++ b/nri-kafka/test/Spec/Data/Aeson/Extra.hs @@ -69,7 +69,11 @@ decodeIntoFlatDictTest = ([Index 2], Aeson.Number 3) ] ) - ) + ), + test "with top-level value" <| \() -> + "true" + |> decodeIntoFlatDict + |> Expect.equal (Ok (Dict.fromList [([], Aeson.Bool True)])) ] pathToTextTest :: Test From 84a8d28d8def556d45a0360fc84208117eac2eea Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 13:21:57 +0200 Subject: [PATCH 18/36] Less casing --- nri-kafka/src/Data/Aeson/Extra.hs | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index 3345baf5..dfa055a1 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -49,30 +49,27 @@ decodeIntoFlatDict :: ByteString -> Result Text (Dict Path Aeson.Value) decodeIntoFlatDict content = case Aeson.eitherDecodeStrict content of Left err -> Err (Text.fromList err) - Right value -> - case value of - Aeson.Object obj -> - obj - |> HM.foldlWithKey' (\acc' k -> objectToDict [] acc' (Key k)) Dict.empty - |> Ok - Aeson.Array arr -> Ok (arrayToDict [] Dict.empty arr) - _ -> Ok (Dict.singleton [] value) + Right value -> Ok (valueToDict [] Dict.empty Nothing value) -objectToDict :: Path -> Dict Path Aeson.Value -> Segment -> Aeson.Value -> Dict Path Aeson.Value -objectToDict path acc segment val = - case val of - Aeson.Object obj -> - HM.foldlWithKey' (\acc' k -> objectToDict (segment : path) acc' (Key k)) acc obj - Aeson.Array arr -> arrayToDict (segment : path) acc arr - _ -> Dict.insert (List.reverse (segment : path)) val acc +valueToDict :: Path -> Dict Path Aeson.Value -> Maybe Segment -> Aeson.Value -> Dict Path Aeson.Value +valueToDict path acc maybeSegment val = + let newPath = + case maybeSegment of + Nothing -> path + Just segment -> segment : path + in case val of + Aeson.Object obj -> + HM.foldlWithKey' (\acc' k -> valueToDict newPath acc' (Just (Key k))) acc obj + Aeson.Array arr -> arrayToDict newPath acc arr + _ -> Dict.insert (List.reverse newPath) val acc arrayToDict :: Path -> Dict Path Aeson.Value -> Aeson.Array -> Dict Path Aeson.Value arrayToDict path = Vector.ifoldl ( \acc index item -> - objectToDict + valueToDict path acc - (Index <| Prelude.fromIntegral index) + (Just (Index (Prelude.fromIntegral index))) item ) From 2e0efe7b462dd9b66eff09a6683ebe7fca273aae Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 13:35:52 +0200 Subject: [PATCH 19/36] Simpler function (maybe slightly slower). Does it matter? 
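A rough answer to the question in the subject, not measured: both versions do one
operation per JSON leaf. The rewrite below allocates a `Dict.singleton` per leaf
and merges results upward with `Dict.union`, where the old code threaded a single
accumulator down and used `Dict.insert`:

    before: Dict.insert path val acc                  -- accumulator threaded down
    after:  Dict.union acc (Dict.singleton path val)  -- results merged back up

A stats payload has at most a few thousand leaves, so the extra singleton
allocations are unlikely to matter in practice.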
--- nri-kafka/src/Data/Aeson/Extra.hs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index dfa055a1..5a7b4ae5 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -49,27 +49,31 @@ decodeIntoFlatDict :: ByteString -> Result Text (Dict Path Aeson.Value) decodeIntoFlatDict content = case Aeson.eitherDecodeStrict content of Left err -> Err (Text.fromList err) - Right value -> Ok (valueToDict [] Dict.empty Nothing value) + Right value -> Ok (valueToDict [] Nothing value) -valueToDict :: Path -> Dict Path Aeson.Value -> Maybe Segment -> Aeson.Value -> Dict Path Aeson.Value -valueToDict path acc maybeSegment val = +valueToDict :: Path -> Maybe Segment -> Aeson.Value -> Dict Path Aeson.Value +valueToDict path maybeSegment val = let newPath = case maybeSegment of Nothing -> path Just segment -> segment : path in case val of + Aeson.Array arr -> arrayToDict newPath arr Aeson.Object obj -> - HM.foldlWithKey' (\acc' k -> valueToDict newPath acc' (Just (Key k))) acc obj - Aeson.Array arr -> arrayToDict newPath acc arr - _ -> Dict.insert (List.reverse newPath) val acc + HM.foldlWithKey' + ( \acc k v -> + valueToDict newPath (Just (Key k)) v + |> Dict.union acc + ) + Dict.empty + obj + _ -> Dict.singleton (List.reverse newPath) val -arrayToDict :: Path -> Dict Path Aeson.Value -> Aeson.Array -> Dict Path Aeson.Value +arrayToDict :: Path -> Aeson.Array -> Dict Path Aeson.Value arrayToDict path = Vector.ifoldl ( \acc index item -> - valueToDict - path - acc - (Just (Index (Prelude.fromIntegral index))) - item + valueToDict path (Just (Index (Prelude.fromIntegral index))) item + |> Dict.union acc ) + Dict.empty From 4c2161d44f2bbe3fd8a7417b9d1fcf1817b90b6b Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 13:38:38 +0200 Subject: [PATCH 20/36] More cleanup --- nri-kafka/src/Data/Aeson/Extra.hs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs index 5a7b4ae5..47521726 100644 --- a/nri-kafka/src/Data/Aeson/Extra.hs +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -49,10 +49,10 @@ decodeIntoFlatDict :: ByteString -> Result Text (Dict Path Aeson.Value) decodeIntoFlatDict content = case Aeson.eitherDecodeStrict content of Left err -> Err (Text.fromList err) - Right value -> Ok (valueToDict [] Nothing value) + Right value -> Ok (valueToDict [] value Nothing) -valueToDict :: Path -> Maybe Segment -> Aeson.Value -> Dict Path Aeson.Value -valueToDict path maybeSegment val = +valueToDict :: Path -> Aeson.Value -> Maybe Segment -> Dict Path Aeson.Value +valueToDict path val maybeSegment = let newPath = case maybeSegment of Nothing -> path @@ -62,7 +62,8 @@ valueToDict path maybeSegment val = Aeson.Object obj -> HM.foldlWithKey' ( \acc k v -> - valueToDict newPath (Just (Key k)) v + Just (Key k) + |> valueToDict newPath v |> Dict.union acc ) Dict.empty @@ -73,7 +74,9 @@ arrayToDict :: Path -> Aeson.Array -> Dict Path Aeson.Value arrayToDict path = Vector.ifoldl ( \acc index item -> - valueToDict path (Just (Index (Prelude.fromIntegral index))) item + Index (Prelude.fromIntegral index) + |> Just + |> valueToDict path item |> Dict.union acc ) Dict.empty From 3435035b46d43d9bf2d58fe6fd28472c9b278e94 Mon Sep 17 00:00:00 2001 From: ilias Date: Thu, 14 Apr 2022 13:49:52 +0200 Subject: [PATCH 21/36] Reexpose Path and pathToText --- nri-kafka/src/Kafka/Stats.hs | 3 ++- 
1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index adab844c..4ecd4162 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -3,9 +3,10 @@ -- -- See Kafka.Worker for the basic building blocks of a CLI app that will poll & -- process kafka messages -module Kafka.Stats (StatsCallback, Stats, decode) where +module Kafka.Stats (StatsCallback, Stats, decode, Path, pathToText) where import qualified Data.Aeson as Aeson +import Data.Aeson.Extra (Path, pathToText) import qualified Data.Aeson.Extra as Aeson.Extra import Data.ByteString (ByteString) import Dict (Dict) From f8dff2475505a466cd3372dc0810133f3a3d739a Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 14:24:45 +0200 Subject: [PATCH 22/36] Add topLevel metric types --- nri-kafka/src/Kafka/Stats.hs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index 4ecd4162..d0ff5d89 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -10,6 +10,7 @@ import Data.Aeson.Extra (Path, pathToText) import qualified Data.Aeson.Extra as Aeson.Extra import Data.ByteString (ByteString) import Dict (Dict) +import qualified Dict type Stats = Dict Aeson.Extra.Path Aeson.Value @@ -17,3 +18,31 @@ type StatsCallback = (Result Text Stats -> Task Text ()) decode :: ByteString -> Result Text Stats decode raw = Aeson.Extra.decodeIntoFlatDict raw + +data Value = StringValue | IntValue | IntGauge + +topLevel :: Dict Text Value +topLevel = + Dict.fromList + [ ("name", StringValue), + ("client_id", StringValue), + ("type", StringValue), + ("ts", IntValue), + ("time", IntValue), + ("age", IntValue), + ("replyq", IntGauge), + ("msg_cnt", IntGauge), + ("msg_size", IntGauge), + ("msg_max", IntValue), + ("msg_size_max", IntValue), + ("tx", IntValue), + ("tx_bytes", IntValue), + ("rx", IntValue), + ("rx_bytes", IntValue), + ("txmsgs", IntValue), + ("txmsg_bytes", IntValue), + ("rxmsgs", IntValue), + ("rxmsg_bytes", IntValue), + ("simple_cnt", IntGauge), + ("metadata_cache_cnt", IntGauge) + ] From 76aee8f315c5153491452ae5f3228eba59aec485 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 14:45:35 +0200 Subject: [PATCH 23/36] Attach metric type info to values --- nri-kafka/src/Kafka/Stats.hs | 227 +++++++++++++++++++++++++++++++---- 1 file changed, 201 insertions(+), 26 deletions(-) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index d0ff5d89..97f68335 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -3,7 +3,7 @@ -- -- See Kafka.Worker for the basic building blocks of a CLI app that will poll & -- process kafka messages -module Kafka.Stats (StatsCallback, Stats, decode, Path, pathToText) where +module Kafka.Stats (StatsCallback, Stats, decode, Path, pathToText, Metric (..)) where import qualified Data.Aeson as Aeson import Data.Aeson.Extra (Path, pathToText) @@ -11,38 +11,213 @@ import qualified Data.Aeson.Extra as Aeson.Extra import Data.ByteString (ByteString) import Dict (Dict) import qualified Dict +import qualified Tuple +import qualified Prelude -type Stats = Dict Aeson.Extra.Path Aeson.Value +type Stats = Dict Aeson.Extra.Path Metric type StatsCallback = (Result Text Stats -> Task Text ()) decode :: ByteString -> Result Text Stats -decode raw = Aeson.Extra.decodeIntoFlatDict raw +decode raw = + case Aeson.Extra.decodeIntoFlatDict raw of + Err err -> Err err + Ok stats -> + stats + 
|> Dict.toList + |> List.map + ( \(path, value) -> + toMetric path value + |> Result.map (Tuple.pair path) + ) + |> Prelude.sequence + |> Result.map Dict.fromList -data Value = StringValue | IntValue | IntGauge +data Metric = StringMetric Text | IntMetric Int | IntGauge Int | BoolMetric Bool -topLevel :: Dict Text Value +toMetric :: Path -> Aeson.Value -> Result Text Metric +toMetric path value = + case List.reverse path of + (Aeson.Extra.Key last) : _ -> + case Dict.get last allMetrics of + Nothing -> Err ("Unknown metric: " ++ pathToText path) + Just metricType -> + case (metricType, value) of + (StringType, Aeson.String str) -> Ok (StringMetric str) + (IntType, Aeson.Number num) -> Ok (IntMetric (Prelude.floor num)) + (IntGaugeType, Aeson.Number num) -> Ok (IntGauge (Prelude.floor num)) + (BoolType, Aeson.Bool bool) -> Ok (BoolMetric bool) + _ -> Err ("Metric type mismatch: " ++ pathToText path) + _ -> Err "Empty path" + +data MetricType = StringType | IntType | IntGaugeType | BoolType + +allMetrics :: Dict Text MetricType +allMetrics = + List.foldl + Dict.union + Dict.empty + [ topLevel, + brokers, + windowStats, + brokersToppars, + topics, + partitions, + cgrp, + eos + ] + +topLevel :: Dict Text MetricType topLevel = Dict.fromList - [ ("name", StringValue), - ("client_id", StringValue), - ("type", StringValue), - ("ts", IntValue), - ("time", IntValue), - ("age", IntValue), - ("replyq", IntGauge), - ("msg_cnt", IntGauge), - ("msg_size", IntGauge), - ("msg_max", IntValue), - ("msg_size_max", IntValue), - ("tx", IntValue), - ("tx_bytes", IntValue), - ("rx", IntValue), - ("rx_bytes", IntValue), - ("txmsgs", IntValue), - ("txmsg_bytes", IntValue), - ("rxmsgs", IntValue), - ("rxmsg_bytes", IntValue), - ("simple_cnt", IntGauge), - ("metadata_cache_cnt", IntGauge) + [ ("name", StringType), + ("client_id", StringType), + ("type", StringType), + ("ts", IntType), + ("time", IntType), + ("age", IntType), + ("replyq", IntGaugeType), + ("msg_cnt", IntGaugeType), + ("msg_size", IntGaugeType), + ("msg_max", IntType), + ("msg_size_max", IntType), + ("tx", IntType), + ("tx_bytes", IntType), + ("rx", IntType), + ("rx_bytes", IntType), + ("txmsgs", IntType), + ("txmsg_bytes", IntType), + ("rxmsgs", IntType), + ("rxmsg_bytes", IntType), + ("simple_cnt", IntGaugeType), + ("metadata_cache_cnt", IntGaugeType) + ] + +brokers :: Dict Text MetricType +brokers = + Dict.fromList + [ ("name", StringType), + ("nodeid", IntType), + ("nodename", StringType), + ("source", StringType), + ("state", StringType), + ("stateage", IntGaugeType), + ("outbuf_cnt", IntGaugeType), + ("outbuf_msg_cnt", IntGaugeType), + ("waitresp_cnt", IntGaugeType), + ("waitresp_msg_cnt", IntGaugeType), + ("tx", IntType), + ("txbytes", IntType), + ("txerrs", IntType), + ("txretries", IntType), + ("txidle", IntType), + ("req_timeouts", IntType), + ("rx", IntType), + ("rxbytes", IntType), + ("rxerrs", IntType), + ("rxcorriderrs", IntType), + ("rxpartial", IntType), + ("rxidle", IntType), + ("zbuf_grow", IntType), + ("buf_grow", IntType), + ("wakeups", IntType), + ("connects", IntType), + ("disconnects", IntType) + ] + +windowStats :: Dict Text MetricType +windowStats = + Dict.fromList + [ ("min", IntGaugeType), + ("max", IntGaugeType), + ("avg", IntGaugeType), + ("sum", IntGaugeType), + ("cnt", IntGaugeType), + ("stddev", IntGaugeType), + ("hdrsize", IntGaugeType), + ("p50", IntGaugeType), + ("p75", IntGaugeType), + ("p90", IntGaugeType), + ("p95", IntGaugeType), + ("p99", IntGaugeType), + ("p99_99", IntGaugeType), + ("outofrange", IntGaugeType) + 
]

brokersToppars :: Dict Text MetricType
brokersToppars =
  Dict.fromList
    [ ("topic", StringType),
      ("partition", IntType)
    ]

topics :: Dict Text MetricType
topics =
  Dict.fromList
    [ ("topic", StringType),
      ("age", IntGaugeType),
      ("metadata_age", IntGaugeType)
    ]

partitions :: Dict Text MetricType
partitions =
  Dict.fromList
    [ ("partition", IntType),
      ("broker", IntType),
      ("leader", IntType),
      ("desired", BoolType),
      ("unknown", BoolType),
      ("msgq_cnt", IntGaugeType),
      ("msgq_bytes", IntGaugeType),
      ("xmit_msgq_cnt", IntGaugeType),
      ("xmit_msgq_bytes", IntGaugeType),
      ("fetchq_cnt", IntGaugeType),
      ("fetchq_size", IntGaugeType),
      ("fetch_state", StringType),
      ("query_offset", IntGaugeType),
      ("next_offset", IntGaugeType),
      ("app_offset", IntGaugeType),
      ("stored_offset", IntGaugeType),
      ("committed_offset", IntGaugeType),
      ("eof_offset", IntGaugeType),
      ("lo_offset", IntGaugeType),
      ("hi_offset", IntGaugeType),
      ("ls_offset", IntGaugeType),
      ("consumer_lag", IntGaugeType),
      ("consumer_lag_stored", IntGaugeType),
      ("txmsgs", IntType),
      ("txbytes", IntType),
      ("rxmsgs", IntType),
      ("rxbytes", IntType),
      ("msgs", IntType),
      ("rx_ver_drops", IntType),
      ("msgs_inflight", IntGaugeType),
      ("next_ack_seq", IntGaugeType),
      ("next_err_seq", IntGaugeType),
      ("acked_msgid", IntType)
    ]

cgrp :: Dict Text MetricType
cgrp =
  Dict.fromList
    [ ("state", StringType),
      ("stateage", IntGaugeType),
      ("join_state", StringType),
      ("rebalance_age", IntGaugeType),
      ("rebalance_cnt", IntType),
      ("rebalance_reason", StringType),
      ("assignment_size", IntGaugeType)
    ]

eos :: Dict Text MetricType
eos =
  Dict.fromList
    [ ("idemp_state", StringType),
      ("idemp_stateage", IntGaugeType),
      ("txn_state", StringType),
      ("txn_stateage", IntGaugeType),
      ("txn_may_enq", BoolType),
      ("producer_id", IntGaugeType),
      ("producer_epoch", IntGaugeType),
      ("epoch_cnt", IntType)
    ]
From 2a7bdd170bc05b67b187c560f11970ca9895168d Mon Sep 17 00:00:00 2001
From: Stoeffel
Date: Thu, 14 Apr 2022 14:50:41 +0200
Subject: [PATCH 24/36] Add show instance to metric

---
 nri-kafka/src/Kafka/Stats.hs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs
index 97f68335..1f7c79b9 100644
--- a/nri-kafka/src/Kafka/Stats.hs
+++ b/nri-kafka/src/Kafka/Stats.hs
@@ -34,6 +34,7 @@ decode raw =
     |> Result.map Dict.fromList

 data Metric = StringMetric Text | IntMetric Int | IntGauge Int | BoolMetric Bool
+  deriving (Show)

 toMetric :: Path -> Aeson.Value -> Result Text Metric
 toMetric path value =
From 2482a3af177dfafa2e12072dadbba82fc0014b10 Mon Sep 17 00:00:00 2001
From: Stoeffel
Date: Thu, 14 Apr 2022 15:48:02 +0200
Subject: [PATCH 25/36] Special case `req`

---
 nri-kafka/src/Kafka/Stats.hs | 27 +++++++++++++++++++--------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs
index 1f7c79b9..01407750 100644
--- a/nri-kafka/src/Kafka/Stats.hs
+++ b/nri-kafka/src/Kafka/Stats.hs
@@ -41,16 +41,27 @@ toMetric path value =
   case List.reverse path of
     (Aeson.Extra.Key last) : _ ->
       case Dict.get last allMetrics of
-        Nothing -> Err ("Unknown metric: " ++ pathToText path)
-        Just metricType ->
-          case (metricType, value) of
-            (StringType, Aeson.String str) -> Ok (StringMetric str)
-            (IntType, Aeson.Number num) -> Ok (IntMetric (Prelude.floor num))
-            (IntGaugeType, Aeson.Number num) -> Ok (IntGauge (Prelude.floor num))
-            (BoolType, Aeson.Bool bool) -> Ok (BoolMetric bool)
-
_ -> Err ("Metric type mismatch: " ++ pathToText path) + Nothing -> + -- Get second to last segment of path for `req` case. + case List.tail (List.reverse path) of + Just ((Aeson.Extra.Key secondToLast) : _) -> + case Dict.get secondToLast allMetrics of + Just metricType -> metricTypeToMetric metricType value path + Nothing -> Err ("Unknown metric type: " ++ pathToText path) + _ -> + Err ("Unknown metric: " ++ pathToText path) + Just metricType -> metricTypeToMetric metricType value path _ -> Err "Empty path" +metricTypeToMetric :: MetricType -> Aeson.Value -> Path -> Result Text Metric +metricTypeToMetric metricType value path = + case (metricType, value) of + (StringType, Aeson.String str) -> Ok (StringMetric str) + (IntType, Aeson.Number num) -> Ok (IntMetric (Prelude.floor num)) + (IntGaugeType, Aeson.Number num) -> Ok (IntGauge (Prelude.floor num)) + (BoolType, Aeson.Bool bool) -> Ok (BoolMetric bool) + _ -> Err ("Metric type mismatch: " ++ pathToText path) + data MetricType = StringType | IntType | IntGaugeType | BoolType allMetrics :: Dict Text MetricType From 5cd61aa311ae1373ee4c80850d7a14510741f7c2 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 15:48:40 +0200 Subject: [PATCH 26/36] Cleanup --- nri-kafka/src/Kafka/Stats.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index 01407750..4eaa50dc 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -14,7 +14,7 @@ import qualified Dict import qualified Tuple import qualified Prelude -type Stats = Dict Aeson.Extra.Path Metric +type Stats = Dict Path Metric type StatsCallback = (Result Text Stats -> Task Text ()) From 90b65c473c5042972b3af87155f0039d447a7eac Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Thu, 14 Apr 2022 16:05:17 +0200 Subject: [PATCH 27/36] Add req to known metrics --- nri-kafka/src/Kafka/Stats.hs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs index 4eaa50dc..1ef1cb69 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -134,7 +134,9 @@ brokers = ("buf_grow", IntType), ("wakeups", IntType), ("connects", IntType), - ("disconnects", IntType) + ("disconnects", IntType), + -- This is an object that has custom entries. 
We are special casing it in toMetric
+    ("req", IntType)
   ]

 windowStats :: Dict Text MetricType
From ba38d08fb9a04412cfc0d034026dac32e8d5202b Mon Sep 17 00:00:00 2001
From: Juliano Solanho
Date: Thu, 14 Apr 2022 11:54:27 -0300
Subject: [PATCH 28/36] There's a typo in rdkafka

---
 nri-kafka/src/Kafka/Stats.hs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs
index 1ef1cb69..b348f514 100644
--- a/nri-kafka/src/Kafka/Stats.hs
+++ b/nri-kafka/src/Kafka/Stats.hs
@@ -193,6 +193,7 @@ partitions =
       ("app_offset", IntGaugeType),
       ("stored_offset", IntGaugeType),
       ("committed_offset", IntGaugeType),
+      ("commited_offset", IntGaugeType),
       ("eof_offset", IntGaugeType),
       ("lo_offset", IntGaugeType),
       ("hi_offset", IntGaugeType),
From debf3d3ff9f48dba0d296f8987df8937f3218a83 Mon Sep 17 00:00:00 2001
From: ilias
Date: Thu, 21 Apr 2022 11:58:06 +0200
Subject: [PATCH 29/36] Remove pathToText

It can be moved into hqe, where we actually need a less general
implementation to play nice with Prometheus.
---
 nri-kafka/src/Data/Aeson/Extra.hs       | 26 +------------------------
 nri-kafka/src/Kafka/Stats.hs            | 11 ++++++-----
 nri-kafka/test/Spec/Data/Aeson/Extra.hs | 26 ++-----------------------
 3 files changed, 9 insertions(+), 54 deletions(-)

diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs
index 47521726..b48b423b 100644
--- a/nri-kafka/src/Data/Aeson/Extra.hs
+++ b/nri-kafka/src/Data/Aeson/Extra.hs
@@ -1,4 +1,4 @@
-module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..), pathToText) where
+module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..)) where

 import qualified Data.Aeson as Aeson
 import Data.ByteString (ByteString)
@@ -14,30 +14,6 @@ data Segment = Key Text | Index Int

 type Path = List Segment

--- | Turns a path into a json-path like text
---
--- > pathToText [Key "foo", Index 0, Key "bar"]
--- > ==> "foo[0].bar"
-pathToText :: Path -> Text
-pathToText path =
-  case path of
-    [] -> ""
-    segment : [] -> segmentToText segment
-    segment : next : rest ->
-      segmentToText segment ++ separator next ++ pathToText (next : rest)
-
-segmentToText :: Segment -> Text
-segmentToText (Key k) =
-  k
-    |> Text.replace "." "\\."
-    |> Text.replace "[" "\\["
-    |> Text.replace "]" "\\]"
-segmentToText (Index idx) = "[" ++ Text.fromInt idx ++ "]"
-
-separator :: Segment -> Text
-separator (Key _) = "."
-separator (Index _) = ""
-
 -- | Decodes JSON into a flat dict.
--
 -- > decodeIntoFlatDict "{\"a\": {\"b\": 1}}"
diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs
index b348f514..7e431e88 100644
--- a/nri-kafka/src/Kafka/Stats.hs
+++ b/nri-kafka/src/Kafka/Stats.hs
@@ -3,12 +3,13 @@
 --
 -- See Kafka.Worker for the basic building blocks of a CLI app that will poll &
 -- process kafka messages
-module Kafka.Stats (StatsCallback, Stats, decode, Path, pathToText, Metric (..)) where
+module Kafka.Stats (StatsCallback, Stats, decode, Path, Segment(..), Metric (..)) where

 import qualified Data.Aeson as Aeson
-import Data.Aeson.Extra (Path, pathToText)
+import Data.Aeson.Extra (Path, Segment(..))
 import qualified Data.Aeson.Extra as Aeson.Extra
 import Data.ByteString (ByteString)
+import qualified Data.Text
 import Dict (Dict)
 import qualified Dict
 import qualified Tuple
 import qualified Prelude
@@ -47,9 +48,9 @@ toMetric path value =
           Just ((Aeson.Extra.Key secondToLast) : _) ->
             case Dict.get secondToLast allMetrics of
               Just metricType -> metricTypeToMetric metricType value path
-              Nothing -> Err ("Unknown metric type: " ++ pathToText path)
+              Nothing -> Err ("Unknown metric type: " ++ Data.Text.pack (Prelude.show path))
           _ ->
-            Err ("Unknown metric: " ++ pathToText path)
+            Err ("Unknown metric: " ++ Data.Text.pack (Prelude.show path))
           Just metricType -> metricTypeToMetric metricType value path
 _ -> Err "Empty path"
@@ -60,7 +61,7 @@ metricTypeToMetric metricType value path =
     (IntType, Aeson.Number num) -> Ok (IntMetric (Prelude.floor num))
     (IntGaugeType, Aeson.Number num) -> Ok (IntGauge (Prelude.floor num))
     (BoolType, Aeson.Bool bool) -> Ok (BoolMetric bool)
-    _ -> Err ("Metric type mismatch: " ++ pathToText path)
+    _ -> Err ("Metric type mismatch: " ++ Data.Text.pack (Prelude.show path))

diff --git a/nri-kafka/test/Spec/Data/Aeson/Extra.hs b/nri-kafka/test/Spec/Data/Aeson/Extra.hs
index 9ba496a7..bbafa378 100644
--- a/nri-kafka/test/Spec/Data/Aeson/Extra.hs
+++ b/nri-kafka/test/Spec/Data/Aeson/Extra.hs
@@ -1,7 +1,7 @@
 module Spec.Data.Aeson.Extra (tests) where

 import qualified Data.Aeson as Aeson
-import Data.Aeson.Extra (Segment (..), decodeIntoFlatDict, pathToText)
+import Data.Aeson.Extra (Segment (..), decodeIntoFlatDict)
 import qualified Dict
 import qualified Expect
 import Test
@@ -10,8 +10,7 @@ tests :: Test
 tests =
   describe
     "Data.Aeson.Extra"
-    [ decodeIntoFlatDictTest,
-      pathToTextTest
+    [ decodeIntoFlatDictTest
     ]

 decodeIntoFlatDictTest :: Test
@@ -75,24 +74,3 @@ decodeIntoFlatDictTest =
         |> decodeIntoFlatDict
         |> Expect.equal (Ok (Dict.fromList [([], Aeson.Bool True)]))
     ]
-
-pathToTextTest :: Test
-pathToTextTest =
-  describe
-    "pathToText"
-    [ test "keys get separated" <| \() ->
-        pathToText [Key "foo", Key "bar"]
-          |> Expect.equal "foo.bar",
-      test "indexes have brackets" <| \() ->
-        pathToText [Index 0]
-          |> Expect.equal "[0]",
-      test "indexes within a key" <| \() ->
-        pathToText [Key "foo", Index 0, Index 1]
-          |> Expect.equal "foo[0][1]",
-      test "keys within an index" <| \() ->
-        pathToText [Index 0, Key "foo", Key "bar"]
-          |> Expect.equal "[0].foo.bar",
-      test "keys get escaped" <| \() ->
-        pathToText [Key "foo.bar[0]"]
-          |> Expect.equal "foo\\.bar\\[0\\]"
-    ]
From 612f86cc7e0d959dd26910e5d6d5f753f61bc482 Mon Sep 17 00:00:00 2001
From: Stoeffel
Date: Tue, 26 Apr 2022 11:33:00 +0200
Subject: [PATCH 30/36] Formatting

---
 nri-kafka/src/Kafka/Stats.hs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs
index
7e431e88..66485bd2 100644 --- a/nri-kafka/src/Kafka/Stats.hs +++ b/nri-kafka/src/Kafka/Stats.hs @@ -3,10 +3,10 @@ -- -- See Kafka.Worker for the basic building blocks of a CLI app that will poll & -- process kafka messages -module Kafka.Stats (StatsCallback, Stats, decode, Path, Segment(..), Metric (..)) where +module Kafka.Stats (StatsCallback, Stats, decode, Path, Segment (..), Metric (..)) where import qualified Data.Aeson as Aeson -import Data.Aeson.Extra (Path, Segment(..)) +import Data.Aeson.Extra (Path, Segment (..)) import qualified Data.Aeson.Extra as Aeson.Extra import Data.ByteString (ByteString) import qualified Data.Text From 196c78c6eba6ea61847ffeca058f37bb04d87e88 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Tue, 26 Apr 2022 11:33:33 +0200 Subject: [PATCH 31/36] Add todo --- nri-kafka/src/Kafka.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index c20dda7c..d98decdc 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -234,6 +234,7 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference ("enable.idempotence", "true"), ("acks", "all"), + -- TODO make this configurable? ("statistics.interval.ms", "1000") ] ) From 9d0c4d427a2ab0c66d993f61b0087924548d9d3b Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Tue, 26 Apr 2022 11:34:35 +0200 Subject: [PATCH 32/36] Set stats interval to consumer --- nri-kafka/src/Kafka/Worker/Internal.hs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index c6262c1e..641f2815 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -270,7 +270,10 @@ createConsumer ++ Consumer.compression Consumer.Snappy ++ Consumer.extraProps ( Dict.fromList - [("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs))] + [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)), + -- TODO make this configurable? 
+ ("statistics.interval.ms", "1000") + ] ) ++ case maybeStatsCallback of Nothing -> Prelude.mempty From 11efc0fc4c7504f63ba321ac002d3bfc11d0bef9 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Tue, 26 Apr 2022 14:24:14 +0200 Subject: [PATCH 33/36] Add settings for statistics.interval.ms --- nri-kafka/src/Kafka.hs | 5 ++--- nri-kafka/src/Kafka/Settings.hs | 22 ++++++++++++++++++++-- nri-kafka/src/Kafka/Worker/Internal.hs | 6 +++--- nri-kafka/src/Kafka/Worker/Settings.hs | 20 +++++++++++++++++++- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index d98decdc..f732faac 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -217,7 +217,7 @@ doSTM doAnything stm = |> Platform.doAnything doAnything mkProducer :: Settings.Settings -> Maybe Stats.StatsCallback -> Prelude.IO Producer.KafkaProducer -mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages} maybeStatsCallback = do +mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages, Settings.statisticsIntervalMs} maybeStatsCallback = do let properties = Producer.brokersList brokerAddresses ++ Producer.sendTimeout deliveryTimeout @@ -234,8 +234,7 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference ("enable.idempotence", "true"), ("acks", "all"), - -- TODO make this configurable? - ("statistics.interval.ms", "1000") + ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)) ] ) ++ case maybeStatsCallback of diff --git a/nri-kafka/src/Kafka/Settings.hs b/nri-kafka/src/Kafka/Settings.hs index 86b53062..2640f5d6 100644 --- a/nri-kafka/src/Kafka/Settings.hs +++ b/nri-kafka/src/Kafka/Settings.hs @@ -5,6 +5,7 @@ module Kafka.Settings BatchNumMessages, unBatchNumMessages, exampleBatchNumMessages, + StatisticsIntervalMs (..), ) where @@ -21,12 +22,18 @@ data Settings = Settings -- | Message delivery timeout. See hw-kafka's documentation for more info deliveryTimeout :: Kafka.Producer.Timeout, -- | Number of messages to batch together before sending to Kafka. - batchNumMessages :: BatchNumMessages + batchNumMessages :: BatchNumMessages, + -- | librdkafka statistics emit interval. The application also needs to + -- register a stats callback using rd_kafka_conf_set_stats_cb(). The + -- granularity is 1000ms. A value of 0 disables statistics. + statisticsIntervalMs :: StatisticsIntervalMs } -- | Number of messages to batch together before sending to Kafka. 
newtype BatchNumMessages = BatchNumMessages {unBatchNumMessages :: Int} +newtype StatisticsIntervalMs = StatisticsIntervalMs {unStatisticsIntervalMs :: Int} + -- | example BatchNumMessages to use in tests exampleBatchNumMessages :: BatchNumMessages exampleBatchNumMessages = BatchNumMessages 1 @@ -38,12 +45,13 @@ exampleBatchNumMessages = BatchNumMessages 1 -- KAFKA_BATCH_SIZE=10000 decoder :: Environment.Decoder Settings decoder = - map4 + map5 Settings Internal.decoderBrokerAddresses Internal.decoderKafkaLogLevel decoderDeliveryTimeout decoderBatchNumMessages + decoderStatisticsIntervalMs decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout decoderDeliveryTimeout = @@ -64,3 +72,13 @@ decoderBatchNumMessages = Environment.defaultValue = "10000" } (map BatchNumMessages Environment.int) + +decoderStatisticsIntervalMs :: Environment.Decoder StatisticsIntervalMs +decoderStatisticsIntervalMs = + Environment.variable + Environment.Variable + { Environment.name = "KAFKA_STATISTICS_INTERVAL_MS", + Environment.description = "librdkafka statistics emit interval. The application also needs to register a stats callback using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. A value of 0 disables statistics.", + Environment.defaultValue = "0" + } + (map StatisticsIntervalMs Environment.int) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 641f2815..bff7517e 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -242,7 +242,8 @@ createConsumer { Settings.brokerAddresses, Settings.logLevel, Settings.maxPollIntervalMs, - Settings.onProcessMessageSkip + Settings.onProcessMessageSkip, + Settings.statisticsIntervalMs } groupId observability @@ -271,8 +272,7 @@ createConsumer ++ Consumer.extraProps ( Dict.fromList [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)), - -- TODO make this configurable? - ("statistics.interval.ms", "1000") + ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)) ] ) ++ case maybeStatsCallback of diff --git a/nri-kafka/src/Kafka/Worker/Settings.hs b/nri-kafka/src/Kafka/Worker/Settings.hs index 232e2603..ef8b413a 100644 --- a/nri-kafka/src/Kafka/Worker/Settings.hs +++ b/nri-kafka/src/Kafka/Worker/Settings.hs @@ -5,6 +5,7 @@ module Kafka.Worker.Settings MaxMsgsPerPartitionBufferedLocally (..), MaxPollIntervalMs (..), SkipOrNot (..), + StatisticsIntervalMs (..), ) where @@ -33,7 +34,11 @@ data Settings = Settings maxPollIntervalMs :: MaxPollIntervalMs, -- | This option provides us the possibility to skip messages on failure. -- Useful for testing Kafka worker. DoNotSkip is a reasonable default! - onProcessMessageSkip :: SkipOrNot + onProcessMessageSkip :: SkipOrNot, + -- | librdkafka statistics emit interval. The application also needs to + -- register a stats callback using rd_kafka_conf_set_stats_cb(). The + -- granularity is 1000ms. A value of 0 disables statistics. + statisticsIntervalMs :: StatisticsIntervalMs } -- | This option provides us the possibility to skip messages on failure. 
@@ -51,6 +56,8 @@ newtype MaxMsgsPerPartitionBufferedLocally = MaxMsgsPerPartitionBufferedLocally -- | Time between polling newtype MaxPollIntervalMs = MaxPollIntervalMs {unMaxPollIntervalMs :: Int} +newtype StatisticsIntervalMs = StatisticsIntervalMs {unStatisticsIntervalMs :: Int} + -- | decodes Settings from environmental variables -- Also consumes Observability env variables (see nri-observability) -- KAFKA_BROKER_ADDRESSES=localhost:9092 # comma delimeted list @@ -74,6 +81,7 @@ decoder = |> andMap decoderPollBatchSize |> andMap decoderMaxPollIntervalMs |> andMap decoderOnProcessMessageFailure + |> andMap decoderStatisticsIntervalMs decoderPollingTimeout :: Environment.Decoder Consumer.Timeout decoderPollingTimeout = @@ -149,3 +157,13 @@ decoderOnProcessMessageFailure = else Ok DoNotSkip ) ) + +decoderStatisticsIntervalMs :: Environment.Decoder StatisticsIntervalMs +decoderStatisticsIntervalMs = + Environment.variable + Environment.Variable + { Environment.name = "KAFKA_STATISTICS_INTERVAL_MS", + Environment.description = "librdkafka statistics emit interval. The application also needs to register a stats callback using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. A value of 0 disables statistics.", + Environment.defaultValue = "0" + } + (map StatisticsIntervalMs Environment.int) From a5bc956773a022331db6626da36498d133c0c120 Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Tue, 26 Apr 2022 14:25:05 +0200 Subject: [PATCH 34/36] Use andMap --- nri-kafka/src/Kafka/Settings.hs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/nri-kafka/src/Kafka/Settings.hs b/nri-kafka/src/Kafka/Settings.hs index 2640f5d6..82240e85 100644 --- a/nri-kafka/src/Kafka/Settings.hs +++ b/nri-kafka/src/Kafka/Settings.hs @@ -12,6 +12,7 @@ where import qualified Environment import qualified Kafka.Producer import qualified Kafka.Settings.Internal as Internal +import qualified Prelude -- | Settings required to write to Kafka data Settings = Settings @@ -45,13 +46,12 @@ exampleBatchNumMessages = BatchNumMessages 1 -- KAFKA_BATCH_SIZE=10000 decoder :: Environment.Decoder Settings decoder = - map5 - Settings - Internal.decoderBrokerAddresses - Internal.decoderKafkaLogLevel - decoderDeliveryTimeout - decoderBatchNumMessages - decoderStatisticsIntervalMs + Prelude.pure Settings + |> andMap Internal.decoderBrokerAddresses + |> andMap Internal.decoderKafkaLogLevel + |> andMap decoderDeliveryTimeout + |> andMap decoderBatchNumMessages + |> andMap decoderStatisticsIntervalMs decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout decoderDeliveryTimeout = From da0e849105ca87c257ab6ca11d866808ce20539c Mon Sep 17 00:00:00 2001 From: Stoeffel Date: Tue, 26 Apr 2022 14:25:43 +0200 Subject: [PATCH 35/36] Format args --- nri-kafka/src/Kafka.hs | 86 +++++++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index f732faac..8e9a28d8 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -217,45 +217,53 @@ doSTM doAnything stm = |> Platform.doAnything doAnything mkProducer :: Settings.Settings -> Maybe Stats.StatsCallback -> Prelude.IO Producer.KafkaProducer -mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages, Settings.statisticsIntervalMs} maybeStatsCallback = do - let properties = - Producer.brokersList brokerAddresses - ++ Producer.sendTimeout deliveryTimeout - ++ Producer.logLevel logLevel - ++ 
Producer.compression Producer.Snappy
-          ++ Producer.extraProps
-            ( Dict.fromList
-                [ ( "batch.num.messages",
-                    batchNumMessages
-                      |> Settings.unBatchNumMessages
-                      |> Text.fromInt
-                  ),
-                  -- Enable idempotent producers
-                  -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference
-                  ("enable.idempotence", "true"),
-                  ("acks", "all"),
-                  ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs))
-                ]
-            )
-          ++ case maybeStatsCallback of
-            Nothing -> Prelude.mempty
-            Just statsCallback ->
-              Producer.setCallback
-                ( Producer.statsCallback <| \content -> do
-                    log <- Platform.silentHandler
-                    _ <- Task.attempt log (statsCallback (Stats.decode content))
-                    Prelude.pure ()
-                )
-  eitherProducer <- Producer.newProducer properties
-  case eitherProducer of
-    Prelude.Left err ->
-      -- We create the handler as part of starting the application. Throwing
-      -- means that if there's a problem with the settings the application will
-      -- fail immediately upon start. It won't result in runtime errors during
-      -- operation.
-      Exception.throwIO err
-    Prelude.Right producer ->
-      Prelude.pure producer
+mkProducer
+  Settings.Settings
+    { Settings.brokerAddresses,
+      Settings.deliveryTimeout,
+      Settings.logLevel,
+      Settings.batchNumMessages,
+      Settings.statisticsIntervalMs
+    }
+  maybeStatsCallback = do
+    let properties =
+          Producer.brokersList brokerAddresses
+            ++ Producer.sendTimeout deliveryTimeout
+            ++ Producer.logLevel logLevel
+            ++ Producer.compression Producer.Snappy
+            ++ Producer.extraProps
+              ( Dict.fromList
+                  [ ( "batch.num.messages",
+                      batchNumMessages
+                        |> Settings.unBatchNumMessages
+                        |> Text.fromInt
+                    ),
+                    -- Enable idempotent producers
+                    -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference
+                    ("enable.idempotence", "true"),
+                    ("acks", "all"),
+                    ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs))
+                  ]
+              )
+            ++ case maybeStatsCallback of
+              Nothing -> Prelude.mempty
+              Just statsCallback ->
+                Producer.setCallback
+                  ( Producer.statsCallback <| \content -> do
+                      log <- Platform.silentHandler
+                      _ <- Task.attempt log (statsCallback (Stats.decode content))
+                      Prelude.pure ()
+                  )
+    eitherProducer <- Producer.newProducer properties
+    case eitherProducer of
+      Prelude.Left err ->
+        -- We create the handler as part of starting the application. Throwing
+        -- means that if there's a problem with the settings the application will
+        -- fail immediately upon start. It won't result in runtime errors during
+        -- operation.
+        Exception.throwIO err
+      Prelude.Right producer ->
+        Prelude.pure producer

 sendHelperAsync ::
   Producer.KafkaProducer ->
From f8b81f649b27ef9ebdfdee6787725fdb27fb5136 Mon Sep 17 00:00:00 2001
From: Stoeffel
Date: Tue, 26 Apr 2022 14:31:43 +0200
Subject: [PATCH 36/36] Update changelog

---
 nri-kafka/CHANGELOG.md | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/nri-kafka/CHANGELOG.md b/nri-kafka/CHANGELOG.md
index 428227c5..839ffdd2 100644
--- a/nri-kafka/CHANGELOG.md
+++ b/nri-kafka/CHANGELOG.md
@@ -1,3 +1,8 @@
+# 0.1.0.5
+
+- Added `StatsCallback` support to `Kafka` and `Kafka.Worker`.
+- Added the `KAFKA_STATISTICS_INTERVAL_MS` environment variable to `Kafka` and `Kafka.Worker`.
+
 # 0.1.0.4

 - Added new `ElsewhereButToKafkaAsWell` mode to `CommitOffsets`, which commits offsets to Kafka once the external Offset storage has been updated. Kafka commits are performed only to keep Kafka informed about consumer lag.
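
Taken together, the series gives `Kafka.Stats.decode` a two-step shape: the raw librdkafka payload is flattened into path-keyed JSON values, and each leaf is then resolved against the metric tables. A minimal sketch of the expected behaviour, using only names the patches above export (`decode`, `Stats`, `Metric (..)`, `Segment (..)`); the payload is a toy fragment, not a full librdkafka stats blob:

import qualified Kafka.Stats as Stats

-- "name" and "ts" are both listed in the `topLevel` table, so decoding
-- should yield one path-keyed metric per field, typed per that table:
example :: Result Text Stats.Stats
example =
  Stats.decode "{\"name\": \"consumer-1\", \"ts\": 12345}"

-- expected:
-- Ok (Dict.fromList [([Stats.Key "name"], Stats.StringMetric "consumer-1"), ([Stats.Key "ts"], Stats.IntMetric 12345)])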
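
The `req` special case exists because librdkafka's per-broker `req` object keys on request-type names, so the leaf key cannot be enumerated up front; `toMetric` instead falls back to the second-to-last path segment, which patch 27 registers as `IntType`. A sketch with a made-up fragment:

-- "Fetch" is not a known metric name, so `toMetric` looks one segment up,
-- finds "req", and reads the value as a plain int metric:
reqExample :: Result Text Stats.Stats
reqExample =
  Stats.decode "{\"req\": {\"Fetch\": 7}}"

-- expected: Ok (Dict.fromList [([Stats.Key "req", Stats.Key "Fetch"], Stats.IntMetric 7)])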
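
Finally, a sketch of what a stats consumer might look like. `emitGauge` is a hypothetical reporting hook (anything producing a `Task Text ()` fits); `StatsCallback`, `Stats`, and the `Metric` constructors are the ones defined in patch 23. On the producer side the raw payload is run through `Stats.decode` before the callback is invoked, as patch 35 shows, so a callback of this type plugs in directly; note that `KAFKA_STATISTICS_INTERVAL_MS` defaults to `0`, which disables emission entirely, so it has to be set to a non-zero value for the callback to ever fire.

import qualified Dict
import qualified Kafka.Stats as Stats
import qualified List
import qualified Maybe
import qualified Task

-- Hypothetical metrics hook; stands in for whatever reporter the app uses.
emitGauge :: Text -> Int -> Task Text ()
emitGauge _name _value = Task.succeed ()

-- Report the worst per-partition consumer lag seen in one stats emission.
reportMaxLag :: Stats.StatsCallback
reportMaxLag result =
  case result of
    Err err -> Task.fail ("Failed to decode librdkafka stats: " ++ err)
    Ok stats ->
      stats
        |> Dict.toList
        |> List.filterMap
          ( \(path, metric) ->
              case (List.reverse path, metric) of
                -- `consumer_lag` is registered as an int gauge per partition.
                (Stats.Key "consumer_lag" : _, Stats.IntGauge lag) -> Just lag
                _ -> Nothing
          )
        |> List.maximum
        |> Maybe.map (emitGauge "kafka.consumer_lag.max")
        |> Maybe.withDefault (Task.succeed ())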