Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
55641d5
Add stats callback to kafka workers
stoeffel Mar 24, 2022
b9f8eac
Add stats callback to kafka
stoeffel Mar 24, 2022
37a47ba
Add type for stats
stoeffel Mar 24, 2022
6b96651
Decode stats in consumer callback
stoeffel Mar 24, 2022
cdf7626
Decode stats
stoeffel Mar 24, 2022
3e65ec6
Expose stats module
stoeffel Mar 24, 2022
9974886
Let us debug these things
omnibs Mar 31, 2022
336c48d
Tell kafka to report back with stats every 1s
omnibs Mar 31, 2022
36b05d6
Crash on debug failures
omnibs Mar 31, 2022
1668eb5
Show things
caiquefigueiredo Apr 7, 2022
a8b1258
WIP expose dict metrics
caiquefigueiredo Apr 7, 2022
a04332e
Fix test helpers
stoeffel Apr 14, 2022
1014811
Add function to decode json into a flat dict
stoeffel Apr 14, 2022
9b7bee1
Decode stats into flat dict
stoeffel Apr 14, 2022
c1b1ee7
Keep track of the Path, so not really a flat dict
zwilias Apr 14, 2022
3c0c5a1
Add pathToText helper
zwilias Apr 14, 2022
a0004d1
Support top-level values
zwilias Apr 14, 2022
84a8d28
Less casing
stoeffel Apr 14, 2022
2e0efe7
Simpler function (maybe slightly slower). Does it matter?
stoeffel Apr 14, 2022
4c2161d
More cleanup
stoeffel Apr 14, 2022
3435035
Reexpose Path and pathToText
zwilias Apr 14, 2022
f8dff24
Add topLevel metric types
stoeffel Apr 14, 2022
76aee8f
Attach metric type info to values
stoeffel Apr 14, 2022
2a7bdd1
Add show instance to metric
stoeffel Apr 14, 2022
2482a3a
Special case `req`
stoeffel Apr 14, 2022
5cd61aa
Cleanup
stoeffel Apr 14, 2022
90b65c4
Add req to known metrics
stoeffel Apr 14, 2022
ba38d08
There's a typo in rdkafka
omnibs Apr 14, 2022
debf3d3
Remove pathToText
zwilias Apr 21, 2022
612f86c
Formatting
stoeffel Apr 26, 2022
196c78c
Add todo
stoeffel Apr 26, 2022
9d0c4d4
Set stats interval to consumer
stoeffel Apr 26, 2022
11efc0f
Add settings for statistics.interval.ms
stoeffel Apr 26, 2022
a5bc956
Use andMap
stoeffel Apr 26, 2022
da0e849
Format args
stoeffel Apr 26, 2022
f8b81f6
Update changelog
stoeffel Apr 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions nri-kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.1.0.5

- Added `StatsCallback` support to `Kafka` and `Kafka.Worker`.
- Added `STATISTICS_INTERVAL_MS` environment variable to `Kafka` and `Kafka.Worker`.

# 0.1.0.4

- Added new `ElsewhereButToKafkaAsWell` mode to `CommitOffsets`, which commits offsets to Kafka once the external Offset storage has been updated. Kafka commits are performed only to keep Kafka informed about consumer lag.
Expand Down
9 changes: 9 additions & 0 deletions nri-kafka/nri-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ library
exposed-modules:
Kafka
Kafka.Worker
Kafka.Stats
Kafka.Test
other-modules:
Data.Aeson.Extra
Kafka.Internal
Kafka.Settings
Kafka.Settings.Internal
Expand Down Expand Up @@ -78,17 +80,21 @@ library
, text >=1.2.3.1 && <2.1
, time >=1.8.0.2 && <2
, unix >=2.7.2.2 && <2.8.0.0
, unordered-containers >=0.2.14.0 && <0.3
, uuid >=1.3.0 && <1.4
, vector >=0.12.3.1 && <0.13
default-language: Haskell2010

