Skip to content

Commit 85a9fc1

Browse files
Support gcp cloud run (#3)
Co-authored-by: Shaharia Azam <[email protected]>
1 parent 24d0378 commit 85a9fc1

File tree

13 files changed

+557
-99
lines changed

13 files changed

+557
-99
lines changed

go.mod

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,44 @@ module github.com/shaharia-lab/telemetry-forwarder
33
go 1.24.1
44

55
require (
6+
cloud.google.com/go/logging v1.13.0
67
github.com/kelseyhightower/envconfig v1.4.0
78
github.com/stretchr/testify v1.10.0
9+
google.golang.org/api v0.229.0
810
)
911

1012
require (
13+
cloud.google.com/go v0.117.0 // indirect
14+
cloud.google.com/go/auth v0.16.0 // indirect
15+
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
16+
cloud.google.com/go/compute/metadata v0.6.0 // indirect
17+
cloud.google.com/go/longrunning v0.6.2 // indirect
1118
github.com/davecgh/go-spew v1.1.1 // indirect
19+
github.com/felixge/httpsnoop v1.0.4 // indirect
20+
github.com/go-logr/logr v1.4.2 // indirect
21+
github.com/go-logr/stdr v1.2.2 // indirect
22+
github.com/google/s2a-go v0.1.9 // indirect
23+
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
24+
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
1225
github.com/pmezard/go-difflib v1.0.0 // indirect
1326
github.com/stretchr/objx v0.5.2 // indirect
27+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
28+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
29+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
30+
go.opentelemetry.io/otel v1.35.0 // indirect
31+
go.opentelemetry.io/otel/metric v1.35.0 // indirect
32+
go.opentelemetry.io/otel/trace v1.35.0 // indirect
33+
golang.org/x/crypto v0.37.0 // indirect
34+
golang.org/x/net v0.39.0 // indirect
35+
golang.org/x/oauth2 v0.29.0 // indirect
36+
golang.org/x/sync v0.13.0 // indirect
37+
golang.org/x/sys v0.32.0 // indirect
38+
golang.org/x/text v0.24.0 // indirect
39+
golang.org/x/time v0.11.0 // indirect
40+
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
41+
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect
42+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect
43+
google.golang.org/grpc v1.71.1 // indirect
44+
google.golang.org/protobuf v1.36.6 // indirect
1445
gopkg.in/yaml.v3 v3.0.1 // indirect
1546
)

go.sum

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,100 @@
1+
cloud.google.com/go v0.117.0 h1:Z5TNFfQxj7WG2FgOGX1ekC5RiXrYgms6QscOm32M/4s=
2+
cloud.google.com/go v0.117.0/go.mod h1:ZbwhVTb1DBGt2Iwb3tNO6SEK4q+cplHZmLWH+DelYYc=
3+
cloud.google.com/go/auth v0.16.0 h1:Pd8P1s9WkcrBE2n/PhAwKsdrR35V3Sg2II9B+ndM3CU=
4+
cloud.google.com/go/auth v0.16.0/go.mod h1:1howDHJ5IETh/LwYs3ZxvlkXF48aSqqJUM+5o02dNOI=
5+
cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc=
6+
cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c=
7+
cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I=
8+
cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg=
9+
cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA=
10+
cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY=
11+
cloud.google.com/go/logging v1.13.0 h1:7j0HgAp0B94o1YRDqiqm26w4q1rDMH7XNRU34lJXHYc=
12+
cloud.google.com/go/logging v1.13.0/go.mod h1:36CoKh6KA/M0PbhPKMq6/qety2DCAErbhXT62TuXALA=
13+
cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc=
14+
cloud.google.com/go/longrunning v0.6.2/go.mod h1:k/vIs83RN4bE3YCswdXC5PFfWVILjm3hpEUlSko4PiI=
115
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
216
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
17+
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
18+
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
19+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
20+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
21+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
22+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
23+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
24+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
25+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
26+
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
27+
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
28+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
29+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
30+
github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
31+
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
32+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
33+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
34+
github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4=
35+
github.com/googleapis/enterprise-certificate-proxy v0.3.6/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA=
36+
github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrkurSS/Q=
37+
github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA=
338
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
439
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
40+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
41+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
42+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
43+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
544
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
645
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
46+
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
47+
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
748
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
849
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
950
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
1051
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
11-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
52+
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
53+
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
54+
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
55+
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
56+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 h1:x7wzEgXfnzJcHDwStJT+mxOz4etr2EcexjqhBvmoakw=
57+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0/go.mod h1:rg+RlpR5dKwaS95IyyZqj5Wd4E13lk/msnTS0Xl9lJM=
58+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
59+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ=
60+
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
61+
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
62+
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
63+
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
64+
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
65+
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
66+
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
67+
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
68+
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
69+
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
70+
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
71+
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
72+
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
73+
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
74+
golang.org/x/oauth2 v0.29.0 h1:WdYw2tdTK1S8olAzWHdgeqfy+Mtm9XNhv/xJsY65d98=
75+
golang.org/x/oauth2 v0.29.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
76+
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
77+
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
78+
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
79+
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
80+
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
81+
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
82+
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
83+
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
84+
google.golang.org/api v0.229.0 h1:p98ymMtqeJ5i3lIBMj5MpR9kzIIgzpHHh8vQ+vgAzx8=
85+
google.golang.org/api v0.229.0/go.mod h1:wyDfmq5g1wYJWn29O22FDWN48P7Xcz0xz+LBpptYvB0=
86+
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk=
87+
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc=
88+
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24=
89+
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw=
90+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e h1:ztQaXfzEXTmCBvbtWYRhJxW+0iJcz2qXfd38/e9l7bA=
91+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
92+
google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI=
93+
google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
94+
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
95+
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
1296
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
97+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
98+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
1399
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
14100
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/config/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@ import (
44
"github.com/kelseyhightower/envconfig"
55
)
66

7-
// Environment variables
7+
// Config holds the configuration for the telemetry forwarder
88
type Config struct {
99
HTTPAPIPort string `envconfig:"HTTP_API_PORT" default:"8080"`
1010
HoneycombAPIKey string `envconfig:"HONEYCOMB_API_KEY"`
1111
HoneycombDataset string `envconfig:"HONEYCOMB_DATASET" default:"cli-telemetry"`
1212
HoneycombAPIURL string `envconfig:"HONEYCOMB_API_URL"`
13+
14+
GCPProjectID string `envconfig:"GCP_PROJECT_ID"`
15+
GCPLogName string `envconfig:"GCP_LOG_NAME" default:"telemetry-forwarder-events"`
1316
}
1417

1518
// Load loads the configuration from environment variables

internal/event/processor.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Package event provides functionality to process telemetry events and forward them to registered providers.If thIf
2+
package event
3+
4+
import (
5+
"context"
6+
"log"
7+
"sync"
8+
"time"
9+
10+
"github.com/shaharia-lab/telemetry-forwarder/internal/provider"
11+
"github.com/shaharia-lab/telemetry-forwarder/internal/types"
12+
)
13+
14+
// Processor is responsible for processing telemetry events and forwarding them to registered providers.
15+
type Processor struct {
16+
providerRegistry *provider.Registry
17+
eventChan chan types.OTelEvent
18+
done chan struct{}
19+
wg sync.WaitGroup
20+
}
21+
22+
// NewEventProcessor creates a new event processor with the given provider registry and buffer size.
23+
func NewEventProcessor(providerRegistry *provider.Registry, bufferSize int) *Processor {
24+
return &Processor{
25+
providerRegistry: providerRegistry,
26+
eventChan: make(chan types.OTelEvent, bufferSize),
27+
done: make(chan struct{}),
28+
}
29+
}
30+
31+
// Start begins processing events in a separate goroutine.
32+
func (p *Processor) Start() {
33+
p.wg.Add(1)
34+
go p.processEvents()
35+
}
36+
37+
func (p *Processor) processEvents() {
38+
defer p.wg.Done()
39+
40+
for {
41+
select {
42+
case event := <-p.eventChan:
43+
p.handleEvent(event)
44+
case <-p.done:
45+
log.Println("Event processor shutting down, processing remaining events...")
46+
47+
for {
48+
select {
49+
case event := <-p.eventChan:
50+
p.handleEvent(event)
51+
default:
52+
log.Println("Finished processing all events")
53+
return
54+
}
55+
}
56+
}
57+
}
58+
}
59+
60+
func (p *Processor) handleEvent(event types.OTelEvent) {
61+
var providerWg sync.WaitGroup
62+
ctx := context.Background()
63+
64+
for _, prv := range p.providerRegistry.GetAll() {
65+
if prv.IsEnabled() {
66+
providerWg.Add(1)
67+
go func(p provider.Provider) {
68+
defer providerWg.Done()
69+
if err := p.Send(ctx, event); err != nil {
70+
log.Printf("Error forwarding to %s: %v", p.Name(), err)
71+
}
72+
}(prv)
73+
}
74+
}
75+
providerWg.Wait()
76+
}
77+
78+
// EnqueueEvent adds an event to the processing queue. If the queue is full, it drops the event.
79+
func (p *Processor) EnqueueEvent(event types.OTelEvent) {
80+
select {
81+
case p.eventChan <- event:
82+
case <-time.After(100 * time.Millisecond):
83+
log.Println("Warning: Event processor queue is full, dropping event")
84+
}
85+
}
86+
87+
// Shutdown gracefully shuts down the event processor, waiting for all events to be processed.
88+
func (p *Processor) Shutdown() error {
89+
log.Println("Shutting down event processor...")
90+
close(p.done)
91+
92+
p.wg.Wait()
93+
close(p.eventChan)
94+
95+
log.Println("Event processor shutdown complete")
96+
return nil
97+
}

internal/handler/handler.go

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,28 @@ package handler
22

33
import (
44
"encoding/json"
5-
"github.com/shaharia-lab/telemetry-forwarder/internal/provider"
6-
"github.com/shaharia-lab/telemetry-forwarder/internal/types"
7-
"log"
85
"net/http"
9-
"sync"
6+
7+
"github.com/shaharia-lab/telemetry-forwarder/internal/event"
8+
"github.com/shaharia-lab/telemetry-forwarder/internal/types"
109
)
1110

12-
// TelemetryCollectHandler handles incoming telemetry events and forwards them to the configured providers.
13-
func TelemetryCollectHandler(providerRegistry *provider.ProviderRegistry) http.HandlerFunc {
11+
// TelemetryCollectHandler handles incoming telemetry events and forwards them to the event processor.
12+
func TelemetryCollectHandler(eventProcessor *event.Processor) http.HandlerFunc {
1413
return func(w http.ResponseWriter, r *http.Request) {
1514
if r.Method != http.MethodPost {
1615
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
1716
return
1817
}
1918

20-
var event types.OTelEvent
21-
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
19+
var evt types.OTelEvent
20+
if err := json.NewDecoder(r.Body).Decode(&evt); err != nil {
2221
http.Error(w, "Invalid request body", http.StatusBadRequest)
2322
return
2423
}
2524

26-
var wg sync.WaitGroup
27-
for _, prv := range providerRegistry.GetAll() {
28-
if prv.IsEnabled() {
29-
wg.Add(1)
30-
go func(p provider.Provider) {
31-
defer wg.Done()
32-
if err := p.Send(r.Context(), event); err != nil {
33-
log.Printf("Error forwarding to %s: %v", p.Name(), err)
34-
}
35-
}(prv)
36-
}
37-
}
38-
wg.Wait()
25+
eventProcessor.EnqueueEvent(evt)
3926

40-
w.WriteHeader(http.StatusOK)
27+
w.WriteHeader(http.StatusAccepted)
4128
}
4229
}

0 commit comments

Comments
 (0)