Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
golang 1.24.1
golang 1.24.2
protoc 25.1
protoc-gen-go-grpc 1.3.0
golangci-lint 1.64.8
Expand Down
96 changes: 96 additions & 0 deletions pkg/beholder/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package beholder

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// MetricsInfoCapBasic groups the MetricInfo descriptors (name, unit, description)
// for the basic set of capability metrics: an emit count, the start and emit
// timestamps, and the duration between them.
type MetricsInfoCapBasic struct {
// Descriptors common to every capability message
count MetricInfo
capTimestampStart MetricInfo
capTimestampEmit MetricInfo
capDuration MetricInfo // ts.emit - ts.start
}

// NewMetricsInfoCapBasic builds the descriptor set for the basic capability metrics.
//
// metricPrefix is prepended to every metric name ("<prefix>_count",
// "<prefix>_cap_timestamp_start", ...), and eventRef is quoted inside each
// metric description to identify the message the metrics refer to.
func NewMetricsInfoCapBasic(metricPrefix, eventRef string) MetricsInfoCapBasic {
	// All timestamp/duration metrics share the same unit; the counter is unitless.
	const unitMillis = "ms"

	var info MetricsInfoCapBasic

	// Counter: left with the zero-value (empty) unit.
	info.count.Name = fmt.Sprintf("%s_count", metricPrefix)
	info.count.Description = fmt.Sprintf("The count of message: '%s' emitted", eventRef)

	// Timestamp at capability execution start.
	info.capTimestampStart.Name = fmt.Sprintf("%s_cap_timestamp_start", metricPrefix)
	info.capTimestampStart.Unit = unitMillis
	info.capTimestampStart.Description = fmt.Sprintf("The timestamp (local) at capability exec start that resulted in message: '%s' emit", eventRef)

	// Timestamp at message emit.
	info.capTimestampEmit.Name = fmt.Sprintf("%s_cap_timestamp_emit", metricPrefix)
	info.capTimestampEmit.Unit = unitMillis
	info.capTimestampEmit.Description = fmt.Sprintf("The timestamp (local) at message: '%s' emit", eventRef)

	// Duration between the two timestamps above.
	info.capDuration.Name = fmt.Sprintf("%s_cap_duration", metricPrefix)
	info.capDuration.Unit = unitMillis
	info.capDuration.Description = fmt.Sprintf("The duration (local) since capability exec start to message: '%s' emit", eventRef)

	return info
}

// MetricsCapBasic is a base struct for metrics related to a capability.
// It holds the OTel instruments created by NewMetricsCapBasic: one counter and
// three gauges.
type MetricsCapBasic struct {
count metric.Int64Counter // incremented by 1 on every RecordEmit call
capTimestampStart metric.Int64Gauge // start timestamp passed to RecordEmit
capTimestampEmit metric.Int64Gauge // emit timestamp passed to RecordEmit
capDuration metric.Int64Gauge // ts.emit - ts.start
}

// NewMetricsCapBasic instantiates the instruments described by info on the
// meter returned by GetMeter.
//
// Instruments are created in order (count, start timestamp, emit timestamp,
// duration). On the first failure the partially initialised set is returned
// together with the wrapped error; callers should not use it in that case.
func NewMetricsCapBasic(info MetricsInfoCapBasic) (MetricsCapBasic, error) {
	var (
		meter   = GetMeter()
		metrics MetricsCapBasic
		err     error
	)

	if metrics.count, err = info.count.NewInt64Counter(meter); err != nil {
		return metrics, fmt.Errorf("failed to create new counter: %w", err)
	}

	if metrics.capTimestampStart, err = info.capTimestampStart.NewInt64Gauge(meter); err != nil {
		return metrics, fmt.Errorf("failed to create new gauge: %w", err)
	}

	if metrics.capTimestampEmit, err = info.capTimestampEmit.NewInt64Gauge(meter); err != nil {
		return metrics, fmt.Errorf("failed to create new gauge: %w", err)
	}

	if metrics.capDuration, err = info.capDuration.NewInt64Gauge(meter); err != nil {
		return metrics, fmt.Errorf("failed to create new gauge: %w", err)
	}

	return metrics, nil
}

// RecordEmit records one emitted message: it adds 1 to the count counter and
// records the start timestamp, the emit timestamp and their difference on the
// corresponding gauges. Every measurement is tagged with attrKVs.
//
// start and emit are presumably milliseconds, matching the units declared by
// NewMetricsInfoCapBasic (confirm with callers). The difference is computed in
// unsigned arithmetic, so emit < start wraps around before the int64
// conversion and is recorded as a negative duration.
func (m *MetricsCapBasic) RecordEmit(ctx context.Context, start, emit uint64, attrKVs ...attribute.KeyValue) {
	// One shared option carries the attribute set for all four measurements
	withAttrs := metric.WithAttributes(attrKVs...)

	// Convert once up front; the gauges are int64 instruments
	startTS, emitTS, elapsed := int64(start), int64(emit), int64(emit-start)

	// Count the event
	m.count.Add(ctx, 1, withAttrs)

	// Record the timestamps and the derived duration
	m.capTimestampStart.Record(ctx, startTS, withAttrs)
	m.capTimestampEmit.Record(ctx, emitTS, withAttrs)
	m.capDuration.Record(ctx, elapsed, withAttrs)
}
39 changes: 39 additions & 0 deletions pkg/beholder/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package beholder