test-suite tests
type: exitcode-stdio-1.0
main-is: Main.hs
other-modules:
Data.Aeson.Extra
Kafka
Kafka.Internal
Kafka.Settings
Kafka.Settings.Internal
Kafka.Stats
Kafka.Test
Kafka.Worker
Kafka.Worker.Analytics
Expand All @@ -98,6 +104,7 @@ test-suite tests
Kafka.Worker.Settings
Kafka.Worker.Stopping
Helpers
Spec.Data.Aeson.Extra
Spec.Kafka.Worker.Integration
Spec.Kafka.Worker.Partition
Paths_nri_kafka
Expand Down Expand Up @@ -137,5 +144,7 @@ test-suite tests
, text >=1.2.3.1 && <2.1
, time >=1.8.0.2 && <2
, unix >=2.7.2.2 && <2.8.0.0
, unordered-containers >=0.2.14.0 && <0.3
, uuid >=1.3.0 && <1.4
, vector >=0.12.3.1 && <0.13
default-language: Haskell2010
3 changes: 3 additions & 0 deletions nri-kafka/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ dependencies:
- time >= 1.8.0.2 && < 2
- unix >= 2.7.2.2 && < 2.8.0.0
- uuid >=1.3.0 && < 1.4
- unordered-containers >= 0.2.14.0 && < 0.3
- vector >= 0.12.3.1 && < 0.13
library:
exposed-modules:
- Kafka
- Kafka.Worker
- Kafka.Stats
- Kafka.Test
source-dirs: src
tests:
Expand Down
58 changes: 58 additions & 0 deletions nri-kafka/src/Data/Aeson/Extra.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
module Data.Aeson.Extra (decodeIntoFlatDict, Path, Segment (..)) where

import qualified Data.Aeson as Aeson
import Data.ByteString (ByteString)
import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as Vector
import Dict (Dict)
import qualified Dict
import Prelude (Either (Left, Right))
import qualified Prelude

-- | One step in a JSON path: either an object key or an array index.
data Segment = Key Text | Index Int
  deriving (Ord, Eq, Show)

-- | The location of a value within a JSON document, root-first.
-- (Leaf paths are reversed into root-first order before being used as keys.)
type Path = List Segment

-- | Decodes a strict JSON 'ByteString' into a flat dict, keyed by the
-- 'Path' (object keys and array indices) leading to each leaf value.
--
-- > decodeIntoFlatDict "{\"a\": {\"b\": 1}}"
-- > ==> Ok (Dict.fromList [([Key "a", Key "b"], Number 1.0)])
--
-- > decodeIntoFlatDict "{\"a\": [1,2,3]}"
-- > ==> Ok (Dict.fromList [([Key "a", Index 0], Number 1.0), ([Key "a", Index 1], Number 2.0), ([Key "a", Index 2], Number 3.0)])
--
-- Returns @Err@ carrying aeson's parse-error message when the input is not
-- valid JSON.
decodeIntoFlatDict :: ByteString -> Result Text (Dict Path Aeson.Value)
decodeIntoFlatDict content =
  case Aeson.eitherDecodeStrict content of
    Left err -> Err (Text.fromList err)
    Right value -> Ok (valueToDict [] value Nothing)

-- | Fold a JSON value into a dict of leaf values keyed by path.
--
-- The path is accumulated front-to-back (newest segment consed on), so it is
-- reversed into root-first order at each leaf. Objects and arrays recurse;
-- any other value becomes a single entry.
valueToDict :: Path -> Aeson.Value -> Maybe Segment -> Dict Path Aeson.Value
valueToDict path val maybeSegment =
  let extendedPath =
        case maybeSegment of
          Nothing -> path
          Just segment -> segment : path
   in case val of
        Aeson.Object obj ->
          -- Merge the dicts produced for every key/value pair of the object.
          HM.foldlWithKey'
            ( \acc key child ->
                Dict.union acc (valueToDict extendedPath child (Just (Key key)))
            )
            Dict.empty
            obj
        Aeson.Array arr -> arrayToDict extendedPath arr
        _ -> Dict.singleton (List.reverse extendedPath) val

