me-cs/pulsartracing

pulsartracing

Wrap pulsar so that it supports opentelemetry.

License: MIT


Example use:

package main

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/me-cs/pulsartracing"
)

var pulsarClient pulsar.Client

func produce() {
	p, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{})
	if err != nil {
		panic(err)
	}
	// In practice, pass an upstream context that already carries a trace.
	_, _ = p.Send(context.Background(), &pulsar.ProducerMessage{})
}

func consume() {
	customerConsumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{})
	if err != nil {
		panic(err)
	}
	for {
		ctx := context.Background()
		ctx, msg, err := pulsartracing.ReceiveWithSpanCtx(ctx, customerConsumer)
		if err != nil {
			continue
		}
		err = customerConsumer.Ack(msg)
		if err != nil {
			continue
		}
		// Pass this context downstream, e.g.:
		// downstream(ctx)
		// The message then appears as part of the trace in your distributed tracing system (e.g. Jaeger).
	}
}

func main() {
	var err error
	pulsarClient, err = pulsartracing.NewClient(pulsar.ClientOptions{
		URL: "pulsar://pulsar.xxx.cn:6650",
	})
	if err != nil {
		panic(err)
	}
	produce()
	consume()
}
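
The example above relies on trace context travelling with each message from producer to consumer. How pulsartracing does this internally is not described here, so the following is only a generic, stdlib-only sketch of the idea: the producer serializes a span context into the message's string properties, and the consumer parses it back. `SpanContext`, `Inject`, `Extract`, and the use of the W3C `traceparent` format as the property key are all assumptions made for illustration, not part of the pulsartracing or Pulsar APIs.

```go
package main

import (
	"fmt"
	"strings"
)

// traceparentKey is the W3C Trace Context header name, reused here as a
// message property key (an assumption for this sketch).
const traceparentKey = "traceparent"

// SpanContext is a hypothetical, minimal stand-in for a real span context.
type SpanContext struct {
	TraceID string // 32 lowercase hex characters
	SpanID  string // 16 lowercase hex characters
	Sampled bool
}

// Inject writes sc into the message properties on the producer side.
func Inject(sc SpanContext, props map[string]string) {
	flags := "00"
	if sc.Sampled {
		flags = "01"
	}
	props[traceparentKey] = fmt.Sprintf("00-%s-%s-%s", sc.TraceID, sc.SpanID, flags)
}

// Extract rebuilds the span context from message properties on the consumer
// side. It reports false when the property is missing or malformed.
func Extract(props map[string]string) (SpanContext, bool) {
	parts := strings.Split(props[traceparentKey], "-")
	if len(parts) != 4 || parts[0] != "00" || len(parts[1]) != 32 || len(parts[2]) != 16 {
		return SpanContext{}, false
	}
	return SpanContext{TraceID: parts[1], SpanID: parts[2], Sampled: parts[3] == "01"}, true
}

func main() {
	// props plays the role of a message's string properties.
	props := map[string]string{}
	Inject(SpanContext{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	}, props)

	sc, ok := Extract(props)
	fmt.Println(props[traceparentKey])
	fmt.Println(sc.TraceID, sc.SpanID, sc.Sampled, ok)
}
```

In a real setup the tracing library does this for you; the sketch only shows why the consumer needs the context returned by the receive call rather than a fresh `context.Background()`.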

Important: if your application initializes OpenTelemetry as shown below, you also need to bridge OpenTelemetry to OpenTracing:

package main

import (
	"github.com/opentracing/opentracing-go"
	"go.opentelemetry.io/otel"
	otelBridge "go.opentelemetry.io/otel/bridge/opentracing"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

var (
	tp *sdktrace.TracerProvider
)

func main() {
	opts := []sdktrace.TracerProviderOption{}
	tp = sdktrace.NewTracerProvider(opts...)
	otel.SetTracerProvider(tp)
	otelTracer := tp.Tracer("your tracer name")
	// Use the bridgeTracer as your OpenTracing tracer.
	bridgeTracer, wrapperTracerProvider := otelBridge.NewTracerPair(otelTracer)
	// Set the wrapperTracerProvider as the global OpenTelemetry
	// TracerProvider so instrumentation will use it by default.
	otel.SetTracerProvider(wrapperTracerProvider)
	opentracing.SetGlobalTracer(bridgeTracer)
}

Here is a screenshot from my test. If you follow the steps above, you should see messages traced correctly in your environment too.

(screenshot: example)

If you have any questions, feel free to ask; I'm happy to help.
