Open
Description
I want to test whether an offset has been committed or not (because I only commit in some situations). I have not found any way to test that.
I'm very new to this, so there's probably something I just don't get.
I'm using Akka Streams Kafka. Here's an example of a test that is working.
  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes
      withRunningKafka {
        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()
        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }
  }
publishAndRequestRetry uses publishToKafka to publish a message and then waits for the source to receive it.
I want to add a test that checks whether the offset has been committed or not. Is this possible with EmbeddedKafka?
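One possible approach, sketched here rather than taken from EmbeddedKafka itself, is to ask the broker directly for the offset committed by the consumer group. The sketch below assumes kafka-clients 2.4 or newer (older clients expose `committed(TopicPartition)` instead of the `Set` overload). `CommittedOffsets`, `committedOffset` and `withGroupConsumer` are hypothetical helper names, and the group id must match whatever `getKafkaSource` configures.

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical helpers, not part of EmbeddedKafka or Akka Streams Kafka.
object CommittedOffsets {

  // Returns the offset committed for `tp` by the consumer's group,
  // or None if the group has not committed anything for that partition.
  def committedOffset(consumer: Consumer[_, _], tp: TopicPartition): Option[Long] = {
    val committed = consumer.committed(Collections.singleton(tp))
    // The map may lack the key or hold a null value when nothing is committed.
    Option(committed.get(tp)).map(_.offset())
  }

  // Creates a short-lived consumer in the given group, runs `f`, then closes it.
  // The consumer never subscribes or polls, so it does not join the group
  // or commit anything itself.
  def withGroupConsumer[T](bootstrapServers: String, groupId: String)(
      f: Consumer[Array[Byte], Array[Byte]] => T): T = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
      props, new ByteArrayDeserializer, new ByteArrayDeserializer)
    try f(consumer) finally consumer.close()
  }
}
```

Inside `withRunningKafka`, a test could then call something like `CommittedOffsets.withGroupConsumer("localhost:6001", groupId)(c => CommittedOffsets.committedOffset(c, new TopicPartition(topic, 0)))`, where the port and `groupId` are assumptions about the test setup. By Kafka convention the committed value is the offset of the next record to read, so after committing the first message of an empty partition the result would be expected to be `Some(1)`, and `None` when nothing was committed. Commits from the stream are asynchronous, so the assertion probably needs to be retried, for example with ScalaTest's `eventually`.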