Skip to content

Commit 59aa058

Browse files
author
Manendra Pal Singh
committed
Feat: update the pr as per comment
1 parent 1293c24 commit 59aa058

File tree

13 files changed

+153
-172
lines changed

13 files changed

+153
-172
lines changed

cmd/adapter/main.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func addParentIdCtx(ctx context.Context, config *Config) context.Context {
233233
log.Infof(ctx, "Adding POD name: %s", p)
234234
podName = p
235235
} else {
236-
log.Info(ctx, "POD_NAME environment variable not set falling back to hostname")
236+
log.Info(ctx, "POD_NAME environment variable not set, falling back to hostname")
237237
if hostname, err := os.Hostname(); err == nil {
238238
log.Infof(ctx, "Setting POD name as hostname: %s", hostname)
239239
podName = hostname
@@ -243,16 +243,21 @@ func addParentIdCtx(ctx context.Context, config *Config) context.Context {
243243
}
244244

245245
for _, m := range config.Modules {
246-
if m.Handler.Role != "" && m.Handler.SubscriberID != "" {
247-
parentID = string(m.Handler.Role) + ":" + m.Handler.SubscriberID + ":" + podName
248-
break
246+
if m.Handler.Role == "" || m.Handler.SubscriberID == "" {
247+
continue
248+
}
249+
candidate := string(m.Handler.Role) + ":" + m.Handler.SubscriberID + ":" + podName
250+
if parentID == "" {
251+
parentID = candidate
252+
} else if candidate != parentID {
253+
log.Warnf(ctx, "Multiple distinct role:subscriberID pairs found in modules (using %q, also saw %q); consider explicit parent_id config", parentID, candidate)
249254
}
250255
}
251256

252257
if parentID != "" {
253258
ctx = context.WithValue(ctx, model.ContextKeyParentID, parentID)
254259
} else {
255-
log.Warnf(ctx, "Failed to find parent ID in config please add the role and subscriber_id in the handler config ")
260+
log.Warnf(ctx, "Failed to find parent ID in config; add role and subscriber_id to the handler config")
256261
}
257262
return ctx
258263
}

core/module/handler/handlerMetrics.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,3 @@ func newHandlerMetrics() (*HandlerMetrics, error) {
6565

6666
return m, nil
6767
}
68-

core/module/handler/http_metric.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"strings"
78
"sync"
89

910
"github.com/beckn-one/beckn-onix/pkg/telemetry"
@@ -79,10 +80,10 @@ func RecordHTTPRequest(ctx context.Context, statusCode int, action, role, sender
7980

8081
metric_code := action + "_api_total_count"
8182
category := "NetworkHealth"
82-
if action == "/search" || action == "/discovery" {
83+
if strings.HasSuffix(action, "/search") || strings.HasSuffix(action, "/discovery") {
8384
category = "Discovery"
8485
}
85-
attributes = append(attributes, specHttpMetricAttr(metric_code, category)...) //TODO: need to update as per the furthur discussion
86+
attributes = append(attributes, specHttpMetricAttr(metric_code, category)...)
8687
m.HttpRequestCount.Add(ctx, 1, metric.WithAttributes(attributes...))
8788
}
8889

core/module/handler/stdHandler.go

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
122122
record: nil,
123123
}
124124

125-
selfID := h.SubscriberID
126-
remoteID := ""
127-
if v, ok := r.Context().Value(model.ContextKeyRemoteID).(string); ok {
128-
remoteID = v
129-
}
130-
var senderID, receiverID string
131-
if strings.Contains(h.moduleName, "Caller") {
132-
senderID = selfID
133-
receiverID = remoteID
134-
} else {
135-
senderID = remoteID
136-
receiverID = selfID
137-
}
125+
senderID, receiverID := h.resolveDirection(r.Context())
138126
httpMeter, _ := GetHTTPMetrics(r.Context())
139127
if httpMeter != nil {
140128
recordOnce = func() {
@@ -161,7 +149,7 @@ func (h *stdHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
161149
}
162150

163151
body := stepCtx.Body
164-
go telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode), auditlog.String("http.response.error", errString(err)))
152+
telemetry.EmitAuditLogs(r.Context(), body, auditlog.Int("http.response.status_code", wrapped.statusCode), auditlog.String("http.response.error", errString(err)))
165153
span.End()
166154
}()
167155

@@ -385,21 +373,17 @@ func (h *stdHandler) initSteps(ctx context.Context, mgr PluginManager, cfg *Conf
385373
return nil
386374
}
387375

388-
func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) {
376+
func (h *stdHandler) resolveDirection(ctx context.Context) (senderID, receiverID string) {
389377
selfID := h.SubscriberID
390-
remoteID := ""
391-
if v, ok := r.Context().Value(model.ContextKeyRemoteID).(string); ok {
392-
remoteID = v
393-
}
394-
395-
var senderID, receiverID string
378+
remoteID, _ := ctx.Value(model.ContextKeyRemoteID).(string)
396379
if strings.Contains(h.moduleName, "Caller") {
397-
senderID = selfID
398-
receiverID = remoteID
399-
} else {
400-
senderID = remoteID
401-
receiverID = selfID
380+
return selfID, remoteID
402381
}
382+
return remoteID, selfID
383+
}
384+
385+
func setBecknAttr(span trace.Span, r *http.Request, h *stdHandler) {
386+
senderID, receiverID := h.resolveDirection(r.Context())
403387
attrs := []attribute.KeyValue{
404388
telemetry.AttrRecipientID.String(receiverID),
405389
telemetry.AttrSenderID.String(senderID),

core/module/handler/step.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,25 @@ func (s *signStep) Run(ctx *model.StepContext) error {
4141
return model.NewBadReqErr(fmt.Errorf("subscriberID not set"))
4242
}
4343

44-
tracer := otel.Tracer("beckn-onix")
44+
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
4545

4646
var keySet *model.Keyset
4747
{
48-
// to create span to finding the key set
4948
keySetCtx, keySetSpan := tracer.Start(ctx.Context, "keyset")
50-
defer keySetSpan.End()
5149
ks, err := s.km.Keyset(keySetCtx, ctx.SubID)
50+
keySetSpan.End()
5251
if err != nil {
5352
return fmt.Errorf("failed to get signing key: %w", err)
5453
}
5554
keySet = ks
56-
5755
}
5856

5957
{
60-
// to create span for the signa
6158
signerCtx, signerSpan := tracer.Start(ctx.Context, "sign")
62-
defer signerSpan.End()
6359
createdAt := time.Now().Unix()
6460
validTill := time.Now().Add(5 * time.Minute).Unix()
6561
sign, err := s.signer.Sign(signerCtx, ctx.Body, keySet.SigningPrivate, createdAt, validTill)
62+
signerSpan.End()
6663
if err != nil {
6764
return fmt.Errorf("failed to sign request: %w", err)
6865
}
@@ -73,7 +70,6 @@ func (s *signStep) Run(ctx *model.StepContext) error {
7370
header = model.AuthHeaderGateway
7471
}
7572
ctx.Request.Header.Set(header, authHeader)
76-
7773
}
7874

7975
return nil

install/network-observability/docker-compose.yml

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -196,46 +196,41 @@ services:
196196
- zipkin
197197
- loki
198198

199-
bpp-client:
200-
image: fidedocker/protocol-server
201-
container_name: bpp-client
199+
sandbox-bap:
200+
container_name: sandbox-bap
201+
image: fidedocker/sandbox-2.0:latest
202202
platform: linux/amd64
203-
networks:
204-
- beckn_network
203+
environment:
204+
- NODE_ENV=production
205+
- PORT=3001
205206
ports:
206-
- "6001:6001"
207-
restart: unless-stopped
208-
volumes:
209-
- bpp_client_config_volume:/usr/src/app/config
210-
- bpp_client_schemas_volume:/usr/src/app/schemas
211-
- bpp_client_logs_volume:/usr/src/app/logs
212-
213-
bpp-network:
214-
image: fidedocker/protocol-server
215-
container_name: bpp-network
216-
platform: linux/amd64
207+
- "3001:3001"
208+
healthcheck:
209+
test: ["CMD", "wget", "-qO-", "http://localhost:3001/api/health"]
210+
interval: 10s
211+
timeout: 3s
212+
retries: 5
213+
start_period: 10s
217214
networks:
218215
- beckn_network
219-
ports:
220-
- "6002:6002"
221-
restart: unless-stopped
222-
volumes:
223-
- bpp_network_config_volume:/usr/src/app/config
224-
- bpp_network_schemas_volume:/usr/src/app/schemas
225-
- bpp_network_logs_volume:/usr/src/app/logs
226216

227-
sandbox-api:
228-
image: fidedocker/sandbox-api
229-
container_name: sandbox-api
217+
sandbox-bpp:
218+
container_name: sandbox-bpp
219+
image: fidedocker/sandbox-2.0:latest
230220
platform: linux/amd64
221+
environment:
222+
- NODE_ENV=production
223+
- PORT=3002
224+
ports:
225+
- "3002:3002"
226+
healthcheck:
227+
test: ["CMD", "wget", "-qO-", "http://localhost:3002/api/health"]
228+
interval: 10s
229+
timeout: 3s
230+
retries: 5
231+
start_period: 10s
231232
networks:
232233
- beckn_network
233-
ports:
234-
- "4010:4000"
235-
restart: unless-stopped
236-
environment:
237-
- PORT=4000
238-
- WEBHOOK_URL=http://host.docker.internal:3001/webhook
239234

240235
networks:
241236
observability:
@@ -248,17 +243,3 @@ volumes:
248243
prometheus_data:
249244
grafana_data:
250245
loki_data:
251-
bpp_client_config_volume:
252-
name: bpp_client_config_volume
253-
external: true
254-
bpp_client_schemas_volume:
255-
name: bpp_client_schemas_volume
256-
bpp_client_logs_volume:
257-
name: bpp_client_logs_volume
258-
bpp_network_config_volume:
259-
name: bpp_network_config_volume
260-
external: true
261-
bpp_network_schemas_volume:
262-
name: bpp_network_schemas_volume
263-
bpp_network_logs_volume:
264-
name: bpp_network_logs_volume

pkg/log/log_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,11 +250,11 @@ func TestError(t *testing.T) {
250250
func TestRequest(t *testing.T) {
251251
logPath := setupLogger(t, InfoLevel)
252252
ctx := context.WithValue(context.Background(), requestID, "abc-123")
253-
ctx = context.WithValue(context.Background(), transaction_id, "transaction-id-123-")
254-
ctx = context.WithValue(context.Background(), message_id, "message-id-123")
255-
ctx = context.WithValue(context.Background(), subscriber_id, "subscriber-id-123")
256-
ctx = context.WithValue(context.Background(), module_id, "module-id-123")
257-
ctx = context.WithValue(context.Background(), parent_id, "parent-id-123")
253+
ctx = context.WithValue(ctx, transaction_id, "transaction-id-123-")
254+
ctx = context.WithValue(ctx, message_id, "message-id-123")
255+
ctx = context.WithValue(ctx, subscriber_id, "subscriber-id-123")
256+
ctx = context.WithValue(ctx, module_id, "module-id-123")
257+
ctx = context.WithValue(ctx, parent_id, "parent-id-123")
258258

259259
req, _ := http.NewRequest("POST", "/api/test", bytes.NewBuffer([]byte(`{"key":"value"}`)))
260260
req.RemoteAddr = "127.0.0.1:8080"

pkg/model/model.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const (
5858
ContextKeyParentID ContextKey = "parent_id"
5959

6060
// ContextKeyRemoteID is the context key for the caller who is calling the bap/bpp
61-
ContextKeyRemoteID ContextKey = "caller_id"
61+
ContextKeyRemoteID ContextKey = "remote_id"
6262
)
6363

6464
var contextKeys = map[string]ContextKey{
@@ -67,7 +67,7 @@ var contextKeys = map[string]ContextKey{
6767
"subscriber_id": ContextKeySubscriberID,
6868
"module_id": ContextKeyModuleID,
6969
"parent_id": ContextKeyParentID,
70-
"caller_id": ContextKeyRemoteID,
70+
"remote_id": ContextKeyRemoteID,
7171
}
7272

7373
// ParseContextKey converts a string into a valid ContextKey.

pkg/plugin/implementation/cache/cache.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,22 +114,25 @@ func New(ctx context.Context, cfg *Config) (*Cache, func() error, error) {
114114

115115
// Get retrieves the value for the specified key from Redis.
116116
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
117-
result, err := c.Client.Get(ctx, key).Result()
117+
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
118+
spanCtx, span := tracer.Start(ctx, "redis_get")
119+
defer span.End()
120+
result, err := c.Client.Get(spanCtx, key).Result()
118121
if c.metrics != nil {
119122
attrs := []attribute.KeyValue{
120123
telemetry.AttrOperation.String("get"),
121124
}
122125
switch {
123126
case err == redis.Nil:
124-
c.metrics.CacheMissesTotal.Add(ctx, 1, metric.WithAttributes(attrs...))
125-
c.metrics.CacheOperationsTotal.Add(ctx, 1,
127+
c.metrics.CacheMissesTotal.Add(spanCtx, 1, metric.WithAttributes(attrs...))
128+
c.metrics.CacheOperationsTotal.Add(spanCtx, 1,
126129
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("miss"))...))
127130
case err != nil:
128-
c.metrics.CacheOperationsTotal.Add(ctx, 1,
131+
c.metrics.CacheOperationsTotal.Add(spanCtx, 1,
129132
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("error"))...))
130133
default:
131-
c.metrics.CacheHitsTotal.Add(ctx, 1, metric.WithAttributes(attrs...))
132-
c.metrics.CacheOperationsTotal.Add(ctx, 1,
134+
c.metrics.CacheHitsTotal.Add(spanCtx, 1, metric.WithAttributes(attrs...))
135+
c.metrics.CacheOperationsTotal.Add(spanCtx, 1,
133136
metric.WithAttributes(append(attrs, telemetry.AttrStatus.String("hit"))...))
134137
}
135138
}
@@ -149,8 +152,11 @@ func (c *Cache) Set(ctx context.Context, key, value string, ttl time.Duration) e
149152

150153
// Delete removes the specified key from Redis.
151154
func (c *Cache) Delete(ctx context.Context, key string) error {
152-
err := c.Client.Del(ctx, key).Err()
153-
c.recordOperation(ctx, "delete", err)
155+
tracer := otel.Tracer(telemetry.ScopeName, trace.WithInstrumentationVersion(telemetry.ScopeVersion))
156+
spanCtx, span := tracer.Start(ctx, "redis_delete")
157+
defer span.End()
158+
err := c.Client.Del(spanCtx, key).Err()
159+
c.recordOperation(spanCtx, "delete", err)
154160
return err
155161
}
156162

0 commit comments

Comments
 (0)