diff --git a/.tool-versions b/.tool-versions index 39ad7a35e3..06fc452757 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,4 +1,4 @@ -golang 1.24.1 +golang 1.24.2 protoc 25.1 protoc-gen-go-grpc 1.3.0 golangci-lint 1.64.8 diff --git a/pkg/beholder/common.go b/pkg/beholder/common.go new file mode 100644 index 0000000000..1d192dee59 --- /dev/null +++ b/pkg/beholder/common.go @@ -0,0 +1,96 @@ +package beholder + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// MetricInfo is a struct for metrics information +type MetricsInfoCapBasic struct { + // common + count MetricInfo + capTimestampStart MetricInfo + capTimestampEmit MetricInfo + capDuration MetricInfo // ts.emit - ts.start +} + +// NewMetricsInfoCapBasic creates a new MetricsInfoCapBasic using the provided event/metric information +func NewMetricsInfoCapBasic(metricPrefix, eventRef string) MetricsInfoCapBasic { + return MetricsInfoCapBasic{ + count: MetricInfo{ + Name: fmt.Sprintf("%s_count", metricPrefix), + Unit: "", + Description: fmt.Sprintf("The count of message: '%s' emitted", eventRef), + }, + capTimestampStart: MetricInfo{ + Name: fmt.Sprintf("%s_cap_timestamp_start", metricPrefix), + Unit: "ms", + Description: fmt.Sprintf("The timestamp (local) at capability exec start that resulted in message: '%s' emit", eventRef), + }, + capTimestampEmit: MetricInfo{ + Name: fmt.Sprintf("%s_cap_timestamp_emit", metricPrefix), + Unit: "ms", + Description: fmt.Sprintf("The timestamp (local) at message: '%s' emit", eventRef), + }, + capDuration: MetricInfo{ + Name: fmt.Sprintf("%s_cap_duration", metricPrefix), + Unit: "ms", + Description: fmt.Sprintf("The duration (local) since capability exec start to message: '%s' emit", eventRef), + }, + } +} + +// MetricsCapBasic is a base struct for metrics related to a capability +type MetricsCapBasic struct { + count metric.Int64Counter + capTimestampStart metric.Int64Gauge + capTimestampEmit metric.Int64Gauge + capDuration metric.Int64Gauge // ts.emit - ts.start +} + +// NewMetricsCapBasic creates a new MetricsCapBasic using the provided MetricsInfoCapBasic +func NewMetricsCapBasic(info MetricsInfoCapBasic) (MetricsCapBasic, error) { + meter := GetMeter() + set := MetricsCapBasic{} + + // Create new metrics + var err error + + set.count, err = info.count.NewInt64Counter(meter) + if err != nil { + return set, fmt.Errorf("failed to create new counter: %w", err) + } + + set.capTimestampStart, err = info.capTimestampStart.NewInt64Gauge(meter) + if err != nil { + return set, fmt.Errorf("failed to create new gauge: %w", err) + } + + set.capTimestampEmit, err = info.capTimestampEmit.NewInt64Gauge(meter) + if err != nil { + return set, fmt.Errorf("failed to create new gauge: %w", err) + } + + set.capDuration, err = info.capDuration.NewInt64Gauge(meter) + if err != nil { + return set, fmt.Errorf("failed to create new gauge: %w", err) + } + + return set, nil +} + +func (m *MetricsCapBasic) RecordEmit(ctx context.Context, start, emit uint64, attrKVs ...attribute.KeyValue) { + // Define attributes + attrs := metric.WithAttributes(attrKVs...) + + // Count events + m.count.Add(ctx, 1, attrs) + + // Timestamp events + m.capTimestampStart.Record(ctx, int64(start), attrs) + m.capTimestampEmit.Record(ctx, int64(emit), attrs) + m.capDuration.Record(ctx, int64(emit-start), attrs) +} diff --git a/pkg/beholder/info.go b/pkg/beholder/info.go new file mode 100644 index 0000000000..264b6d7ab7 --- /dev/null +++ b/pkg/beholder/info.go @@ -0,0 +1,39 @@ +package beholder + +import ( + "go.opentelemetry.io/otel/metric" +) + +// Define a new struct for metrics information +type MetricInfo struct { + Name string + Unit string + Description string +} + +// NewInt64Counter creates a new Int64Counter metric +func (m MetricInfo) NewInt64Counter(meter metric.Meter) (metric.Int64Counter, error) { + return meter.Int64Counter( + m.Name, + metric.WithUnit(m.Unit), + metric.WithDescription(m.Description), + ) +} + +// NewInt64Gauge creates a new Int64Gauge metric +func (m MetricInfo) NewInt64Gauge(meter metric.Meter) (metric.Int64Gauge, error) { + return meter.Int64Gauge( + m.Name, + metric.WithUnit(m.Unit), + metric.WithDescription(m.Description), + ) +} + +// NewFloat64Gauge creates a new Float64Gauge metric +func (m MetricInfo) NewFloat64Gauge(meter metric.Meter) (metric.Float64Gauge, error) { + return meter.Float64Gauge( + m.Name, + metric.WithUnit(m.Unit), + metric.WithDescription(m.Description), + ) +} diff --git a/pkg/beholder/metadata.go b/pkg/beholder/metadata.go new file mode 100644 index 0000000000..ac8334c5d6 --- /dev/null +++ b/pkg/beholder/metadata.go @@ -0,0 +1,98 @@ +package beholder + +import ( + "encoding/hex" + + "go.opentelemetry.io/otel/attribute" +) + +const ( + // WorkflowExecutionIDShortLen is the length of the short version of the WorkflowExecutionId (label) + WorkflowExecutionIDShortLen = 3 // first 3 characters, 16^3 = 4.096 possibilities (mid-high cardinality) +) + +// TODO: Refactor as a proto referenced from the other proto files (telemetry messages) +type ExecutionMetadata struct { + // Execution Context - Source + SourceId string + // Execution Context - Chain + ChainFamilyName string + ChainId string + NetworkName string + NetworkNameFull string + // Execution Context - Workflow (capabilities.RequestMetadata) + WorkflowId string + WorkflowOwner string + WorkflowExecutionId string + WorkflowName string + WorkflowDonId uint32 + WorkflowDonConfigVersion uint32 + ReferenceId string + // Execution Context - Capability + CapabilityType string + CapabilityId string + CapabilityTimestampStart uint32 + CapabilityTimestampEmit uint32 +} + +// Attributes returns common attributes used for metrics +func (m ExecutionMetadata) Attributes() []attribute.KeyValue { + // Decode workflow name attribute for output + workflowName := m.decodeWorkflowName() + + return []attribute.KeyValue{ + // Execution Context - Source + attribute.String("source_id", ValOrUnknown(m.SourceId)), + // Execution Context - Chain + attribute.String("chain_family_name", ValOrUnknown(m.ChainFamilyName)), + attribute.String("chain_id", ValOrUnknown(m.ChainId)), + attribute.String("network_name", ValOrUnknown(m.NetworkName)), + attribute.String("network_name_full", ValOrUnknown(m.NetworkNameFull)), + // Execution Context - Workflow (capabilities.RequestMetadata) + attribute.String("workflow_id", ValOrUnknown(m.WorkflowId)), + attribute.String("workflow_owner", ValOrUnknown(m.WorkflowOwner)), + // Notice: We lower the cardinality on the WorkflowExecutionId so it can be used by metrics + // This label has good chances to be unique per workflow, in a reasonable bounded time window + // TODO: enable this when sufficiently tested (PromQL queries like alerts might need to change if this is used) + // attribute.String("workflow_execution_id_short", ValShortOrUnknown(m.WorkflowExecutionId, WorkflowExecutionIDShortLen)), + attribute.String("workflow_name", ValOrUnknown(workflowName)), + attribute.Int64("workflow_don_id", int64(m.WorkflowDonId)), + attribute.Int64("workflow_don_config_version", int64(m.WorkflowDonConfigVersion)), + attribute.String("reference_id", ValOrUnknown(m.ReferenceId)), + // Execution Context - Capability + attribute.String("capability_type", ValOrUnknown(m.CapabilityType)), + attribute.String("capability_id", ValOrUnknown(m.CapabilityId)), + // Notice: we don't include the timestamps here (high cardinality) + } +} + +// decodeWorkflowName decodes the workflow name from hex string to raw string (underlying, output) +func (m ExecutionMetadata) decodeWorkflowName() string { + bytes, err := hex.DecodeString(m.WorkflowName) + if err != nil { + // This should never happen + bytes = []byte("unknown-decode-error") + } + return string(bytes) +} + +// This is needed to avoid issues during exporting OTel metrics to Prometheus +// For more details see https://smartcontract-it.atlassian.net/browse/INFOPLAT-1349 +// ValOrUnknown returns the value if it is not empty, otherwise it returns "unknown" +func ValOrUnknown(val string) string { + if val == "" { + return "unknown" + } + return val +} + +// ValShortOrUnknown returns the short len value if not empty or available, otherwise it returns "unknown" +func ValShortOrUnknown(val string, _len int) string { + if val == "" || _len <= 0 { + return "unknown" + } + if _len > len(val) { + return val + } + return val[:_len] +} diff --git a/pkg/utils/retry/retry.go b/pkg/utils/retry/retry.go new file mode 100644 index 0000000000..3aab611a44 --- /dev/null +++ b/pkg/utils/retry/retry.go @@ -0,0 +1,65 @@ +package retry + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jpillora/backoff" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type ctxKey string + +// ctxKeyID is the context key for tracing ID +const ctxKeyID ctxKey = "retryID" + +func CtxWithID(ctx context.Context, retryID string) context.Context { + return context.WithValue(ctx, ctxKeyID, retryID) +} + +// Exponential backoff (default) is used to handle retries with increasing wait times in case of errors +var BackoffStrategyDefault = backoff.Backoff{ + Min: 100 * time.Millisecond, + Max: 3 * time.Second, + Factor: 2, +} + +// WithStrategy applies a retry strategy to a given function. +func WithStrategy[R any](ctx context.Context, lggr logger.Logger, strategy backoff.Backoff, fn func(ctx context.Context) (R, error)) (R, error) { + // Generate a new tracing ID if not present, used to track retries + retryID := ctx.Value(ctxKeyID) + if retryID == nil { + retryID = uuid.New().String() + // Add the generated tracing ID to the context (as it was not already present) + ctx = context.WithValue(ctx, ctxKeyID, retryID) + } + + // Track the number of retries + numRetries := int(strategy.Attempt()) + for { + result, err := fn(ctx) + if err == nil { + return result, nil + } + + wait := strategy.Duration() + message := fmt.Sprintf("Failed to execute function, retrying in %s ...", wait) + lggr.Warnw(message, "wait", wait, "numRetries", numRetries, "retryID", retryID, "err", err) + + select { + case <-ctx.Done(): + return result, fmt.Errorf("context done while executing function {retryID=%s, numRetries=%d}: %w", retryID, numRetries, ctx.Err()) + case <-time.After(wait): + numRetries++ + // Continue with the next retry + } + } +} + +// With applies a default retry strategy to a given function. +func With[R any](ctx context.Context, lggr logger.Logger, fn func(ctx context.Context) (R, error)) (R, error) { + return WithStrategy(ctx, lggr, BackoffStrategyDefault, fn) +} diff --git a/pkg/utils/retry/retry_test.go b/pkg/utils/retry/retry_test.go new file mode 100644 index 0000000000..8529730832 --- /dev/null +++ b/pkg/utils/retry/retry_test.go @@ -0,0 +1,122 @@ +package retry + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/stretchr/testify/require" +) + +// exampleFunc is a function type used for testing retry strategies +type exampleFunc func(ctx context.Context) (string, error) + +func TestWithRetry(t *testing.T) { + lggr := logger.Test(t) + + tests := []struct { + name string + fn exampleFunc + expected string + errMsg string + timeout time.Duration + }{ + { + name: "successful function", + fn: func(ctx context.Context) (string, error) { + return "success", nil + }, + expected: "success", + timeout: 100 * time.Millisecond, + }, + { + name: "always failing function", + fn: func(ctx context.Context) (string, error) { + return "", errors.New("permanent error") + }, + errMsg: "context done while executing function", + timeout: 100 * time.Millisecond, + }, + { + name: "eventually successful function", + fn: func() exampleFunc { + attempts := 0 + return func(ctx context.Context) (string, error) { + attempts++ + if attempts < 3 { + return "", errors.New("temporary error") + } + return "eventual success", nil + } + }(), + expected: "eventual success", + timeout: 500 * time.Millisecond, + }, + { + name: "eventually successful function (fail - exceeding context timeout)", + fn: func() exampleFunc { + attempts := 0 + return func(ctx context.Context) (string, error) { + attempts++ + if attempts < 3 { + return "", errors.New("temporary error") + } + return "eventual success", nil + } + }(), + errMsg: "context done while executing function", + timeout: 100 * time.Millisecond, + }, + { + name: "eventually (in time) successful function", + fn: func() exampleFunc { + // Start timer, successful after 400ms + timeout := 400 * time.Millisecond + start := time.Now() + return func(ctx context.Context) (string, error) { + if time.Since(start) < timeout { + return "", errors.New("temporary error") + } + return "eventual success", nil + } + }(), + expected: "eventual success", + timeout: 1 * time.Second, + }, + { + name: "eventually (in time) successful function (fail - exceeding context timeout)", + fn: func() exampleFunc { + // Start timer, successful after 4s + timeout := 4 * time.Second + start := time.Now() + return func(ctx context.Context) (string, error) { + if time.Since(start) < timeout { + return "", errors.New("temporary error") + } + return "eventual success", nil + } + }(), + errMsg: "context done while executing function", + timeout: 1 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, tt.timeout) + defer cancel() + + result, err := With(ctx, lggr, tt.fn) + if tt.errMsg != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.errMsg) + } else { + require.NoError(t, err) + require.Equal(t, tt.expected, result) + } + }) + } +}