diff --git a/nix/sources.json b/nix/sources.json
index 142a0664..26bf2ffa 100644
--- a/nix/sources.json
+++ b/nix/sources.json
@@ -1,4 +1,16 @@
 {
+    "hw-kafka-client": {
+        "branch": "main",
+        "description": "Kafka client for Haskell, including auto-rebalancing consumers",
+        "homepage": null,
+        "owner": "NoRedInk",
+        "repo": "hw-kafka-client",
+        "rev": "afb77994286f9c4876f562fb0a8c7b098b56248f",
+        "sha256": "1ygmvw508n7dc6is9yzz8yc1k8nhz66f6snagvb7sjijfsym31lw",
+        "type": "tarball",
+        "url": "https://github.com/NoRedInk/hw-kafka-client/archive/afb77994286f9c4876f562fb0a8c7b098b56248f.tar.gz",
+        "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
+    },
     "niv": {
         "branch": "master",
         "description": "Easy dependency management for Nix projects",
diff --git a/nri-kafka/nri-kafka.cabal b/nri-kafka/nri-kafka.cabal
index 01fdfe56..319286ee 100644
--- a/nri-kafka/nri-kafka.cabal
+++ b/nri-kafka/nri-kafka.cabal
@@ -71,7 +71,7 @@ library
     , bytestring >=0.10.8.2 && <0.12
     , conduit >=1.3.0 && <1.4
     , containers >=0.6.0.1 && <0.7
-    , hw-kafka-client >=4.0.3 && <5.0
+    , hw-kafka-client >=5.0.0 && <6.0
     , nri-env-parser >=0.1.0.0 && <0.2
     , nri-observability >=0.1.1.1 && <0.2
     , nri-prelude >=0.1.0.0 && <0.7
@@ -135,7 +135,7 @@ test-suite tests
     , bytestring >=0.10.8.2 && <0.12
     , conduit >=1.3.0 && <1.4
     , containers >=0.6.0.1 && <0.7
-    , hw-kafka-client >=4.0.3 && <5.0
+    , hw-kafka-client >=5.0.0 && <6.0
     , nri-env-parser >=0.1.0.0 && <0.2
     , nri-observability >=0.1.1.1 && <0.2
     , nri-prelude >=0.1.0.0 && <0.7
diff --git a/nri-kafka/package.yaml b/nri-kafka/package.yaml
index 4eb278b4..25112513 100644
--- a/nri-kafka/package.yaml
+++ b/nri-kafka/package.yaml
@@ -20,7 +20,7 @@ dependencies:
   - bytestring >= 0.10.8.2 && < 0.12
   - conduit >= 1.3.0 && < 1.4
   - containers >= 0.6.0.1 && < 0.7
-  - hw-kafka-client >=4.0.3 && < 5.0
+  - hw-kafka-client >=5.0.0 && < 6.0
   - nri-env-parser >= 0.1.0.0 && < 0.2
   - nri-observability >= 0.1.1.1 && < 0.2
   - nri-prelude >= 0.1.0.0 && < 0.7
diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs
index 8e9a28d8..411985d0 100644
--- a/nri-kafka/src/Kafka.hs
+++ b/nri-kafka/src/Kafka.hs
@@ -137,6 +137,7 @@ record msg = do
             |> ByteString.Lazy.toStrict
         )
         (Internal.payload msg)
+    , Producer.prHeaders = Prelude.mempty
     }
 
 -- | The topic of a message. This function might sometimes be useful in tests.
@@ -159,7 +160,7 @@ key msg = Maybe.map Internal.unKey (Internal.key msg)
 handler :: Settings.Settings -> Maybe Stats.StatsCallback -> Conduit.Acquire Internal.Handler
 handler settings maybeStatsCallback = do
   producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer
-  _ <- Conduit.mkAcquire (startPollEventLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate))
+  _ <- Conduit.mkAcquire (startFlushLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate))
   liftIO (mkHandler settings producer)
 
 data Terminate = Terminate
@@ -167,24 +168,24 @@
 -- | By default events only get polled right before sending a record to kafka.
 -- This means that the deliveryCallback only gets fired on the next call to produceMessage'.
 -- We want to be informed about delivery status as soon as possible though.
-startPollEventLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b)
-startPollEventLoop producer = do
+-- The only way to do that right now in hw-kafka-client is by flushing the queue.
+startFlushLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b)
+startFlushLoop producer = do
   terminator <- STM.atomically TMVar.newEmptyTMVar
   _ <-
     Async.race_
-      (pollEvents producer)
+      (flushProducer producer)
       (STM.atomically <| TMVar.readTMVar terminator)
       |> Async.async
   Prelude.pure terminator
 
--- | We use a little trick here to poll events, by sending an empty message batch.
--- This will call the internal pollEvent function in hw-kafka-client.
-pollEvents :: Producer.KafkaProducer -> Prelude.IO ()
-pollEvents producer = do
-  Producer.produceMessageBatch producer []
-    |> map (\_ -> ())
+-- | Flushing the producer forces librdkafka to serve its event queue, so any
+-- pending delivery callbacks fire without waiting for the next produce call.
+flushProducer :: Producer.KafkaProducer -> Prelude.IO ()
+flushProducer producer = do
+  Producer.flushProducer producer
   Control.Concurrent.threadDelay 100_000 {- 100ms -}
-  pollEvents producer
+  flushProducer producer
 
 -- |
 mkHandler :: Settings.Settings -> Producer.KafkaProducer -> Prelude.IO Internal.Handler
diff --git a/nri-kafka/src/Kafka/Settings.hs b/nri-kafka/src/Kafka/Settings.hs
index 82240e85..722513fc 100644
--- a/nri-kafka/src/Kafka/Settings.hs
+++ b/nri-kafka/src/Kafka/Settings.hs
@@ -10,6 +10,7 @@ module Kafka.Settings where
 
 import qualified Environment
+import qualified Kafka.Consumer.AssignmentStrategy as AssignmentStrategy
 import qualified Kafka.Producer
 import qualified Kafka.Settings.Internal as Internal
 import qualified Prelude
 
@@ -27,7 +28,11 @@ data Settings = Settings
     -- | librdkafka statistics emit interval. The application also needs to
     -- register a stats callback using rd_kafka_conf_set_stats_cb(). The
     -- granularity is 1000ms. A value of 0 disables statistics.
-    statisticsIntervalMs :: StatisticsIntervalMs
+    statisticsIntervalMs :: StatisticsIntervalMs,
+    -- | Partition assignment strategy for workers. One of:
+    -- RangeAssignor: https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
+    -- CooperativeStickyAssignor: https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html
+    partitionAssignmentStrategy :: AssignmentStrategy.ConsumerAssignmentStrategy
   }
 
 -- | Number of messages to batch together before sending to Kafka.
@@ -52,6 +57,7 @@ decoder =
     |> andMap decoderDeliveryTimeout
     |> andMap decoderBatchNumMessages
     |> andMap decoderStatisticsIntervalMs
+    |> andMap decoderPartitionAssignmentStrategy
 
 decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout
 decoderDeliveryTimeout =
@@ -82,3 +88,18 @@ decoderStatisticsIntervalMs =
         Environment.defaultValue = "0"
       }
     (map StatisticsIntervalMs Environment.int)
+
+decoderPartitionAssignmentStrategy :: Environment.Decoder AssignmentStrategy.ConsumerAssignmentStrategy
+decoderPartitionAssignmentStrategy =
+  Environment.variable
+    Environment.Variable
+      { Environment.name = "KAFKA_PARTITION_ASSIGNMENT_STRATEGY",
+        Environment.description = "Sets the Kafka partition assignment strategy. One of: {cooperative-sticky,range-assignor}",
+        Environment.defaultValue = "cooperative-sticky"
+      }
+    ( Environment.custom Environment.text
+        <| \str -> case str of
+          "cooperative-sticky" -> Ok AssignmentStrategy.CooperativeStickyAssignor
+          "range-assignor" -> Ok AssignmentStrategy.RangeAssignor
+          invalidValue -> Err ("Invalid value: " ++ invalidValue)
+    )
diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs
index bff7517e..36f41b03 100644
--- a/nri-kafka/src/Kafka/Worker/Internal.hs
+++ b/nri-kafka/src/Kafka/Worker/Internal.hs
@@ -14,6 +14,7 @@ import qualified Data.UUID.V4
 import qualified Dict
 import qualified GHC.Clock
 import qualified Kafka.Consumer as Consumer
+import qualified Kafka.Consumer.AssignmentStrategy as AssignmentStrategy
 import qualified Kafka.Internal as Kafka
 import qualified Kafka.Metadata
 import qualified Kafka.Stats as Stats
@@ -269,6 +270,7 @@ createConsumer
         ++ Consumer.logLevel logLevel
         ++ Consumer.setCallback (Consumer.rebalanceCallback rebalance)
         ++ Consumer.compression Consumer.Snappy
+        ++ Consumer.setAssignmentStrategy [AssignmentStrategy.CooperativeStickyAssignor]
         ++ Consumer.extraProps
           ( Dict.fromList
               [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)),
diff --git a/nri-kafka/test/Helpers.hs b/nri-kafka/test/Helpers.hs
index 0212a460..73485b5a 100644
--- a/nri-kafka/test/Helpers.hs
+++ b/nri-kafka/test/Helpers.hs
@@ -161,6 +161,7 @@ record topicName partitionId val =
     { Producer.prTopic = Producer.TopicName (Internal.unTopic topicName),
       Producer.prPartition = Producer.SpecifiedPartition (Prelude.fromIntegral partitionId),
       Producer.prKey = Nothing,
+      Producer.prHeaders = Prelude.mempty,
       Producer.prValue =
         Internal.MsgWithMetaData
           { Internal.metaData =
diff --git a/run-tests.sh b/run-tests.sh
index d0b60f20..6d96a163 100755
--- a/run-tests.sh
+++ b/run-tests.sh
@@ -16,18 +16,23 @@ pg_ctl start -o '-k .'
 mkdir -p ./_build/redis/data
 redis-server --daemonize yes --dir ./_build/redis/data
 
-## start zookeeper (for kafka) 
+## start zookeeper (for kafka)
+zk_server_properties_path=$(dirname "$(which zkServer.sh)")/../conf/zoo_sample.cfg
 mkdir -p /tmp/zookeeper /tmp/zookeeper-logs
-ZOOPIDFILE=/tmp/zookeeper-logs/pid ZOO_LOG_DIR=/tmp/zookeeper-logs zkServer.sh stop zoo_sample.cfg
+ZOOPIDFILE=/tmp/zookeeper-logs/pid \
+  ZOO_LOG_DIR=/tmp/zookeeper-logs \
+  zkServer.sh stop "$zk_server_properties_path"
 rm -rf /tmp/zookeeper/* /tmp/zookeeper-logs/*
-ZOOPIDFILE=/tmp/zookeeper-logs/pid ZOO_LOG_DIR=/tmp/zookeeper-logs zkServer.sh start zoo_sample.cfg
+ZOOPIDFILE=/tmp/zookeeper-logs/pid \
+  ZOO_LOG_DIR=/tmp/zookeeper-logs \
+  zkServer.sh start "$zk_server_properties_path"
 
 ## wait for zookeeper
 echo "waiting for zookeeper to start"
 until nc -vz localhost 2181
 do
   sleep 1
-done 
+done
 echo "zookeeper available"
 
 ## start kafka
@@ -50,4 +55,6 @@ cabal test all
 
 # cleanup
 kafka-server-stop.sh
-ZOOPIDFILE=/tmp/zookeeper-logs/pid ZOO_LOG_DIR=/tmp/zookeeper-logs zkServer.sh stop zoo_sample.cfg
+ZOOPIDFILE=/tmp/zookeeper-logs/pid \
+  ZOO_LOG_DIR=/tmp/zookeeper-logs \
+  zkServer.sh stop "$zk_server_properties_path"
diff --git a/shell-ghc-8-10.nix b/shell-ghc-8-10.nix
index dbc64a2f..9ebf1b15 100644
--- a/shell-ghc-8-10.nix
+++ b/shell-ghc-8-10.nix
@@ -10,5 +10,7 @@ in import nix/mk-shell.nix {
       safe-coloured-text-terminfo =
         super.callCabal2nix "safe-coloured-text-terminfo"
           "${sources.safe-coloured-text}/safe-coloured-text-terminfo" { };
+      hw-kafka-client = pkgs.haskell.lib.dontCheck
+        (super.callCabal2nix "hw-kafka-client" sources.hw-kafka-client { });
     });
 }
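
For reference, the two producer-side changes in the patch boil down to the following under the hw-kafka-client 5.x API. This is a minimal standalone sketch, not code from this repo: the names exampleRecord and flushLoop, the topic name, and the payload are illustrative assumptions. What it relies on from the library is visible in the diff itself: ProducerRecord gained a prHeaders field in 5.0 (mempty keeps records header-less, matching the patch), and flushProducer replaces the old empty-batch polling trick.

{-# LANGUAGE OverloadedStrings #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Kafka.Producer

-- A record under the 5.x API: prHeaders is now a required field; mempty
-- reproduces the header-less behaviour the patch opts for.
exampleRecord :: ProducerRecord
exampleRecord =
  ProducerRecord
    { prTopic = TopicName "example-topic", -- hypothetical topic name
      prPartition = UnassignedPartition,
      prKey = Nothing,
      prHeaders = mempty, -- no headers, as in the patch
      prValue = Just "hello" -- hypothetical payload
    }

-- The flush loop in miniature: flushProducer blocks until librdkafka's
-- outbound queue drains, which fires any pending delivery callbacks; then
-- sleep 100ms (the interval the patch uses) and repeat.
flushLoop :: KafkaProducer -> IO ()
flushLoop producer = forever $ do
  flushProducer producer
  threadDelay 100000 -- 100ms

In the patch itself this loop runs on a background thread supervised with Async.race_ against a Terminate TMVar, so it shuts down when the producer is released.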