import (
"go.opentelemetry.io/otel/metric"
)

// MetricInfo describes a metric instrument. Its fields are passed to the OTel
// meter when an instrument is created through one of the New* methods.
type MetricInfo struct {
// Name is the instrument name
Name string
// Unit is the instrument unit (passed via metric.WithUnit); may be empty
Unit string
// Description is the human-readable description (passed via metric.WithDescription)
Description string
}

// NewInt64Counter registers an Int64Counter on meter using this MetricInfo's
// name, unit and description, and returns the meter's result unchanged.
func (m MetricInfo) NewInt64Counter(meter metric.Meter) (metric.Int64Counter, error) {
	opts := []metric.Int64CounterOption{
		metric.WithUnit(m.Unit),
		metric.WithDescription(m.Description),
	}
	return meter.Int64Counter(m.Name, opts...)
}

// NewInt64Gauge registers an Int64Gauge on meter using this MetricInfo's
// name, unit and description, and returns the meter's result unchanged.
func (m MetricInfo) NewInt64Gauge(meter metric.Meter) (metric.Int64Gauge, error) {
	opts := []metric.Int64GaugeOption{
		metric.WithUnit(m.Unit),
		metric.WithDescription(m.Description),
	}
	return meter.Int64Gauge(m.Name, opts...)
}

// NewFloat64Gauge registers a Float64Gauge on meter using this MetricInfo's
// name, unit and description, and returns the meter's result unchanged.
func (m MetricInfo) NewFloat64Gauge(meter metric.Meter) (metric.Float64Gauge, error) {
	opts := []metric.Float64GaugeOption{
		metric.WithUnit(m.Unit),
		metric.WithDescription(m.Description),
	}
	return meter.Float64Gauge(m.Name, opts...)
}
98 changes: 98 additions & 0 deletions pkg/beholder/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package beholder

import (
"encoding/hex"

"go.opentelemetry.io/otel/attribute"
)

const (
// WorkflowExecutionIDShortLen is the length of the short version of the WorkflowExecutionId (label).
WorkflowExecutionIDShortLen = 3 // first 3 characters, 16^3 = 4096 possibilities (mid-high cardinality)
)

// ExecutionMetadata carries the execution context (source, chain, workflow and
// capability) from which the common metric attributes are derived; see Attributes.
//
// TODO: Refactor as a proto referenced from the other proto files (telemetry messages)
type ExecutionMetadata struct {
// Execution Context - Source
SourceId string
// Execution Context - Chain
ChainFamilyName string
ChainId string
NetworkName string
NetworkNameFull string
// Execution Context - Workflow (capabilities.RequestMetadata)
WorkflowId string
WorkflowOwner string
WorkflowExecutionId string
// WorkflowName is expected to be hex-encoded; decodeWorkflowName decodes it for output
WorkflowName string
WorkflowDonId uint32
WorkflowDonConfigVersion uint32
ReferenceId string
// Execution Context - Capability
CapabilityType string
CapabilityId string
// NOTE(review): these timestamps are uint32, while MetricsCapBasic.RecordEmit takes
// uint64 values; a uint32 cannot hold Unix-epoch milliseconds. Confirm the intended unit/width.
CapabilityTimestampStart uint32
CapabilityTimestampEmit uint32
}

// Attributes returns the common attribute set used to label metrics.
//
// String fields that are empty are reported as "unknown" (see ValOrUnknown),
// the workflow name is hex-decoded first, and the DON id/config version are
// emitted as int64 attributes. Timestamps and the workflow execution id are
// deliberately left out because of their cardinality.
func (m ExecutionMetadata) Attributes() []attribute.KeyValue {
	// str builds a string attribute, substituting "unknown" for empty values
	str := func(key, val string) attribute.KeyValue {
		return attribute.String(key, ValOrUnknown(val))
	}
	// num builds an integer attribute from a uint32 field
	num := func(key string, val uint32) attribute.KeyValue {
		return attribute.Int64(key, int64(val))
	}

	return []attribute.KeyValue{
		// Execution Context - Source
		str("source_id", m.SourceId),
		// Execution Context - Chain
		str("chain_family_name", m.ChainFamilyName),
		str("chain_id", m.ChainId),
		str("network_name", m.NetworkName),
		str("network_name_full", m.NetworkNameFull),
		// Execution Context - Workflow (capabilities.RequestMetadata)
		str("workflow_id", m.WorkflowId),
		str("workflow_owner", m.WorkflowOwner),
		// Notice: We lower the cardinality on the WorkflowExecutionId so it can be used by metrics
		// This label has good chances to be unique per workflow, in a reasonable bounded time window
		// TODO: enable this when sufficiently tested (PromQL queries like alerts might need to change if this is used)
		// attribute.String("workflow_execution_id_short", ValShortOrUnknown(m.WorkflowExecutionId, WorkflowExecutionIDShortLen)),
		// The workflow name is decoded from its hex form for output
		str("workflow_name", m.decodeWorkflowName()),
		num("workflow_don_id", m.WorkflowDonId),
		num("workflow_don_config_version", m.WorkflowDonConfigVersion),
		str("reference_id", m.ReferenceId),
		// Execution Context - Capability
		str("capability_type", m.CapabilityType),
		str("capability_id", m.CapabilityId),
		// Notice: we don't include the timestamps here (high cardinality)
	}
}

