diff --git a/examples/trace/trace.go b/examples/trace/trace.go new file mode 100644 index 0000000..934dbc7 --- /dev/null +++ b/examples/trace/trace.go @@ -0,0 +1,126 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/openGemini/opengemini-client-go/opengemini" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +// setupOTelSDK bootstraps the OpenTelemetry pipeline. +// If it does not return an error, make sure to call shutdown for proper cleanup. +func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown = func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + // handleErr calls shutdown for cleanup and makes sure that all errors are returned. + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + // Set up propagator. + prop := newPropagator() + otel.SetTextMapPropagator(prop) + + // Set up trace provider. + tracerProvider, err := newTracerProvider() + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + return +} + +func newPropagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) +} + +func newTracerProvider() (*sdktrace.TracerProvider, error) { + // test: export to jaeger + traceExporter, err := otlptracehttp.New(context.Background(), + otlptracehttp.WithEndpoint("127.0.0.1:4318"), + otlptracehttp.WithInsecure()) + if err != nil { + return nil, err + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(resource.NewWithAttributes("", semconv.ServiceName("opengemini-client-go"))), + sdktrace.WithBatcher(traceExporter, + // Default is 5s. Set to 1s for demonstrative purposes. + sdktrace.WithBatchTimeout(time.Second)), + ) + return tracerProvider, nil +} + +func main() { + var ctx = context.Background() + shutdown, err := setupOTelSDK(ctx) + if err != nil { + return + } + //Handle shutdown properly so nothing leaks. + defer func() { + err = errors.Join(err, shutdown(ctx)) + }() + + // create an openGemini client + config := &opengemini.Config{ + Addresses: []opengemini.Address{{ + Host: "127.0.0.1", + Port: 8086, + }}, + } + client, err := opengemini.NewClient(config) + if err != nil { + fmt.Println(err) + return + } + + // set otel interceptor + client.Interceptors(opengemini.NewOtelInterceptor()) + + err = client.CreateDatabase("db0") + if err != nil { + // do something + } +} diff --git a/go.mod b/go.mod index 652054e..fbdfa76 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/openGemini/opengemini-client-go -go 1.22 +go 1.24.0 require ( github.com/golang/snappy v1.0.0 @@ -10,24 +10,37 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/stretchr/testify v1.10.0 github.com/vmihailenco/msgpack/v5 v5.4.1 - google.golang.org/grpc v1.65.1 - google.golang.org/protobuf v1.35.2 + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 + go.opentelemetry.io/otel/sdk v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 + google.golang.org/grpc v1.73.0 + google.golang.org/protobuf v1.36.6 ) require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/kr/text v0.2.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - golang.org/x/net v0.26.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index bfb991e..c75a133 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,26 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= +github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -33,26 +45,48 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= -google.golang.org/grpc v1.65.1 h1:toSN4j5/Xju+HVovfaY5g1YZVuJeHzQZhP8eJ0L0f1I= -google.golang.org/grpc v1.65.1/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= -google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= -google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 h1:bDMKF3RUSxshZ5OjOTi8rsHGaPKsAt76FaqgvIUySLc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0/go.mod h1:dDT67G/IkA46Mr2l9Uj7HsQVwsjASyV9SjGofsiUZDA= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os= +go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 h1:oWVWY3NzT7KJppx2UKhKmzPq4SRe0LdCijVRwvGeikY= +google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822/go.mod h1:h3c4v36UTKzUiuaOKQ6gr3S+0hovBtUrXzTG/i3+XEc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1:fc6jSaCT0vBduLYZHYrBBNY4dsWuvgyff9noRNDdBeE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/opengemini/client.go b/opengemini/client.go index 68464ec..c59ee56 100644 --- a/opengemini/client.go +++ b/opengemini/client.go @@ -18,6 +18,7 @@ import ( "context" "crypto/tls" "log/slog" + "net/http" "strconv" "time" @@ -51,6 +52,13 @@ const ( CompressMethodNone CompressMethod = "NONE" ) +type InterceptorClosure func(ctx context.Context, response *http.Response) error + +type Interceptor interface { + Query(ctx context.Context, query *InterceptorQuery) InterceptorClosure + Write(ctx context.Context, write *InterceptorWrite) InterceptorClosure +} + // Client represents a openGemini client. type Client interface { // Ping check that status of cluster. @@ -135,6 +143,8 @@ type Client interface { // ExposeMetrics expose prometheus metrics, calling prometheus.MustRegister(metrics) to register ExposeMetrics() prometheus.Collector + // Interceptors inject interceptor + Interceptors(...Interceptor) } // Config is used to construct a openGemini Client instance. diff --git a/opengemini/client_impl.go b/opengemini/client_impl.go index 0c07cf8..3e0f2d6 100644 --- a/opengemini/client_impl.go +++ b/opengemini/client_impl.go @@ -33,13 +33,14 @@ type endpoint struct { } type client struct { - config *Config - endpoints []endpoint - cli *http.Client - prevIdx atomic.Int32 - dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB] - metrics *metrics - rpcClient *writerClient + config *Config + endpoints []endpoint + cli *http.Client + prevIdx atomic.Int32 + dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB] + metrics *metrics + rpcClient *writerClient + interceptors []Interceptor batchContext context.Context batchContextCancel context.CancelFunc @@ -47,6 +48,10 @@ type client struct { logger *slog.Logger } +func (c *client) Interceptors(interceptor ...Interceptor) { + c.interceptors = append(c.interceptors, interceptor...) +} + func newClient(c *Config) (Client, error) { if len(c.Addresses) == 0 { return nil, ErrNoAddress diff --git a/opengemini/default_interceptor.go b/opengemini/default_interceptor.go new file mode 100644 index 0000000..af01730 --- /dev/null +++ b/opengemini/default_interceptor.go @@ -0,0 +1,146 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opengemini + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +const ( + TraceName = "opengemini-client-go" + SpanNameQuery = "query" + SpanNameWrite = "write" + AttributeDatabase = "db" + AttributeRetentionPolicy = "rp" + AttributeMeasurement = "mst" + AttributePrecision = "precision" + AttributeCommand = "cmd" + AttributeResponseStatusCode = "status-code" + AttributeResponseBody = "response-body" + AttributeWriteLineProtocol = "lp" +) + +var ( + tracer = otel.Tracer(TraceName) +) + +type InterceptorQuery struct { + *Query + Ctx context.Context + Span trace.Span + Carrier propagation.TextMapCarrier +} + +type InterceptorWrite struct { + Database string + RetentionPolicy string + Measurement string + LineProtocol string + Precision string + Ctx context.Context + Span trace.Span + Carrier propagation.TextMapCarrier +} + +type OtelClient struct { +} + +func NewOtelInterceptor() Interceptor { + return &OtelClient{} +} + +func (o *OtelClient) Query(ctx context.Context, query *InterceptorQuery) InterceptorClosure { + if query.Carrier != nil { + ctx = otel.GetTextMapPropagator().Extract(ctx, query.Carrier) + } + var span trace.Span + + if query.Span != nil { + span = query.Span + } else { + ctx, span = tracer.Start(ctx, SpanNameQuery) + } + + if query.Carrier != nil { + otel.GetTextMapPropagator().Inject(ctx, query.Carrier) + } + + span.SetAttributes(attribute.String(AttributeDatabase, query.Database)) + span.SetAttributes(attribute.String(AttributeRetentionPolicy, query.RetentionPolicy)) + span.SetAttributes(attribute.String(AttributePrecision, query.Precision.Epoch())) + span.SetAttributes(attribute.String(AttributeCommand, query.Command)) + + return func(ctx context.Context, response *http.Response) error { + defer span.End() + if response == nil { + fmt.Println("otel interceptor query response body is nil") + return nil // when otel makes an error, it does not affect the main business process + } + + var buf bytes.Buffer + tee := io.TeeReader(response.Body, &buf) + data, err := io.ReadAll(tee) + if err != nil { + fmt.Println("otel interceptor read query response body failed: " + err.Error()) + return nil + } + response.Body = io.NopCloser(&buf) + + span.SetAttributes(attribute.Int(AttributeResponseStatusCode, response.StatusCode)) + span.SetAttributes(attribute.String(AttributeResponseBody, string(data))) + return nil + } +} + +func (o *OtelClient) Write(ctx context.Context, write *InterceptorWrite) InterceptorClosure { + if write.Carrier != nil { + ctx = otel.GetTextMapPropagator().Extract(ctx, write.Carrier) + } + var span trace.Span + + if write.Span != nil { + span = write.Span + } else { + ctx, span = tracer.Start(ctx, SpanNameWrite) + } + + if write.Carrier != nil { + otel.GetTextMapPropagator().Inject(ctx, write.Carrier) + } + + span.SetAttributes(attribute.String(AttributeDatabase, write.Database)) + span.SetAttributes(attribute.String(AttributeRetentionPolicy, write.RetentionPolicy)) + span.SetAttributes(attribute.String(AttributePrecision, write.Precision)) + span.SetAttributes(attribute.String(AttributeWriteLineProtocol, write.LineProtocol)) + + return func(ctx context.Context, response *http.Response) error { + defer span.End() + if response == nil { + fmt.Println("otel interceptor write response body is nil") + return nil + } + span.SetAttributes(attribute.Int(AttributeResponseStatusCode, response.StatusCode)) + return nil + } +} diff --git a/opengemini/default_interceptor_test.go b/opengemini/default_interceptor_test.go new file mode 100644 index 0000000..ff9d4e7 --- /dev/null +++ b/opengemini/default_interceptor_test.go @@ -0,0 +1,148 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opengemini + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOtelClient_WriteInterceptors(t *testing.T) { + c := testDefaultClient(t) + + //Register the OtelCClient interceptor + c.Interceptors(NewOtelInterceptor()) + + databaseName := randomDatabaseName() + err := c.CreateDatabase(databaseName) + assert.NoError(t, err) + time.Sleep(time.Second * 3) + point := &Point{ + Measurement: "test_write", + Precision: PrecisionNanosecond, + Timestamp: time.Now().UnixNano(), + Tags: map[string]string{ + "foo": "bar", + }, + Fields: map[string]interface{}{ + "v1": 1, + }, + } + err = c.WritePoint(databaseName, point, CallbackDummy) + require.Nil(t, err) +} + +func TestOtelClient_ShowTagKeys(t *testing.T) { + c := testDefaultClient(t) + //Register the OtelCClient interceptor + c.Interceptors(NewOtelInterceptor()) + + databaseName := randomDatabaseName() + err := c.CreateDatabase(databaseName) + assert.NoError(t, err) + point := &Point{ + Measurement: "test_write", + Precision: PrecisionNanosecond, + Timestamp: time.Now().UnixNano(), + Tags: map[string]string{ + "foo": "bar", + }, + Fields: map[string]interface{}{ + "v1": 1, + }, + } + err = c.WritePoint(databaseName, point, CallbackDummy) + require.Nil(t, err) + measurement := randomMeasurement() + cmd := fmt.Sprintf("CREATE MEASUREMENT %s (tag1 TAG,tag2 TAG,tag3 TAG, field1 INT64 FIELD, field2 BOOL, field3 STRING, field4 FLOAT64)", measurement) + _, err = c.Query(Query{Command: cmd, Database: databaseName}) + assert.Nil(t, err) + // SHOW TAG KEYS FROM measurement limit 3 OFFSET 0 + tagKeyResult, err := c.ShowTagKeys(NewShowTagKeysBuilder().Database(databaseName).Measurement(measurement).Limit(3).Offset(0)) + assert.Nil(t, err) + assert.Equal(t, 1, len(tagKeyResult)) + assert.Equal(t, 3, len(tagKeyResult[measurement])) + err = c.DropDatabase(databaseName) + require.Nil(t, err) +} + +func TestOtelShowDatabase(t *testing.T) { + c := testDefaultClient(t) + //Register the OtelCClient interceptor + c.Interceptors(NewOtelInterceptor()) + + databaseName := randomDatabaseName() + err := c.CreateDatabase(databaseName) + if err != nil { + t.Logf("Error creating database %q: %v", databaseName, err) + return + } + require.Nil(t, err) + + result, err := c.ShowDatabases() + require.Nil(t, err) + assert.NotEmpty(t, result) +} + +func TestOtelWritePoint(t *testing.T) { + c := testDefaultClient(t) + //Register the OtelCClient interceptor + c.Interceptors(NewOtelInterceptor()) + + databaseName := randomDatabaseName() + err := c.CreateDatabase(databaseName) + assert.Nil(t, err, "failure:%v", err) + + point := &Point{ + Measurement: "test_write", + Precision: PrecisionNanosecond, + Timestamp: time.Now().UnixNano(), + Tags: map[string]string{ + "foo": "bar", + }, + Fields: map[string]interface{}{ + "v1": 1, + }, + } + + err = c.WritePoint(databaseName, point, CallbackDummy) + assert.Nil(t, err) +} + +func TestOtelCreateAndQueryMeasurement(t *testing.T) { + c := testDefaultClient(t) + //Register the OtelCClient interceptor + c.Interceptors(NewOtelInterceptor()) + databaseName := randomDatabaseName() + err := c.CreateDatabase(databaseName) + require.Nil(t, err) + + measurement := randomMeasurement() + + createCmd := fmt.Sprintf("CREATE MEASUREMENT %s (tag1 TAG, tag2 TAG, field1 INT64 FIELD)", measurement) + createQuery := Query{Command: createCmd, Database: databaseName} + _, err = c.Query(createQuery) + require.Nil(t, err) + + queryCmd := fmt.Sprintf("SELECT * FROM %s", measurement) + queryQuery := Query{Command: queryCmd, Database: databaseName} + result, err := c.Query(queryQuery) + require.Nil(t, err) + assert.NotEmpty(t, result) +} diff --git a/opengemini/http.go b/opengemini/http.go index 9eea3bc..bb2dddd 100644 --- a/opengemini/http.go +++ b/opengemini/http.go @@ -29,6 +29,36 @@ type requestDetails struct { body io.Reader } +func (req *requestDetails) toQuery() *InterceptorQuery { + if req.queryValues == nil { + return &InterceptorQuery{} + } + return &InterceptorQuery{ + Query: &Query{ + Database: req.queryValues.Get("db"), + Command: req.queryValues.Get("q"), + RetentionPolicy: req.queryValues.Get("rp"), + Precision: ToPrecision(req.queryValues.Get("epoch")), + }, + } +} + +func (req *requestDetails) toWrite() *InterceptorWrite { + if req.queryValues == nil { + return &InterceptorWrite{} + } + body, err := io.ReadAll(req.body) + if err != nil { + return &InterceptorWrite{} + } + return &InterceptorWrite{ + Database: req.queryValues.Get("db"), + RetentionPolicy: req.queryValues.Get("rp"), + LineProtocol: string(body), + Precision: req.queryValues.Get("epoch"), + } +} + func (c *client) updateAuthHeader(method, urlPath string, header http.Header) http.Header { if c.config.AuthConfig == nil { return header @@ -122,5 +152,30 @@ func (c *client) executeHttpRequestInner(ctx context.Context, method, serverUrl, } } - return c.cli.Do(request) + var closures []InterceptorClosure + for _, interceptor := range c.interceptors { + var closure InterceptorClosure + switch urlPath { + case UrlWrite: + var data = details.toWrite() + closure = interceptor.Write(ctx, data) + default: + var data = details.toQuery() + closure = interceptor.Query(ctx, data) + } + closures = append(closures, closure) + } + + response, err := c.cli.Do(request) + if err != nil { + return nil, err + } + + for _, fn := range closures { + if err := fn(ctx, response); err != nil { + return nil, err + } + } + + return response, nil }