-- | Fold every array element into the dict, tagging each element's path
-- with its index within the array.
arrayToDict :: Path -> Aeson.Array -> Dict Path Aeson.Value
arrayToDict path arr =
  Vector.ifoldl addElement Dict.empty arr
  where
    addElement acc idx element =
      valueToDict path element (Just (Index (Prelude.fromIntegral idx)))
        |> Dict.union acc
85 changes: 52 additions & 33 deletions nri-kafka/src/Kafka.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import qualified Dict
import qualified Kafka.Internal as Internal
import qualified Kafka.Producer as Producer
import qualified Kafka.Settings as Settings
import qualified Kafka.Stats as Stats
import qualified Platform
import qualified Prelude

Expand Down Expand Up @@ -155,9 +156,9 @@ key msg = Maybe.map Internal.unKey (Internal.key msg)
-- | Function for creating a Kafka handler.
--
-- See 'Kafka.Settings' for potential customizations.
handler :: Settings.Settings -> Conduit.Acquire Internal.Handler
handler settings = do
producer <- Conduit.mkAcquire (mkProducer settings) Producer.closeProducer
handler :: Settings.Settings -> Maybe Stats.StatsCallback -> Conduit.Acquire Internal.Handler
handler settings maybeStatsCallback = do
producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer
_ <- Conduit.mkAcquire (startPollEventLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate))
liftIO (mkHandler settings producer)

Expand Down Expand Up @@ -215,36 +216,54 @@ doSTM doAnything stm =
|> map Ok
|> Platform.doAnything doAnything

mkProducer :: Settings.Settings -> Prelude.IO Producer.KafkaProducer
mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout, Settings.logLevel, Settings.batchNumMessages} = do
let properties =
Producer.brokersList brokerAddresses
++ Producer.sendTimeout deliveryTimeout
++ Producer.logLevel logLevel
++ Producer.compression Producer.Snappy
++ Producer.extraProps
( Dict.fromList
[ ( "batch.num.messages",
batchNumMessages
|> Settings.unBatchNumMessages
|> Text.fromInt
),
-- Enable idempotent producers
-- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference
("enable.idempotence", "true"),
("acks", "all")
]
)
eitherProducer <- Producer.newProducer properties
case eitherProducer of
Prelude.Left err ->
-- We create the handler as part of starting the application. Throwing
-- means that if there's a problem with the settings the application will
-- fail immediately upon start. It won't result in runtime errors during
-- operation.
Exception.throwIO err
Prelude.Right producer ->
Prelude.pure producer
-- | Builds the hw-kafka-client producer from our settings, optionally wiring
-- in a librdkafka statistics callback.
--
-- Throws (via 'Exception.throwIO') when the producer cannot be created, so a
-- misconfiguration fails the application immediately at startup instead of
-- causing runtime errors during operation.
mkProducer :: Settings.Settings -> Maybe Stats.StatsCallback -> Prelude.IO Producer.KafkaProducer
mkProducer
  Settings.Settings
    { Settings.brokerAddresses,
      Settings.deliveryTimeout,
      Settings.logLevel,
      Settings.batchNumMessages,
      Settings.statisticsIntervalMs
    }
  maybeStatsCallback = do
    let properties =
          Producer.brokersList brokerAddresses
            ++ Producer.sendTimeout deliveryTimeout
            ++ Producer.logLevel logLevel
            ++ Producer.compression Producer.Snappy
            ++ Producer.extraProps
              ( Dict.fromList
                  [ ( "batch.num.messages",
                      batchNumMessages
                        |> Settings.unBatchNumMessages
                        |> Text.fromInt
                    ),
                    -- Enable idempotent producers
                    -- See https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html for reference
                    ("enable.idempotence", "true"),
                    ("acks", "all"),
                    -- How often librdkafka emits statistics; 0 disables them.
                    ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs))
                  ]
              )
            -- Only register a stats callback when the caller supplied one.
            ++ case maybeStatsCallback of
              Nothing -> Prelude.mempty
              Just statsCallback ->
                Producer.setCallback
                  ( Producer.statsCallback <| \content -> do
                      -- The callback's Task runs against a silent handler and
                      -- its result is discarded: a failing stats callback is
                      -- never allowed to disturb the producer.
                      log <- Platform.silentHandler
                      _ <- Task.attempt log (statsCallback (Stats.decode content))
                      Prelude.pure ()
                  )
    eitherProducer <- Producer.newProducer properties
    case eitherProducer of
      Prelude.Left err ->
        -- We create the handler as part of starting the application. Throwing
        -- means that if there's a problem with the settings the application will
        -- fail immediately upon start. It won't result in runtime errors during
        -- operation.
        Exception.throwIO err
      Prelude.Right producer ->
        Prelude.pure producer

