
[BUG]: Karafka integration doesn't inject trace digest to produced messages #4868

@Drowze

Description


Tracer Version(s)

2.19.0

Ruby Version(s)

3.4.1

Relevant Library and Version(s)

karafka 2.4.18, 2.5.0

Bug Report

Now that the Datadog tracer supports Karafka with a distributed_tracing option, I'd expect produced messages to automatically include the current trace's digest in their headers, so that the trace is continued when a Karafka consumer processes the message. Currently this does not happen: the consumer starts a new trace instead.

Note that it is possible to work around this with a simple WaterDrop middleware that injects the headers (I tested this myself):

module KarafkaDdMiddleware
  def self.call(message)
    dd_trace = Datadog::Tracing.active_trace
    message[:headers] ||= {}
    Datadog::Tracing::Contrib::Karafka.inject(dd_trace, message[:headers]) if dd_trace
    message
  end
end

Karafka.producer.middleware.append(KarafkaDdMiddleware)
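
To illustrate what the workaround does to a message before it is produced, here is a minimal, dependency-free sketch of the same middleware shape: a callable that receives the message hash, adds headers when a trace is active, and returns the hash. Everything prefixed with Sketch is hypothetical and only stands in for the Datadog APIs used above; the two header names follow Datadog's default propagation style, but the real injector may write additional headers.

```ruby
# Hypothetical stand-ins so the sketch runs without Kafka or the datadog gem.
SketchTrace = Struct.new(:id, :span_id)

# Stand-in for Datadog::Tracing.active_trace.
module SketchTracing
  class << self
    attr_accessor :active_trace
  end
end

# Stand-in for Datadog::Tracing::Contrib::Karafka.inject (simplified).
module SketchInjector
  def self.inject(trace, headers)
    headers['x-datadog-trace-id'] = trace.id.to_s
    headers['x-datadog-parent-id'] = trace.span_id.to_s
  end
end

# Same shape as the middleware above: takes the message hash and returns it.
module SketchMiddleware
  def self.call(message)
    trace = SketchTracing.active_trace
    return message unless trace # no active trace: leave the message untouched

    message[:headers] ||= {}
    SketchInjector.inject(trace, message[:headers])
    message
  end
end

SketchTracing.active_trace = SketchTrace.new(123, 456)
result = SketchMiddleware.call({ topic: 'example', payload: '{"ping":"pong"}' })
puts result[:headers].inspect
```

With the real middleware in place, the consumer side should see the corresponding headers on message.headers, which is what allows the integration to continue the producer's trace.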

Reproduction Code

Initialize the project:

rails new testing_karafka_datadog
cd testing_karafka_datadog
bundle add datadog karafka
bundle exec karafka install

Datadog config:

# config/initializers/datadog.rb
Datadog.configure do |c|
  c.tracing.instrument :karafka, distributed_tracing: true, enabled: true
end

Karafka config and consumer (barely changed after running karafka install):

# karafka.rb (without any manually added middleware to inject the trace on the message)
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
    config.client_id = 'YOUR_APP_NAME'

    config.group_id = 'YOUR_APP_NAME_consumer'
    config.consumer_persistence = !Rails.env.development?
  end

  routes.draw do
    topic :example do
      consumer ExampleConsumer
    end
  end
end

# app/consumers/example_consumer.rb
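class ExampleConsumer < ApplicationConsumer
  def consume
    log_trace_context('before #each')
    messages.each do |message|
      Datadog::Tracing.trace("consumer") do
        log_trace_context('inside #each')
        puts message.payload
      end
    end
    log_trace_context('after #each')
  end

  private

  # Prints the active trace and span IDs, labeled to match the output below.
  def log_trace_context(label)
    puts "[#{self.class.name} - #{label}] current trace_id: #{Datadog::Tracing.active_trace&.id || 'not set'}"
    puts "[#{self.class.name} - #{label}] current span_id: #{Datadog::Tracing.active_span&.id || 'not set'}"
  end
end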

Producing a message (with karafka server running in another terminal):

# in rails console
Datadog::Tracing.trace("producer") do
  puts "[producing] current trace_id: #{Datadog::Tracing.active_trace&.id || 'not set'}"
  puts "[producing] current span_id: #{Datadog::Tracing.active_span&.id || 'not set'}"

  Karafka.producer.produce_sync(topic: 'example', payload: { 'ping' => 'pong' }.to_json)
end

Output (note that the consumer's trace_id differs from the producer's, i.e. the 'producing' trace is not continued in the consumer):

[producing] current trace_id: 139135027431419632075626282499067703165
[producing] current span_id: 1760224571734279276
[ExampleConsumer - before #each] current trace_id: 139135027431419632072840599894897151586
[ExampleConsumer - before #each] current span_id: 1385937625620920418
[ExampleConsumer - inside #each] current trace_id: 139135027431419632072840599894897151586
[ExampleConsumer - inside #each] current span_id: 2562426227694958412
{"ping" => "pong"}
[ExampleConsumer - after #each] current trace_id: 139135027431419632072840599894897151586
[ExampleConsumer - after #each] current span_id: 1385937625620920418

Configuration Block

No response

Error Logs

No response

Operating System

No response

How does Datadog help you?

No response


Labels

bug, community