// decodeWorkflowName converts the hex-encoded WorkflowName into the raw string
// it represents. If the field is not valid hex, the placeholder
// "unknown-decode-error" is returned instead.
func (m ExecutionMetadata) decodeWorkflowName() string {
	raw, err := hex.DecodeString(m.WorkflowName)
	if err == nil {
		return string(raw)
	}
	// This should never happen
	return "unknown-decode-error"
}

// ValOrUnknown returns val unchanged when it is non-empty, and "unknown" otherwise.
//
// This is needed to avoid issues during exporting OTel metrics to Prometheus.
// For more details see https://smartcontract-it.atlassian.net/browse/INFOPLAT-1349
func ValOrUnknown(val string) string {
	if len(val) > 0 {
		return val
	}
	return "unknown"
}

// ValShortOrUnknown returns the first _len bytes of val. It returns "unknown"
// when val is empty or _len is not positive, and val itself when val is no
// longer than _len.
func ValShortOrUnknown(val string, _len int) string {
	switch {
	case val == "" || _len <= 0:
		return "unknown"
	case _len >= len(val):
		// Nothing to cut: the value already fits
		return val
	default:
		return val[:_len]
	}
}
65 changes: 65 additions & 0 deletions pkg/utils/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package retry

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/jpillora/backoff"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// ctxKey is an unexported string type used for context keys, so keys defined
// in this package cannot collide with context keys set by other packages.
type ctxKey string

// ctxKeyID is the context key under which the retry tracing ID is stored.
const ctxKeyID ctxKey = "retryID"

// CtxWithID returns a copy of ctx that carries retryID as the retry tracing ID.
// WithStrategy picks this ID up instead of generating a new one.
func CtxWithID(ctx context.Context, retryID string) context.Context {
	tagged := context.WithValue(ctx, ctxKeyID, retryID)
	return tagged
}

// BackoffStrategyDefault is the default exponential backoff, used to handle
// retries with increasing wait times in case of errors. It is the strategy
// that With passes to WithStrategy.
var BackoffStrategyDefault = backoff.Backoff{
Min: 100 * time.Millisecond, // shortest wait between attempts
Max: 3 * time.Second, // upper bound on the wait
Factor: 2, // growth factor of the wait
}

// WithStrategy calls fn repeatedly until it succeeds or ctx is done, waiting
// between attempts for the duration dictated by strategy.
//
// fn is always invoked at least once, and there is no cap on the number of
// attempts: only a nil error from fn or ctx being done ends the loop. Each
// failure is logged as a warning with the wait time, the retry count and the
// retry ID. strategy is received by value, so the caller's Backoff is not
// advanced by this call.
//
// The retry ID is read from ctx (see CtxWithID); if absent, a fresh UUID is
// generated and attached to the context handed to fn.
//
// On success the result of fn is returned with a nil error. If ctx is done
// while waiting, the result of the last failed attempt is returned together
// with an error wrapping ctx.Err().
func WithStrategy[R any](ctx context.Context, lggr logger.Logger, strategy backoff.Backoff, fn func(ctx context.Context) (R, error)) (R, error) {
	// Reuse the tracing ID supplied by the caller, or mint one and attach it so
	// that nested calls and fn itself observe the same ID
	id := ctx.Value(ctxKeyID)
	if id == nil {
		id = uuid.New().String()
		ctx = context.WithValue(ctx, ctxKeyID, id)
	}

	// Start counting from whatever attempt the strategy copy is already at
	attempts := int(strategy.Attempt())
	for {
		res, err := fn(ctx)
		if err == nil {
			return res, nil
		}

		// Ask the strategy for the next delay (this also advances its attempt counter)
		delay := strategy.Duration()
		lggr.Warnw(
			fmt.Sprintf("Failed to execute function, retrying in %s ...", delay),
			"wait", delay, "numRetries", attempts, "retryID", id, "err", err,
		)

		select {
		case <-time.After(delay):
			// Wait elapsed: go around for the next attempt
			attempts++
		case <-ctx.Done():
			return res, fmt.Errorf("context done while executing function {retryID=%s, numRetries=%d}: %w", id, attempts, ctx.Err())
		}
	}
}

// With retries fn using BackoffStrategyDefault. It is shorthand for calling
// WithStrategy with that strategy; see WithStrategy for the retry semantics.
func With[R any](ctx context.Context, lggr logger.Logger, fn func(ctx context.Context) (R, error)) (R, error) {
	return WithStrategy[R](ctx, lggr, BackoffStrategyDefault, fn)
}
Loading
Loading