sendHelperAsync ::
Producer.KafkaProducer ->
Expand Down
32 changes: 25 additions & 7 deletions nri-kafka/src/Kafka/Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ module Kafka.Settings
BatchNumMessages,
unBatchNumMessages,
exampleBatchNumMessages,
StatisticsIntervalMs (..),
)
where

import qualified Environment
import qualified Kafka.Producer
import qualified Kafka.Settings.Internal as Internal
import qualified Prelude

-- | Settings required to write to Kafka
data Settings = Settings
Expand All @@ -21,12 +23,18 @@ data Settings = Settings
-- | Message delivery timeout. See hw-kafka's documentation for more info
deliveryTimeout :: Kafka.Producer.Timeout,
-- | Number of messages to batch together before sending to Kafka.
batchNumMessages :: BatchNumMessages
batchNumMessages :: BatchNumMessages,
-- | librdkafka statistics emit interval. The application also needs to
-- register a stats callback using rd_kafka_conf_set_stats_cb(). The
-- granularity is 1000ms. A value of 0 disables statistics.
statisticsIntervalMs :: StatisticsIntervalMs
}

-- | Number of messages to batch together before sending to Kafka.
newtype BatchNumMessages = BatchNumMessages {unBatchNumMessages :: Int}

-- | librdkafka statistics emit interval in milliseconds. A value of 0
-- disables statistics.
newtype StatisticsIntervalMs = StatisticsIntervalMs {unStatisticsIntervalMs :: Int}

-- | example BatchNumMessages to use in tests
exampleBatchNumMessages :: BatchNumMessages
exampleBatchNumMessages = BatchNumMessages 1
Expand All @@ -38,12 +46,12 @@ exampleBatchNumMessages = BatchNumMessages 1
-- KAFKA_BATCH_SIZE=10000
decoder :: Environment.Decoder Settings
decoder =
map4
Settings
Internal.decoderBrokerAddresses
Internal.decoderKafkaLogLevel
decoderDeliveryTimeout
decoderBatchNumMessages
Prelude.pure Settings
|> andMap Internal.decoderBrokerAddresses
|> andMap Internal.decoderKafkaLogLevel
|> andMap decoderDeliveryTimeout
|> andMap decoderBatchNumMessages
|> andMap decoderStatisticsIntervalMs

decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout
decoderDeliveryTimeout =
Expand All @@ -64,3 +72,13 @@ decoderBatchNumMessages =
Environment.defaultValue = "10000"
}
(map BatchNumMessages Environment.int)

-- | Reads @KAFKA_STATISTICS_INTERVAL_MS@ from the environment into a
-- 'StatisticsIntervalMs'. Defaults to @"0"@, i.e. statistics disabled.
decoderStatisticsIntervalMs :: Environment.Decoder StatisticsIntervalMs
decoderStatisticsIntervalMs =
  Environment.variable
    Environment.Variable
      { Environment.name = "KAFKA_STATISTICS_INTERVAL_MS",
        Environment.description = "librdkafka statistics emit interval. The application also needs to register a stats callback using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. A value of 0 disables statistics.",
        Environment.defaultValue = "0"
      }
    (map StatisticsIntervalMs Environment.int)
Loading