diff --git a/nri-kafka/CHANGELOG.md b/nri-kafka/CHANGELOG.md index 428227c5..839ffdd2 100644 --- a/nri-kafka/CHANGELOG.md +++ b/nri-kafka/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.1.0.5 + +- Added `StatsCallback` support to `Kafka` and `Kafka.Worker`. +- Added `KAFKA_STATISTICS_INTERVAL_MS` environment variable to `Kafka` and `Kafka.Worker`. + # 0.1.0.4 - Added new `ElsewhereButToKafkaAsWell` mode to `CommitOffsets`, which commits offsets to Kafka once the external Offset storage has been updated. Kafka commits are performed only to keep Kafka informed about consumer lag. diff --git a/nri-kafka/nri-kafka.cabal b/nri-kafka/nri-kafka.cabal index edc1ce16..01fdfe56 100644 --- a/nri-kafka/nri-kafka.cabal +++ b/nri-kafka/nri-kafka.cabal @@ -31,8 +31,10 @@ library exposed-modules: Kafka Kafka.Worker + Kafka.Stats Kafka.Test other-modules: + Data.Aeson.Extra Kafka.Internal Kafka.Settings Kafka.Settings.Internal @@ -78,17 +80,21 @@ library , text >=1.2.3.1 && <2.1 , time >=1.8.0.2 && <2 , unix >=2.7.2.2 && <2.8.0.0 + , unordered-containers >=0.2.14.0 && <0.3 , uuid >=1.3.0 && <1.4 + , vector >=0.12.3.1 && <0.13 default-language: Haskell2010 test-suite tests type: exitcode-stdio-1.0 main-is: Main.hs other-modules: + Data.Aeson.Extra Kafka Kafka.Internal Kafka.Settings Kafka.Settings.Internal + Kafka.Stats Kafka.Test Kafka.Worker Kafka.Worker.Analytics @@ -98,6 +104,7 @@ test-suite tests Kafka.Worker.Settings Kafka.Worker.Stopping Helpers + Spec.Data.Aeson.Extra Spec.Kafka.Worker.Integration Spec.Kafka.Worker.Partition Paths_nri_kafka @@ -137,5 +144,7 @@ test-suite tests , text >=1.2.3.1 && <2.1 , time >=1.8.0.2 && <2 , unix >=2.7.2.2 && <2.8.0.0 + , unordered-containers >=0.2.14.0 && <0.3 , uuid >=1.3.0 && <1.4 + , vector >=0.12.3.1 && <0.13 default-language: Haskell2010 diff --git a/nri-kafka/package.yaml b/nri-kafka/package.yaml index 51a9861a..4eb278b4 100644 --- a/nri-kafka/package.yaml +++ b/nri-kafka/package.yaml @@ -30,10 +30,13 @@ dependencies: - time >= 1.8.0.2 && < 2 - unix >= 2.7.2.2 && < 2.8.0.0 - uuid >=1.3.0 && < 1.4 + - unordered-containers >= 0.2.14.0 && < 0.3 + - vector >= 0.12.3.1 && < 0.13 library: exposed-modules: - Kafka - Kafka.Worker + - Kafka.Stats - Kafka.Test source-dirs: src tests: diff --git a/nri-kafka/src/Data/Aeson/Extra.hs b/nri-kafka/src/Data/Aeson/Extra.hs new file mode 100644 index 00000000..b48b423b --- /dev/null +++ b/nri-kafka/src/Data/Aeson/Extra.hs @@ -0,0 +1,58 @@ +module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..)) where + +import qualified Data.Aeson as Aeson +import Data.ByteString (ByteString) +import qualified Data.HashMap.Strict as HM +import qualified Data.Vector as Vector +import Dict (Dict) +import qualified Dict +import Prelude (Either (Left, Right)) +import qualified Prelude + +data Segment = Key Text | Index Int + deriving (Ord, Eq, Show) + +type Path = List Segment + +-- | Decodes JSON into a flat dict.
+-- +-- > decodeIntoFlatDict "{\"a\": {\"b\": 1}}" +-- > ==> Ok (Dict.fromList [([Key "a", Key "b"], Aeson.Number 1)]) +-- +-- > decodeIntoFlatDict "{\"a\": [1,2,3]}" +-- > ==> Ok (Dict.fromList [([Key "a", Index 0], Aeson.Number 1), ([Key "a", Index 1], Aeson.Number 2), ([Key "a", Index 2], Aeson.Number 3)]) +decodeIntoFlatDict :: ByteString -> Result Text (Dict Path Aeson.Value) +decodeIntoFlatDict content = + case Aeson.eitherDecodeStrict content of + Left err -> Err (Text.fromList err) + Right value -> Ok (valueToDict [] value Nothing) + +valueToDict :: Path -> Aeson.Value -> Maybe Segment -> Dict Path Aeson.Value +valueToDict path val maybeSegment = + let newPath = + case maybeSegment of + Nothing -> path + Just segment -> segment : path + in case val of + Aeson.Array arr -> arrayToDict newPath arr + Aeson.Object obj -> + HM.foldlWithKey' + ( \acc k v -> + Just (Key k) + |> valueToDict newPath v + |> Dict.union acc + ) + Dict.empty + obj + _ -> Dict.singleton (List.reverse newPath) val + +arrayToDict :: Path -> Aeson.Array -> Dict Path Aeson.Value +arrayToDict path = + Vector.ifoldl + ( \acc index item -> + Index (Prelude.fromIntegral index) + |> Just + |> valueToDict path item + |> Dict.union acc + ) + Dict.empty diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 7ea45f11..8e9a28d8 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -43,6 +43,7 @@ import qualified Dict import qualified Kafka.Internal as Internal import qualified Kafka.Producer as Producer import qualified Kafka.Settings as Settings +import qualified Kafka.Stats as Stats import qualified Platform import qualified Prelude @@ -155,9 +156,9 @@ key msg = Maybe.map Internal.unKey (Internal.key msg) -- | Function for creating a Kafka handler. -- -- See 'Kafka.Settings' for potential customizations. -handler :: Settings.Settings -> Conduit.Acquire Internal.Handler -handler settings = do - producer <- Conduit.mkAcquire (mkProducer settings) Producer.closeProducer +handler :: Settings.Settings -> Maybe Stats.StatsCallback -> Conduit.Acquire Internal.Handler +handler settings maybeStatsCallback = do + producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer _ <- Conduit.mkAcquire (startPollEventLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate)) liftIO (mkHandler settings producer) @@ -215,36 +216,54 @@ doSTM doAnything stm = |> map Ok |> Platform.doAnything doAnything -mkProducer :: Settings.Settings -> Prelude.IO Producer.KafkaProducer -mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages} = do - let properties = - Producer.brokersList brokerAddresses - ++ Producer.sendTimeout deliveryTimeout - ++ Producer.logLevel logLevel - ++ Producer.compression Producer.Snappy - ++ Producer.extraProps - ( Dict.fromList - [ ( "batch.num.messages", - batchNumMessages - |> Settings.unBatchNumMessages - |> Text.fromInt - ), - -- Enable idemptent producers - -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference - ("enable.idempotence", "true"), - ("acks", "all") - ] - ) - eitherProducer <- Producer.newProducer properties - case eitherProducer of - Prelude.Left err -> - -- We create the handler as part of starting the application. Throwing - -- means that if there's a problem with the settings the application will - -- fail immediately upon start. It won't result in runtime errors during - -- operation.
- Exception.throwIO err - Prelude.Right producer -> - Prelude.pure producer +mkProducer :: Settings.Settings -> Maybe Stats.StatsCallback -> Prelude.IO Producer.KafkaProducer +mkProducer + Settings.Settings + { Settings.brokerAddresses, + Settings.deliveryTimeout, + Settings.logLevel, + Settings.batchNumMessages, + Settings.statisticsIntervalMs + } + maybeStatsCallback = do + let properties = + Producer.brokersList brokerAddresses + ++ Producer.sendTimeout deliveryTimeout + ++ Producer.logLevel logLevel + ++ Producer.compression Producer.Snappy + ++ Producer.extraProps + ( Dict.fromList + [ ( "batch.num.messages", + batchNumMessages + |> Settings.unBatchNumMessages + |> Text.fromInt + ), + -- Enable idemptent producers + -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference + ("enable.idempotence", "true"), + ("acks", "all"), + ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)) + ] + ) + ++ case maybeStatsCallback of + Nothing -> Prelude.mempty + Just statsCallback -> + Producer.setCallback + ( Producer.statsCallback <| \content -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback (Stats.decode content)) + Prelude.pure () + ) + eitherProducer <- Producer.newProducer properties + case eitherProducer of + Prelude.Left err -> + -- We create the handler as part of starting the application. Throwing + -- means that if there's a problem with the settings the application will + -- fail immediately upon start. It won't result in runtime errors during + -- operation. + Exception.throwIO err + Prelude.Right producer -> + Prelude.pure producer sendHelperAsync :: Producer.KafkaProducer -> diff --git a/nri-kafka/src/Kafka/Settings.hs b/nri-kafka/src/Kafka/Settings.hs index 86b53062..82240e85 100644 --- a/nri-kafka/src/Kafka/Settings.hs +++ b/nri-kafka/src/Kafka/Settings.hs @@ -5,12 +5,14 @@ module Kafka.Settings BatchNumMessages, unBatchNumMessages, exampleBatchNumMessages, + StatisticsIntervalMs (..), ) where import qualified Environment import qualified Kafka.Producer import qualified Kafka.Settings.Internal as Internal +import qualified Prelude -- | Settings required to write to Kafka data Settings = Settings @@ -21,12 +23,18 @@ data Settings = Settings -- | Message delivery timeout. See hw-kafka's documentation for more info deliveryTimeout :: Kafka.Producer.Timeout, -- | Number of messages to batch together before sending to Kafka. - batchNumMessages :: BatchNumMessages + batchNumMessages :: BatchNumMessages, + -- | librdkafka statistics emit interval. The application also needs to + -- register a stats callback using rd_kafka_conf_set_stats_cb(). The + -- granularity is 1000ms. A value of 0 disables statistics. + statisticsIntervalMs :: StatisticsIntervalMs } -- | Number of messages to batch together before sending to Kafka. 
newtype BatchNumMessages = BatchNumMessages {unBatchNumMessages :: Int} +newtype StatisticsIntervalMs = StatisticsIntervalMs {unStatisticsIntervalMs :: Int} + -- | example BatchNumMessages to use in tests exampleBatchNumMessages :: BatchNumMessages exampleBatchNumMessages = BatchNumMessages 1 @@ -38,12 +46,12 @@ exampleBatchNumMessages = BatchNumMessages 1 -- KAFKA_BATCH_SIZE=10000 decoder :: Environment.Decoder Settings decoder = - map4 - Settings - Internal.decoderBrokerAddresses - Internal.decoderKafkaLogLevel - decoderDeliveryTimeout - decoderBatchNumMessages + Prelude.pure Settings + |> andMap Internal.decoderBrokerAddresses + |> andMap Internal.decoderKafkaLogLevel + |> andMap decoderDeliveryTimeout + |> andMap decoderBatchNumMessages + |> andMap decoderStatisticsIntervalMs decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout decoderDeliveryTimeout = @@ -64,3 +72,13 @@ decoderBatchNumMessages = Environment.defaultValue = "10000" } (map BatchNumMessages Environment.int) + +decoderStatisticsIntervalMs :: Environment.Decoder StatisticsIntervalMs +decoderStatisticsIntervalMs = + Environment.variable + Environment.Variable + { Environment.name = "KAFKA_STATISTICS_INTERVAL_MS", + Environment.description = "librdkafka statistics emit interval. The application also needs to register a stats callback using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. A value of 0 disables statistics.", + Environment.defaultValue = "0" + } + (map StatisticsIntervalMs Environment.int) diff --git a/nri-kafka/src/Kafka/Stats.hs b/nri-kafka/src/Kafka/Stats.hs new file mode 100644 index 00000000..66485bd2 --- /dev/null +++ b/nri-kafka/src/Kafka/Stats.hs @@ -0,0 +1,239 @@ +-- | Kafka.Stats decodes the statistics payload emitted by librdkafka when +-- statistics are enabled via `statistics.interval.ms` +-- (the `KAFKA_STATISTICS_INTERVAL_MS` environment variable). +-- +-- See https://github.com/edenhill/librdkafka/blob/0261c86228e910cc84c4c7ab74e563c121f50696/STATISTICS.md for the full list of emitted fields. +module Kafka.Stats (StatsCallback, Stats, decode, Path, Segment (..), Metric (..)) where + +import qualified Data.Aeson as Aeson +import Data.Aeson.Extra (Path, Segment (..)) +import qualified Data.Aeson.Extra as Aeson.Extra +import Data.ByteString (ByteString) +import qualified Data.Text +import Dict (Dict) +import qualified Dict +import qualified Tuple +import qualified Prelude + +type Stats = Dict Path Metric + +type StatsCallback = (Result Text Stats -> Task Text ()) + +decode :: ByteString -> Result Text Stats +decode raw = + case Aeson.Extra.decodeIntoFlatDict raw of + Err err -> Err err + Ok stats -> + stats + |> Dict.toList + |> List.map + ( \(path, value) -> + toMetric path value
 |> Result.map (Tuple.pair path) + ) + |> Prelude.sequence + |> Result.map Dict.fromList + +data Metric = StringMetric Text | IntMetric Int | IntGauge Int | BoolMetric Bool + deriving (Show) + +toMetric :: Path -> Aeson.Value -> Result Text Metric +toMetric path value = + case List.reverse path of + (Aeson.Extra.Key last) : _ -> + case Dict.get last allMetrics of + Nothing -> + -- Get second to last segment of path for `req` case.
+ case List.tail (List.reverse path) of + Just ((Aeson.Extra.Key secondToLast) : _) -> + case Dict.get secondToLast allMetrics of + Just metricType -> metricTypeToMetric metricType value path + Nothing -> Err ("Unknown metric type: " ++ Data.Text.pack (Prelude.show path)) + _ -> + Err ("Unknown metric: " ++ Data.Text.pack (Prelude.show path)) + Just metricType -> metricTypeToMetric metricType value path + _ -> Err "Empty path" + +metricTypeToMetric :: MetricType -> Aeson.Value -> Path -> Result Text Metric +metricTypeToMetric metricType value path = + case (metricType, value) of + (StringType, Aeson.String str) -> Ok (StringMetric str) + (IntType, Aeson.Number num) -> Ok (IntMetric (Prelude.floor num)) + (IntGaugeType, Aeson.Number num) -> Ok (IntGauge (Prelude.floor num)) + (BoolType, Aeson.Bool bool) -> Ok (BoolMetric bool) + _ -> Err ("Metric type mismatch: " ++ Data.Text.pack (Prelude.show path)) + +data MetricType = StringType | IntType | IntGaugeType | BoolType + +allMetrics :: Dict Text MetricType +allMetrics = + List.foldl + Dict.union + Dict.empty + [ topLevel, + brokers, + windowStats, + brokersToppars, + topics, + partitions, + cgrp, + eos + ] + +topLevel :: Dict Text MetricType +topLevel = + Dict.fromList + [ ("name", StringType), + ("client_id", StringType), + ("type", StringType), + ("ts", IntType), + ("time", IntType), + ("age", IntType), + ("replyq", IntGaugeType), + ("msg_cnt", IntGaugeType), + ("msg_size", IntGaugeType), + ("msg_max", IntType), + ("msg_size_max", IntType), + ("tx", IntType), + ("tx_bytes", IntType), + ("rx", IntType), + ("rx_bytes", IntType), + ("txmsgs", IntType), + ("txmsg_bytes", IntType), + ("rxmsgs", IntType), + ("rxmsg_bytes", IntType), + ("simple_cnt", IntGaugeType), + ("metadata_cache_cnt", IntGaugeType) + ] + +brokers :: Dict Text MetricType +brokers = + Dict.fromList + [ ("name", StringType), + ("nodeid", IntType), + ("nodename", StringType), + ("source", StringType), + ("state", StringType), + ("stateage", IntGaugeType), + ("outbuf_cnt", IntGaugeType), + ("outbuf_msg_cnt", IntGaugeType), + ("waitresp_cnt", IntGaugeType), + ("waitresp_msg_cnt", IntGaugeType), + ("tx", IntType), + ("txbytes", IntType), + ("txerrs", IntType), + ("txretries", IntType), + ("txidle", IntType), + ("req_timeouts", IntType), + ("rx", IntType), + ("rxbytes", IntType), + ("rxerrs", IntType), + ("rxcorriderrs", IntType), + ("rxpartial", IntType), + ("rxidle", IntType), + ("zbuf_grow", IntType), + ("buf_grow", IntType), + ("wakeups", IntType), + ("connects", IntType), + ("disconnects", IntType), + -- This is an object that has custom entries. 
We are special casing it in metricTypeToMetric + ("req", IntType) + ] + +windowStats :: Dict Text MetricType +windowStats = + Dict.fromList + [ ("min", IntGaugeType), + ("max", IntGaugeType), + ("avg", IntGaugeType), + ("sum", IntGaugeType), + ("cnt", IntGaugeType), + ("stddev", IntGaugeType), + ("hdrsize", IntGaugeType), + ("p50", IntGaugeType), + ("p75", IntGaugeType), + ("p90", IntGaugeType), + ("p95", IntGaugeType), + ("p99", IntGaugeType), + ("p99_99", IntGaugeType), + ("outofrange", IntGaugeType) + ] + +brokersToppars :: Dict Text MetricType +brokersToppars = + Dict.fromList + [ ("topic", StringType), + ("partition", IntType) + ] + +topics :: Dict Text MetricType +topics = + Dict.fromList + [ ("topic", StringType), + ("age ", IntGaugeType), + ("metadata_age", IntGaugeType) + ] + +partitions :: Dict Text MetricType +partitions = + Dict.fromList + [ ("partition", IntType), + ("broker", IntType), + ("leader", IntType), + ("desired", BoolType), + ("unknown", BoolType), + ("msgq_cnt", IntGaugeType), + ("msgq_bytes", IntGaugeType), + ("xmit_msgq_cnt", IntGaugeType), + ("xmit_msgq_bytes", IntGaugeType), + ("fetchq_cnt", IntGaugeType), + ("fetchq_size", IntGaugeType), + ("fetch_state", StringType), + ("query_offset", IntGaugeType), + ("next_offset", IntGaugeType), + ("app_offset", IntGaugeType), + ("stored_offset", IntGaugeType), + ("committed_offset", IntGaugeType), + ("commited_offset", IntGaugeType), + ("eof_offset", IntGaugeType), + ("lo_offset", IntGaugeType), + ("hi_offset", IntGaugeType), + ("ls_offset", IntGaugeType), + ("consumer_lag", IntGaugeType), + ("consumer_lag_stored", IntGaugeType), + ("txmsgs", IntType), + ("txbytes", IntType), + ("rxmsgs", IntType), + ("rxbytes", IntType), + ("msgs", IntType), + ("rx_ver_drops", IntType), + ("msgs_inflight", IntGaugeType), + ("next_ack_seq", IntGaugeType), + ("next_err_seq", IntGaugeType), + ("acked_msgid", IntType) + ] + +cgrp :: Dict Text MetricType +cgrp = + Dict.fromList + [ ("state", StringType), + ("stateage", IntGaugeType), + ("join_state", StringType), + ("rebalance_age", IntGaugeType), + ("rebalance_cnt", IntType), + ("rebalance_reason", StringType), + ("assignment_size", IntGaugeType) + ] + +eos :: Dict Text MetricType +eos = + Dict.fromList + [ ("idemp_state", StringType), + ("idemp_stateage", IntGaugeType), + ("txn_state", StringType), + ("txn_stateage", IntGaugeType), + ("txn_may_enq", BoolType), + ("producer_id", IntGaugeType), + ("producer_epoch", IntGaugeType), + ("epoch_cnt", IntType) + ] diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index ab7467d5..bff7517e 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -16,6 +16,7 @@ import qualified GHC.Clock import qualified Kafka.Consumer as Consumer import qualified Kafka.Internal as Kafka import qualified Kafka.Metadata +import qualified Kafka.Stats as Stats import qualified Kafka.Worker.Analytics as Analytics import qualified Kafka.Worker.Fetcher as Fetcher import qualified Kafka.Worker.Partition as Partition @@ -177,9 +178,9 @@ data OffsetSource where OffsetSource -- | Starts the kafka worker handling messages. 
-process :: Settings.Settings -> Text -> TopicSubscription -> Prelude.IO () -process settings groupIdText topicSubscriptions = do - processWithoutShutdownEnsurance settings (Consumer.ConsumerGroupId groupIdText) topicSubscriptions +process :: Settings.Settings -> Text -> TopicSubscription -> Maybe Stats.StatsCallback -> Prelude.IO () +process settings groupIdText topicSubscriptions maybeStatsCallback = do + processWithoutShutdownEnsurance settings (Consumer.ConsumerGroupId groupIdText) topicSubscriptions maybeStatsCallback -- Start an ensurance policy to make sure we exit in 5 seconds. We've seen -- cases where our graceful shutdown seems to hang, resulting in a worker -- that's not doing anything. We should try to fix those failures, but for the @@ -198,14 +199,14 @@ process settings groupIdText topicSubscriptions = do -- | Like `process`, but doesn't exit the current process by itself. This risks -- leaving zombie processes when used in production but is safer in tests, where -- the worker shares the OS process with other test code and the test runner. -processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Prelude.IO () -processWithoutShutdownEnsurance settings groupId topicSubscriptions = do +processWithoutShutdownEnsurance :: Settings.Settings -> Consumer.ConsumerGroupId -> TopicSubscription -> Maybe Stats.StatsCallback -> Prelude.IO () +processWithoutShutdownEnsurance settings groupId topicSubscriptions maybeStatsCallback = do let TopicSubscription {onMessage, topic, offsetSource, commitToKafkaAsWell} = topicSubscriptions state <- initState onQuitSignal (Stopping.stopTakingRequests (stopping state) "Received stop signal") Conduit.withAcquire (Observability.handler (Settings.observability settings)) <| \observabilityHandler -> do Exception.bracketWithError - (createConsumer settings groupId observabilityHandler offsetSource commitToKafkaAsWell onMessage topic state) + (createConsumer settings groupId observabilityHandler offsetSource commitToKafkaAsWell onMessage maybeStatsCallback topic state) (cleanUp observabilityHandler (rebalanceInfo state) (stopping state)) (runThreads settings state) @@ -232,6 +233,7 @@ createConsumer :: OffsetSource -> CommitToKafkaAsWell -> Partition.MessageCallback -> + Maybe Stats.StatsCallback -> Kafka.Topic -> State -> Prelude.IO Consumer.KafkaConsumer @@ -240,13 +242,15 @@ createConsumer { Settings.brokerAddresses, Settings.logLevel, Settings.maxPollIntervalMs, - Settings.onProcessMessageSkip + Settings.onProcessMessageSkip, + Settings.statisticsIntervalMs } groupId observability offsetSource commitToKafkaAsWell callback + maybeStatsCallback topic state = do let rebalance = @@ -267,8 +271,19 @@ createConsumer ++ Consumer.compression Consumer.Snappy ++ Consumer.extraProps ( Dict.fromList - [("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs))] + [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)), + ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)) + ] ) + ++ case maybeStatsCallback of + Nothing -> Prelude.mempty + Just statsCallback -> + Consumer.setCallback + ( Consumer.statsCallback <| \content -> do + log <- Platform.silentHandler + _ <- Task.attempt log (statsCallback (Stats.decode content)) + Prelude.pure () + ) let subscription' = Consumer.topics [Consumer.TopicName (Kafka.unTopic topic)] ++ Consumer.offsetReset Consumer.Earliest diff --git a/nri-kafka/src/Kafka/Worker/Settings.hs 
b/nri-kafka/src/Kafka/Worker/Settings.hs index 232e2603..ef8b413a 100644 --- a/nri-kafka/src/Kafka/Worker/Settings.hs +++ b/nri-kafka/src/Kafka/Worker/Settings.hs @@ -5,6 +5,7 @@ module Kafka.Worker.Settings MaxMsgsPerPartitionBufferedLocally (..), MaxPollIntervalMs (..), SkipOrNot (..), + StatisticsIntervalMs (..), ) where @@ -33,7 +34,11 @@ data Settings = Settings maxPollIntervalMs :: MaxPollIntervalMs, -- | This option provides us the possibility to skip messages on failure. -- Useful for testing Kafka worker. DoNotSkip is a reasonable default! - onProcessMessageSkip :: SkipOrNot + onProcessMessageSkip :: SkipOrNot, + -- | librdkafka statistics emit interval. The application also needs to + -- register a stats callback using rd_kafka_conf_set_stats_cb(). The + -- granularity is 1000ms. A value of 0 disables statistics. + statisticsIntervalMs :: StatisticsIntervalMs } -- | This option provides us the possibility to skip messages on failure. @@ -51,6 +56,8 @@ newtype MaxMsgsPerPartitionBufferedLocally = MaxMsgsPerPartitionBufferedLocally -- | Time between polling newtype MaxPollIntervalMs = MaxPollIntervalMs {unMaxPollIntervalMs :: Int} +newtype StatisticsIntervalMs = StatisticsIntervalMs {unStatisticsIntervalMs :: Int} + -- | decodes Settings from environmental variables -- Also consumes Observability env variables (see nri-observability) -- KAFKA_BROKER_ADDRESSES=localhost:9092 # comma delimeted list @@ -74,6 +81,7 @@ decoder = |> andMap decoderPollBatchSize |> andMap decoderMaxPollIntervalMs |> andMap decoderOnProcessMessageFailure + |> andMap decoderStatisticsIntervalMs decoderPollingTimeout :: Environment.Decoder Consumer.Timeout decoderPollingTimeout = @@ -149,3 +157,13 @@ decoderOnProcessMessageFailure = else Ok DoNotSkip ) ) + +decoderStatisticsIntervalMs :: Environment.Decoder StatisticsIntervalMs +decoderStatisticsIntervalMs = + Environment.variable + Environment.Variable + { Environment.name = "KAFKA_STATISTICS_INTERVAL_MS", + Environment.description = "librdkafka statistics emit interval. The application also needs to register a stats callback using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. 
A value of 0 disables statistics.", + Environment.defaultValue = "0" + } + (map StatisticsIntervalMs Environment.int) diff --git a/nri-kafka/test/Helpers.hs b/nri-kafka/test/Helpers.hs index f29baee7..0212a460 100644 --- a/nri-kafka/test/Helpers.hs +++ b/nri-kafka/test/Helpers.hs @@ -73,6 +73,7 @@ spawnWorker handler' topic callback = |> Platform.doAnything (doAnything handler') ) ) + Nothing |> Async.race_ (returnWhenTerminating handler') |> Async.async Async.link async diff --git a/nri-kafka/test/Main.hs b/nri-kafka/test/Main.hs index 8ec4991a..88e7afa8 100644 --- a/nri-kafka/test/Main.hs +++ b/nri-kafka/test/Main.hs @@ -1,5 +1,6 @@ module Main (main) where +import qualified Spec.Data.Aeson.Extra import qualified Spec.Kafka.Worker.Integration import qualified Spec.Kafka.Worker.Partition import qualified Test @@ -13,5 +14,6 @@ tests = Test.describe "lib/kafka" [ Spec.Kafka.Worker.Integration.tests, - Spec.Kafka.Worker.Partition.tests + Spec.Kafka.Worker.Partition.tests, + Spec.Data.Aeson.Extra.tests ] diff --git a/nri-kafka/test/Spec/Data/Aeson/Extra.hs b/nri-kafka/test/Spec/Data/Aeson/Extra.hs new file mode 100644 index 00000000..bbafa378 --- /dev/null +++ b/nri-kafka/test/Spec/Data/Aeson/Extra.hs @@ -0,0 +1,76 @@ +module Spec.Data.Aeson.Extra (tests) where + +import qualified Data.Aeson as Aeson +import Data.Aeson.Extra (Segment (..), decodeIntoFlatDict) +import qualified Dict +import qualified Expect +import Test + +tests :: Test +tests = + describe + "Data.Aeson.Extra" + [ decodeIntoFlatDictTest + ] + +decodeIntoFlatDictTest :: Test +decodeIntoFlatDictTest = + describe + "decodeIntoFlatDict" + [ test "simple object" <| \() -> + "{\"foo\": 1}" + |> decodeIntoFlatDict + |> Expect.equal (Ok (Dict.fromList [([Key "foo"], Aeson.Number 1)])), + test "with nested object" <| \() -> + "{\"foo\": 1, \"bar\": { \"moo\": \"cow\" }}" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ([Key "foo"], Aeson.Number 1), + ([Key "bar", Key "moo"], Aeson.String "cow") + ] + ) + ), + test "with more nesting object" <| \() -> + "{\"foo\": 1, \"bar\": { \"moo\": \"cow\", \"hello\": { \"world\": true }}}" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ([Key "foo"], Aeson.Number 1), + ([Key "bar", Key "moo"], Aeson.String "cow"), + ([Key "bar", Key "hello", Key "world"], Aeson.Bool True) + ] + ) + ), + test "with nested arrays" <| \() -> + "{\"foo\": 1, \"bar\": [ 1, 2, 3 ]}" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ([Key "foo"], Aeson.Number 1), + ([Key "bar", Index 0], Aeson.Number 1), + ([Key "bar", Index 1], Aeson.Number 2), + ([Key "bar", Index 2], Aeson.Number 3) + ] + ) + ), + test "with top-level array" <| \() -> + "[1, 2, 3 ]" + |> decodeIntoFlatDict + |> Expect.equal + ( Ok + ( Dict.fromList + [ ([Index 0], Aeson.Number 1), + ([Index 1], Aeson.Number 2), + ([Index 2], Aeson.Number 3) + ] + ) + ), + test "with top-level value" <| \() -> + "true" + |> decodeIntoFlatDict + |> Expect.equal (Ok (Dict.fromList [([], Aeson.Bool True)])) + ]
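
Usage note (not part of the diff above): the sketch below shows how an application might wire up the new StatsCallback. It assumes the NriPrelude-style prelude used throughout nri-kafka is in scope; `StatsExample`, `withKafka`, `statsCallback`, and `_metricCount` are illustrative names, not part of the library, and the worker call in the trailing comment assumes `Kafka.Worker` re-exports the updated `process`. Pass `Nothing` instead of `Just statsCallback` to keep the previous behaviour, and remember that librdkafka only emits statistics when `KAFKA_STATISTICS_INTERVAL_MS` is set to a non-zero value.

module StatsExample (withKafka, statsCallback) where

import qualified Conduit
import qualified Dict
import qualified Kafka
import qualified Kafka.Settings
import qualified Kafka.Stats as Stats
import qualified Task
import qualified Prelude

-- A deliberately trivial callback: propagate decode failures, otherwise look
-- at how many metrics librdkafka reported and discard them.
statsCallback :: Stats.StatsCallback
statsCallback result =
  case result of
    Err err -> Task.fail err
    Ok stats ->
      let _metricCount = Dict.size stats
       in Task.succeed ()

-- Producer side: pass the callback when acquiring the handler.
withKafka :: Kafka.Settings.Settings -> (Kafka.Handler -> Prelude.IO a) -> Prelude.IO a
withKafka settings go =
  Conduit.withAcquire (Kafka.handler settings (Just statsCallback)) go

-- Worker side would pass the same callback as the new final argument, e.g.:
--   Kafka.Worker.process workerSettings "my-consumer-group" subscription (Just statsCallback)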