Skip to content

[watermill-kafka] Context value added during unmarshal missing #633

@mike-gurney

Description

@mike-gurney

Description

I am doing a POC with Watermill and am trying to put a value into the Watermill message context while unmarshalling from Kafka to Watermill with a custom unmarshaller. However, the value I add during that step is lost by the time the message is handled by a ConsumerHandler. It looks like the context is being overwritten.

Relevant Code Links

It appears the context is being overwritten here: https://github.com/ThreeDotsLabs/watermill-kafka/blob/bd670972f95735397cbd6139f902e38bef6aa5af/pkg/kafka/subscriber.go#L582

Steps to reproduce

Trimmed down version of code that should show the issue:

  1. Run the provided Docker Compose file with `docker-compose up -d` (or have a Kafka broker available)
  2. Run the provided code (changing the broker value if needed)

docker-compose.yaml

# Single-service Compose file that starts one Confluent Kafka container named
# "broker" for the reproduction below.
version: '3.8'
services:
  # Copied from https://github.com/confluentinc/cp-all-in-one/blob/6b81eae4be858c2c2cbcbe1dc37439c076148e22/cp-all-in-one-kraft/docker-compose.yml
  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    ports:
      # 9092 is the PLAINTEXT_HOST listener (see KAFKA_LISTENERS) that the Go
      # program connects to as localhost:9092; 9101 matches KAFKA_JMX_PORT.
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      # Clients on the host are told to reach the broker at localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      # Only one broker service is defined here, so the replication factors
      # and min ISR in this file are set to 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_METRIC_REPORTERS: io.confluent.telemetry.reporter.TelemetryReporter
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      # This one node is given both the broker and the controller role.
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CLUSTER_LINK_METADATA_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_CLUSTER_LINK_METADATA_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_DURABILITY_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_TIER_METADATA_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/IBM/sarama"
	"github.com/ThreeDotsLabs/watermill"
	wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
)

// Package-level state shared by main, produce and the message callbacks.
var (
	// publisher is assigned in main and used by produce to publish to "testTopic".
	publisher message.Publisher
	// globalLogger is assigned at the start of main; every function in this
	// file logs through it.
	globalLogger *slog.Logger
)

// TestMarshaller wraps wmkafka.DefaultMarshaler. Marshal delegates unchanged,
// while Unmarshal additionally stores a value in the resulting message's
// context.
type TestMarshaller struct {
	// defaultMarshaller performs the actual Kafka <-> Watermill conversion.
	defaultMarshaller wmkafka.DefaultMarshaler
}

// NewTestMarshaller returns a TestMarshaller backed by a zero-value
// wmkafka.DefaultMarshaler.
func NewTestMarshaller() *TestMarshaller {
	tm := new(TestMarshaller)
	tm.defaultMarshaller = wmkafka.DefaultMarshaler{}
	return tm
}

// Marshal converts a Watermill message into a sarama producer message for the
// given topic by delegating to the wrapped default marshaler.
func (m *TestMarshaller) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
	producerMsg, err := m.defaultMarshaller.Marshal(topic, msg)
	return producerMsg, err
}

// Unmarshal converts a sarama consumer message into a Watermill message using
// the wrapped default marshaler, then stores the value "bar" under the key
// "foo" in the message's context and logs the resulting context.
//
// It returns the error from the default marshaler unchanged when decoding
// fails.
func (m *TestMarshaller) Unmarshal(msg *sarama.ConsumerMessage) (*message.Message, error) {
	decoded, err := m.defaultMarshaller.Unmarshal(msg)
	if err != nil {
		return nil, err
	}

	// NOTE(review): a plain string key is kept here because this is a minimal
	// reproduction; the context package recommends an unexported key type to
	// avoid collisions.
	enriched := context.WithValue(decoded.Context(), "foo", "bar")
	decoded.SetContext(enriched)

	globalLogger.Info("-----------------Unmarshalling message----------------")
	globalLogger.Info(fmt.Sprintf("%v", decoded.Context()))
	globalLogger.Info("------------------------------------------------------")

	return decoded, nil
}

// produce publishes a "hello" message to "testTopic" every ten seconds using
// the package-level publisher. It loops forever and is meant to run in its own
// goroutine. A failed publish is logged and the loop carries on, so a
// transient broker problem does not stop the reproduction.
func produce() {
	for {
		time.Sleep(10 * time.Second)
		globalLogger.Info("Producing Message")

		msg := message.NewMessage(watermill.NewUUID(), []byte("hello"))
		if err := publisher.Publish("testTopic", msg); err != nil {
			// Previously the error was dropped, hiding publish failures.
			globalLogger.Error("failed to publish message", "error", err)
		}
	}
}

// consumeTestTopic is the handler registered for "testTopic". It logs the
// context carried by the received message between two separator lines and
// always returns nil.
func consumeTestTopic(msg *message.Message) error {
	logger := globalLogger

	logger.Info("-------------Received message on testTopic-------------")
	ctxDump := fmt.Sprintf("%v", msg.Context())
	logger.Info(ctxDump)
	logger.Info("------------------------------------------------------")

	return nil
}

// main wires up the reproduction: it creates a Kafka subscriber that uses
// TestMarshaller as its unmarshaler, registers consumeTestTopic on a Watermill
// router for "testTopic", creates a Kafka publisher, starts the produce
// goroutine and then runs the router. Any setup failure, or an error returned
// by the router, is logged and then panics.
func main() {
	globalLogger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}))

	// A single Watermill logger adapter is shared by the subscriber, the
	// router and the publisher.
	wmLogger := watermill.NewSlogLogger(globalLogger)

	// Create my kafka subscriber
	kafkaConsumer, consErr := wmkafka.NewSubscriber(wmkafka.SubscriberConfig{
		Brokers:       []string{"localhost:9092"},
		Unmarshaler:   NewTestMarshaller(),
		ConsumerGroup: "topicGroup",
	}, wmLogger)
	if consErr != nil {
		globalLogger.Error("failed to create consumer", "error", consErr)
		panic(consErr)
	}

	router, err := message.NewRouter(message.RouterConfig{}, wmLogger)
	if err != nil {
		globalLogger.Error("failed to create router", "error", err)
		panic(err)
	}
	defer router.Close()

	router.AddPlugin(plugin.SignalsHandler)

	router.AddConsumerHandler("consumeTestTopic", "testTopic", kafkaConsumer, consumeTestTopic)

	var prodErr error
	// Create my kafka publisher
	publisher, prodErr = wmkafka.NewPublisher(wmkafka.PublisherConfig{
		Brokers:   []string{"localhost:9092"},
		Marshaler: NewTestMarshaller(),
	}, wmLogger)
	if prodErr != nil {
		globalLogger.Error("failed to create publisher", "error", prodErr)
		panic(prodErr)
	}
	// publisher is assigned before the goroutine starts, so produce never
	// observes a nil publisher.
	go produce()

	// Run blocks until the router stops; surface its error instead of
	// discarding it.
	if runErr := router.Run(context.Background()); runErr != nil {
		globalLogger.Error("router stopped with error", "error", runErr)
		panic(runErr)
	}
}

Expected behavior

My context value should be present when the message is handled.

Actual behavior

My context value is missing, although the message's context does contain other values.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions