diff --git a/pkg/beholder/attributes.go b/pkg/beholder/attributes.go new file mode 100644 index 0000000000..b37e17411f --- /dev/null +++ b/pkg/beholder/attributes.go @@ -0,0 +1,8 @@ +package beholder + +const ( + AttrKeyDataSchema = "beholder_data_schema" + AttrKeyEntity = "beholder_entity" + AttrKeyDomain = "beholder_domain" + AttrKeyDataType = "beholder_data_type" +) diff --git a/pkg/beholder/chip_ingress_emitter.go b/pkg/beholder/chip_ingress_emitter.go index 5461ac9061..0ca3e991aa 100644 --- a/pkg/beholder/chip_ingress_emitter.go +++ b/pkg/beholder/chip_ingress_emitter.go @@ -51,12 +51,12 @@ func ExtractSourceAndType(attrKVs ...any) (string, string, error) { for key, value := range attributes { // Retrieve source and type using either ChIP or legacy attribute names, prioritizing source/type - if key == "source" || (key == "beholder_domain" && sourceDomain == "") { + if key == "source" || (key == AttrKeyDomain && sourceDomain == "") { if val, ok := value.(string); ok { sourceDomain = val } } - if key == "type" || (key == "beholder_entity" && entityType == "") { + if key == "type" || (key == AttrKeyEntity && entityType == "") { if val, ok := value.(string); ok { entityType = val } diff --git a/pkg/beholder/chip_ingress_emitter_test.go b/pkg/beholder/chip_ingress_emitter_test.go index bd214dca30..9798b6fff0 100644 --- a/pkg/beholder/chip_ingress_emitter_test.go +++ b/pkg/beholder/chip_ingress_emitter_test.go @@ -42,7 +42,7 @@ func TestChipIngressEmit(t *testing.T) { emitter, err := beholder.NewChipIngressEmitter(clientMock) require.NoError(t, err) - err = emitter.Emit(t.Context(), body, "beholder_domain", domain, "beholder_entity", entity) + err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity) require.NoError(t, err) clientMock.AssertExpectations(t) @@ -59,7 +59,7 @@ func TestChipIngressEmit(t *testing.T) { emitter, err := beholder.NewChipIngressEmitter(clientMock) require.NoError(t, err) - err = emitter.Emit(t.Context(), body, "beholder_domain", domain) + err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain) assert.Error(t, err) }) @@ -74,7 +74,7 @@ func TestChipIngressEmit(t *testing.T) { emitter, err := beholder.NewChipIngressEmitter(clientMock) require.NoError(t, err) - err = emitter.Emit(t.Context(), body, "beholder_domain", domain, "beholder_entity", entity) + err = emitter.Emit(t.Context(), body, beholder.AttrKeyDomain, domain, beholder.AttrKeyEntity, entity) require.Error(t, err) clientMock.AssertExpectations(t) @@ -92,7 +92,7 @@ func TestExtractSourceAndType(t *testing.T) { }{ { name: "happy path - domain and entity exist", - attrs: []any{map[string]any{"beholder_domain": "test-domain", "beholder_entity": "test-entity"}}, + attrs: []any{map[string]any{beholder.AttrKeyDomain: "test-domain", beholder.AttrKeyEntity: "test-entity"}}, wantDomain: "test-domain", wantEntity: "test-entity", wantErr: false, @@ -106,14 +106,14 @@ func TestExtractSourceAndType(t *testing.T) { }, { name: "happy path - domain and entity exist - uses source/type", - attrs: []any{map[string]any{"source": "other-domain", "beholder_domain": "test-domain", "beholder_entity": "test-entity", "type": "other-entity"}}, + attrs: []any{map[string]any{"source": "other-domain", beholder.AttrKeyDomain: "test-domain", beholder.AttrKeyEntity: "test-entity", "type": "other-entity"}}, wantDomain: "other-domain", wantEntity: "other-entity", wantErr: false, }, { name: "missing domain/source", - attrs: []any{map[string]any{"beholder_entity": "test-entity"}}, + attrs: []any{map[string]any{beholder.AttrKeyEntity: "test-entity"}}, wantDomain: "", wantEntity: "", wantErr: true, @@ -121,7 +121,7 @@ func TestExtractSourceAndType(t *testing.T) { }, { name: "missing entity/type", - attrs: []any{map[string]any{"beholder_domain": "test-domain"}}, + attrs: []any{map[string]any{beholder.AttrKeyDomain: "test-domain"}}, wantDomain: "", wantEntity: "", wantErr: true, @@ -146,10 +146,10 @@ func TestExtractSourceAndType(t *testing.T) { { name: "domain and entity with additional attributes", attrs: []any{map[string]any{ - "other_key": "other_value", - "beholder_domain": "test-domain", - "beholder_entity": "test-entity", - "something_else": 123, + "other_key": "other_value", + beholder.AttrKeyDomain: "test-domain", + beholder.AttrKeyEntity: "test-entity", + "something_else": 123, }}, wantDomain: "test-domain", wantEntity: "test-entity", @@ -158,9 +158,9 @@ func TestExtractSourceAndType(t *testing.T) { { name: "non-string keys ignored", attrs: []any{map[string]any{ - "other_value": "value", - "beholder_domain": "test-domain", - "beholder_entity": "test-entity", + "other_value": "value", + beholder.AttrKeyDomain: "test-domain", + beholder.AttrKeyEntity: "test-entity", }, 123, "other_key"}, wantDomain: "test-domain", wantEntity: "test-entity", @@ -169,9 +169,9 @@ func TestExtractSourceAndType(t *testing.T) { { name: "non-string values handled", attrs: []any{map[string]any{ - "other_key": 123, - "beholder_domain": "test-domain", - "beholder_entity": "test-entity", + "other_key": 123, + beholder.AttrKeyDomain: "test-domain", + beholder.AttrKeyEntity: "test-entity", }}, wantDomain: "test-domain", wantEntity: "test-entity", diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index c0ee09aa4e..2a331cf7c8 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -135,7 +135,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } loggerAttributes := []attribute.KeyValue{ - attribute.String("beholder_data_type", "zap_log_message"), + attribute.String(AttrKeyDataType, "zap_log_message"), } loggerResource, err := sdkresource.Merge( sdkresource.NewSchemaless(loggerAttributes...), @@ -195,7 +195,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } messageAttributes := []attribute.KeyValue{ - attribute.String("beholder_data_type", "custom_message"), + attribute.String(AttrKeyDataType, "custom_message"), } messageLoggerResource, err := sdkresource.Merge( sdkresource.NewSchemaless(messageAttributes...), diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index b27aac9eaa..9bd52fbd4c 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -43,18 +43,18 @@ func (m *MockExporter) ForceFlush(ctx context.Context) error { func TestClient(t *testing.T) { defaultCustomAttributes := func() map[string]any { return map[string]any{ - "int_key_1": 123, - "int64_key_1": int64(123), - "int32_key_1": int32(123), - "str_key_1": "str_val_1", - "bool_key_1": true, - "float_key_1": 123.456, - "byte_key_1": []byte("byte_val_1"), - "str_slice_key_1": []string{"str_val_1", "str_val_2"}, - "nil_key_1": nil, - "beholder_domain": "TestDomain", // Required field - "beholder_entity": "TestEntity", // Required field - "beholder_data_schema": "/schemas/ids/1001", // Required field, URI + "int_key_1": 123, + "int64_key_1": int64(123), + "int32_key_1": int32(123), + "str_key_1": "str_val_1", + "bool_key_1": true, + "float_key_1": 123.456, + "byte_key_1": []byte("byte_val_1"), + "str_slice_key_1": []string{"str_val_1", "str_val_2"}, + "nil_key_1": nil, + beholder.AttrKeyDomain: "TestDomain", // Required field + beholder.AttrKeyEntity: "TestEntity", // Required field + beholder.AttrKeyDataSchema: "/schemas/ids/1001", // Required field, URI } } defaultMessageBody := []byte("body bytes") diff --git a/pkg/beholder/example_test.go b/pkg/beholder/example_test.go index 2045c3bb88..ddb9c6bdc6 100644 --- a/pkg/beholder/example_test.go +++ b/pkg/beholder/example_test.go @@ -44,10 +44,10 @@ func ExampleNewClient() { fmt.Println("Emit custom messages") for range 10 { err := beholder.GetEmitter().Emit(context.Background(), payloadBytes, - "beholder_data_schema", "/custom-message/versions/1", // required - "beholder_domain", "ExampleDomain", // required - "beholder_entity", "ExampleEntity", // required - "beholder_data_type", "custom_message", + beholder.AttrKeyDataSchema, "/custom-message/versions/1", // required + beholder.AttrKeyDomain, "ExampleDomain", // required + beholder.AttrKeyEntity, "ExampleEntity", // required + beholder.AttrKeyDataType, "custom_message", "foo", "bar", ) if err != nil { @@ -106,9 +106,9 @@ func ExampleNewNoopClient() { fmt.Println("Emitting custom message via noop otel client") err := beholder.GetEmitter().Emit(context.Background(), []byte("test message"), - "beholder_data_schema", "/custom-message/versions/1", // required - "beholder_domain", "ExampleDomain", // required - "beholder_entity", "ExampleEntity", // required + beholder.AttrKeyDataSchema, "/custom-message/versions/1", // required + beholder.AttrKeyDomain, "ExampleDomain", // required + beholder.AttrKeyEntity, "ExampleEntity", // required ) if err != nil { log.Printf("Error emitting message: %v", err) diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 3c80ac0615..6af8d13a43 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -100,7 +100,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) } loggerAttributes := []attribute.KeyValue{ - attribute.String("beholder_data_type", "zap_log_message"), + attribute.String(AttrKeyDataType, "zap_log_message"), } loggerResource, err := sdkresource.Merge( sdkresource.NewSchemaless(loggerAttributes...), @@ -160,7 +160,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } messageAttributes := []attribute.KeyValue{ - attribute.String("beholder_data_type", "custom_message"), + attribute.String(AttrKeyDataType, "custom_message"), } messageLoggerResource, err := sdkresource.Merge( sdkresource.NewSchemaless(messageAttributes...), diff --git a/pkg/beholder/message.go b/pkg/beholder/message.go index a3a5464578..52a2c02b4b 100644 --- a/pkg/beholder/message.go +++ b/pkg/beholder/message.go @@ -60,9 +60,9 @@ func (m Metadata) Attributes() Attributes { "workflow_owner_address": m.WorkflowOwnerAddress, "workflow_spec_id": m.WorkflowSpecID, "workflow_execution_id": m.WorkflowExecutionID, - "beholder_domain": m.BeholderDomain, - "beholder_entity": m.BeholderEntity, - "beholder_data_schema": m.BeholderDataSchema, + AttrKeyDomain: m.BeholderDomain, + AttrKeyEntity: m.BeholderEntity, + AttrKeyDataSchema: m.BeholderDataSchema, "capability_contract_address": m.CapabilityContractAddress, "capability_id": m.CapabilityID, "capability_version": m.CapabilityVersion, @@ -206,11 +206,11 @@ func (m *Metadata) FromAttributes(attrs Attributes) *Metadata { m.WorkflowSpecID = v.(string) case "workflow_execution_id": m.WorkflowExecutionID = v.(string) - case "beholder_domain": + case AttrKeyDomain: m.BeholderDomain = v.(string) - case "beholder_entity": + case AttrKeyEntity: m.BeholderEntity = v.(string) - case "beholder_data_schema": + case AttrKeyDataSchema: m.BeholderDataSchema = v.(string) case "capability_contract_address": m.CapabilityContractAddress = v.(string) diff --git a/pkg/beholder/message_emitter_test.go b/pkg/beholder/message_emitter_test.go index d7abdd16d3..c010f84f90 100644 --- a/pkg/beholder/message_emitter_test.go +++ b/pkg/beholder/message_emitter_test.go @@ -44,9 +44,9 @@ func TestEmitterMessageValidation(t *testing.T) { { name: "Invalid URI not causing error", attrs: beholder.Attributes{ - "beholder_domain": "TestDomain", - "beholder_entity": "TestEntity", - "beholder_data_schema": "example-schema", + beholder.AttrKeyDomain: "TestDomain", + beholder.AttrKeyEntity: "TestEntity", + beholder.AttrKeyDataSchema: "example-schema", }, exporterCalledTimes: 1, expectedError: "", @@ -54,9 +54,9 @@ func TestEmitterMessageValidation(t *testing.T) { { name: "Invalid Beholder domain (double underscore)", attrs: beholder.Attributes{ - "beholder_data_schema": "/example-schema/versions/1", - "beholder_entity": "TestEntity", - "beholder_domain": "Test__Domain", + beholder.AttrKeyDataSchema: "/example-schema/versions/1", + beholder.AttrKeyEntity: "TestEntity", + beholder.AttrKeyDomain: "Test__Domain", }, exporterCalledTimes: 0, expectedError: "'Metadata.BeholderDomain' Error:Field validation for 'BeholderDomain' failed on the 'domain_entity' tag", @@ -64,9 +64,9 @@ func TestEmitterMessageValidation(t *testing.T) { { name: "Invalid Beholder domain (special characters)", attrs: beholder.Attributes{ - "beholder_data_schema": "/example-schema/versions/1", - "beholder_entity": "TestEntity", - "beholder_domain": "TestDomain*$", + beholder.AttrKeyDataSchema: "/example-schema/versions/1", + beholder.AttrKeyEntity: "TestEntity", + beholder.AttrKeyDomain: "TestDomain*$", }, exporterCalledTimes: 0, expectedError: "'Metadata.BeholderDomain' Error:Field validation for 'BeholderDomain' failed on the 'domain_entity' tag", @@ -74,9 +74,9 @@ func TestEmitterMessageValidation(t *testing.T) { { name: "Invalid Beholder entity (double underscore)", attrs: beholder.Attributes{ - "beholder_data_schema": "/example-schema/versions/1", - "beholder_entity": "Test__Entity", - "beholder_domain": "TestDomain", + beholder.AttrKeyDataSchema: "/example-schema/versions/1", + beholder.AttrKeyEntity: "Test__Entity", + beholder.AttrKeyDomain: "TestDomain", }, exporterCalledTimes: 0, expectedError: "'Metadata.BeholderEntity' Error:Field validation for 'BeholderEntity' failed on the 'domain_entity' tag", @@ -84,9 +84,9 @@ func TestEmitterMessageValidation(t *testing.T) { { name: "Invalid Beholder entity (special characters)", attrs: beholder.Attributes{ - "beholder_data_schema": "/example-schema/versions/1", - "beholder_entity": "TestEntity*$", - "beholder_domain": "TestDomain", + beholder.AttrKeyDataSchema: "/example-schema/versions/1", + beholder.AttrKeyEntity: "TestEntity*$", + beholder.AttrKeyDomain: "TestDomain", }, exporterCalledTimes: 0, expectedError: "'Metadata.BeholderEntity' Error:Field validation for 'BeholderEntity' failed on the 'domain_entity' tag", @@ -95,9 +95,9 @@ func TestEmitterMessageValidation(t *testing.T) { name: "Valid Attributes", exporterCalledTimes: 1, attrs: beholder.Attributes{ - "beholder_domain": "TestDomain", - "beholder_entity": "TestEntity", - "beholder_data_schema": "/example-schema/versions/1", + beholder.AttrKeyDomain: "TestDomain", + beholder.AttrKeyEntity: "TestEntity", + beholder.AttrKeyDataSchema: "/example-schema/versions/1", }, expectedError: "", }, @@ -105,9 +105,9 @@ func TestEmitterMessageValidation(t *testing.T) { name: "Valid Attributes (special characters)", exporterCalledTimes: 1, attrs: beholder.Attributes{ - "beholder_domain": "Test.Domain_42-1", - "beholder_entity": "Test.Entity_42-1", - "beholder_data_schema": "/example-schema/versions/1", + beholder.AttrKeyDomain: "Test.Domain_42-1", + beholder.AttrKeyEntity: "Test.Entity_42-1", + beholder.AttrKeyDataSchema: "/example-schema/versions/1", }, expectedError: "", }, diff --git a/pkg/beholder/proto_emitter.go b/pkg/beholder/proto_emitter.go new file mode 100644 index 0000000000..6d5c8a7843 --- /dev/null +++ b/pkg/beholder/proto_emitter.go @@ -0,0 +1,96 @@ +//nolint:revive,staticcheck // disable revive, staticcheck +package beholder + +import ( + "context" + + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +const ( + // Helper keys to avoid duplicating attributes + CtxKeySkipAppendAttrs = "skip_append_attrs" +) + +// BeholderClient is a Beholder client extension with a custom ProtoEmitter +type BeholderClient struct { + *Client + ProtoEmitter ProtoEmitter +} + +// ProtoEmitter is an interface for emitting protobuf messages +type ProtoEmitter interface { + // Sends message with bytes and attributes to OTel Collector + Emit(ctx context.Context, m proto.Message, attrKVs ...any) error + EmitWithLog(ctx context.Context, m proto.Message, attrKVs ...any) error +} + +// ProtoProcessor is an interface for processing emitted protobuf messages +type ProtoProcessor interface { + Process(ctx context.Context, m proto.Message, attrKVs ...any) error +} + +func NewProtoEmitter(lggr logger.Logger, client *Client, schemaBasePath string) ProtoEmitter { + return &protoEmitter{lggr, client, schemaBasePath} +} + +// protoEmitter is a ProtoEmitter implementation +var _ ProtoEmitter = (*protoEmitter)(nil) + +type protoEmitter struct { + lggr logger.Logger + client *Client + schemaBasePath string +} + +func (e *protoEmitter) Emit(ctx context.Context, m proto.Message, attrKVs ...any) error { + payload, err := proto.Marshal(m) + if err != nil { + // Notice: we log here because emit errors are usually not critical and swallowed by the caller + e.lggr.Errorw("[Beholder] Failed to marshal", "err", err) + return err + } + + // Skip appending attributes if the context says it's already done that + if skip, ok := ctx.Value(CtxKeySkipAppendAttrs).(bool); !ok || !skip { + attrKVs = e.appendAttrsRequired(attrKVs, m) + } + + // Emit the message with attributes + err = e.client.Emitter.Emit(ctx, payload, attrKVs...) + if err != nil { + // Notice: we log here because emit errors are usually not critical and swallowed by the caller + e.lggr.Errorw("[Beholder] Failed to client.Emitter.Emit", "err", err) + return err + } + + return nil +} + +// EmitWithLog emits a protobuf message with attributes and logs the emitted message +func (e *protoEmitter) EmitWithLog(ctx context.Context, m proto.Message, attrKVs ...any) error { + attrKVs = e.appendAttrsRequired(attrKVs, m) + // attach a bool switch to ctx to avoid duplicating common attrs + ctx = context.WithValue(ctx, CtxKeySkipAppendAttrs, true) + + // Marshal the message as JSON and log before emitting + // https://protobuf.dev/programming-guides/json/ + mStr := protojson.MarshalOptions{ + UseProtoNames: true, + EmitUnpopulated: true, + }.Format(m) + e.lggr.Infow("[Beholder.emit]", "message", mStr, "attributes", attrKVs) + + return e.Emit(ctx, m, attrKVs...) +} + +// appendAttrsRequired appends required attributes to the attribute key-value list +func (e *protoEmitter) appendAttrsRequired(attrKVs []any, m proto.Message) []any { + attrKVs = appendRequiredAttrDataSchema(attrKVs, toSchemaPath(m, e.schemaBasePath)) + attrKVs = appendRequiredAttrEntity(attrKVs, m) + attrKVs = appendRequiredAttrDomain(attrKVs, m) + return attrKVs +} diff --git a/pkg/beholder/schema.go b/pkg/beholder/schema.go new file mode 100644 index 0000000000..2f33040c1c --- /dev/null +++ b/pkg/beholder/schema.go @@ -0,0 +1,108 @@ +//nolint:gosimple // disable gosimple +package beholder + +import ( + "fmt" + "path" + "regexp" + "strings" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/runtime/protoimpl" +) + +// patternSnake is a regular expression to match CamelCase words +// Notice: we use the Unicode property 'Lu' (uppercase letter) to match +// the first letter of the word, and 'P{Lu}' (not uppercase letter) to match +// the rest of the word. +var patternSnake = regexp.MustCompile("(\\p{Lu}+\\P{Lu}*)") + +// toSnakeCase converts a CamelCase to snake_case (used for type -> file name mapping) +func toSnakeCase(s string) string { + s = patternSnake.ReplaceAllString(s, "_${1}") + s, _ = strings.CutPrefix(strings.ToLower(s), "_") + return s +} + +// toSchemaName returns a protobuf message name (short) +func toSchemaName(m proto.Message) string { + return string(protoimpl.X.MessageTypeOf(m).Descriptor().Name()) +} + +// toSchemaName returns a protobuf message name (full) +func ToSchemaFullName(m proto.Message) string { + return string(protoimpl.X.MessageTypeOf(m).Descriptor().FullName()) +} + +// toSchemaPath maps a protobuf message to a Beholder schema path +func toSchemaPath(m proto.Message, basePath string) string { + // Notice: a name like 'platform.on_chain.forwarder.ReportProcessed' + protoName := ToSchemaFullName(m) + + // We map to a Beholder schema path like '/platform/on-chain/forwarder/report_processed.proto' + protoPath := protoName + protoPath = strings.ReplaceAll(protoPath, ".", "/") + protoPath = strings.ReplaceAll(protoPath, "_", "-") + + // Split the path components (at least one component) + pp := strings.Split(protoPath, "/") + pp[len(pp)-1] = toSnakeCase(pp[len(pp)-1]) + + // Join the path components again + protoPath = strings.Join(pp, "/") + protoPath = fmt.Sprintf("%s.proto", protoPath) + + // Return the full schema path + return path.Join(basePath, protoPath) +} + +// appendRequiredAttrDataSchema adds the message schema path as an attribute (required) +func appendRequiredAttrDataSchema(attrKVs []any, val string) []any { + if containsKey(attrKVs, AttrKeyDataSchema) { + return attrKVs + } + + attrKVs = append(attrKVs, AttrKeyDataSchema) + attrKVs = append(attrKVs, val) + return attrKVs +} + +// appendRequiredAttrEntity adds the message entity type as an attribute (required) +func appendRequiredAttrEntity(attrKVs []any, m proto.Message) []any { + if containsKey(attrKVs, AttrKeyEntity) { + return attrKVs + } + + attrKVs = append(attrKVs, AttrKeyEntity) + attrKVs = append(attrKVs, toSchemaName(m)) + return attrKVs +} + +// appendRequiredAttrDomain adds the message domain as an attribute (required) +func appendRequiredAttrDomain(attrKVs []any, m proto.Message) []any { + if containsKey(attrKVs, AttrKeyDomain) { + return attrKVs + } + + // Notice: a name like 'platform.on_chain.forwarder.ReportProcessed' + protoName := ToSchemaFullName(m) + + // Extract first path component (entrypoint package) as a domain + domain := "unknown" + if strings.Contains(protoName, ".") { + domain = strings.Split(protoName, ".")[0] + } + + attrKVs = append(attrKVs, AttrKeyDomain) + attrKVs = append(attrKVs, domain) + return attrKVs +} + +func containsKey(attrKVs []any, key string) bool { + for i := 0; i < len(attrKVs); i += 2 { + if attrKVs[i] == key { + return true + } + } + return false +} diff --git a/pkg/beholder/schema_test.go b/pkg/beholder/schema_test.go new file mode 100644 index 0000000000..2709e0acde --- /dev/null +++ b/pkg/beholder/schema_test.go @@ -0,0 +1,72 @@ +package beholder + +import ( + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" +) + +func makeDynamicMessage(t *testing.T, pkg, msgName string) protoreflect.ProtoMessage { + fdProto := &descriptorpb.FileDescriptorProto{ + Name: proto.String("test.proto"), + Package: proto.String(pkg), + MessageType: []*descriptorpb.DescriptorProto{{ + Name: proto.String(msgName), + }}, + } + + fd, err := protodesc.NewFile(fdProto, nil) + require.NoError(t, err) + + md := fd.Messages().ByName(protoreflect.Name(msgName)) + return dynamicpb.NewMessage(md) +} + +func TestToSchemaPath(t *testing.T) { + base := "/" + tests := []struct { + pkg, msgName, expected string + }{ + { + pkg: "alpha.bravo.charlie", + msgName: "FirstTest", + expected: path.Join(base, "alpha/bravo/charlie/first_test.proto"), + }, + { + pkg: "one.two", + msgName: "XMLEncode", + expected: path.Join(base, "one/two/xmlencode.proto"), + }, + { + pkg: "single", + msgName: "SimpleMessage", + expected: path.Join(base, "single/simple_message.proto"), + }, + { + pkg: "a.b.c.d.e", + msgName: "NestedLevel", + expected: path.Join(base, "a/b/c/d/e/nested_level.proto"), + }, + { + pkg: "mix.UpAndDOWN", + msgName: "CamelCaseID", + // package segment "UpAndDOWN" is left verbatim (no hyphenation), only the message gets snake_cased + expected: path.Join(base, "mix/UpAndDOWN/camel_case_id.proto"), + }, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + m := makeDynamicMessage(t, tt.pkg, tt.msgName) + got := toSchemaPath(m, base) + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/pkg/capabilities/consensus/ocr3/types/aggregator.go b/pkg/capabilities/consensus/ocr3/types/aggregator.go index 96ba82be06..40192f58a0 100644 --- a/pkg/capabilities/consensus/ocr3/types/aggregator.go +++ b/pkg/capabilities/consensus/ocr3/types/aggregator.go @@ -1,6 +1,11 @@ package types import ( + "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "io" "strings" ocrcommon "github.com/smartcontractkit/libocr/commontypes" @@ -34,6 +39,148 @@ func (m *Metadata) padWorkflowName() { } } +// Encode serializes Metadata in contract order: +// 1B Version, 32B ExecutionID, 4B Timestamp, 4B DONID, 4B DONConfigVersion, +// 32B WorkflowID, 10B WorkflowName, 20B WorkflowOwner, 2B ReportID +func (m Metadata) Encode() ([]byte, error) { + m.padWorkflowName() + buf := new(bytes.Buffer) + + // 1) Version as a single byte + if err := buf.WriteByte(byte(m.Version)); err != nil { + return nil, err + } + + // 2) Helper to decode a hex string and ensure length + writeHex := func(field string, expectedBytes int) error { + s := strings.TrimPrefix(field, "0x") + b, err := hex.DecodeString(s) + if err != nil { + return fmt.Errorf("invalid hex in field: %w", err) + } + if len(b) != expectedBytes { + return fmt.Errorf("wrong length: expected %d bytes, got %d", expectedBytes, len(b)) + } + _, err = buf.Write(b) + return err + } + + // ExecutionID: 32 bytes + if err := writeHex(m.ExecutionID, 32); err != nil { + return nil, fmt.Errorf("ExecutionID: %w", err) + } + + // Timestamp, DONID, DONConfigVersion—all 4‐byte big endian + for _, v := range []uint32{m.Timestamp, m.DONID, m.DONConfigVersion} { + if err := binary.Write(buf, binary.BigEndian, v); err != nil { + return nil, err + } + } + + // WorkflowID: 32 bytes + if err := writeHex(m.WorkflowID, 32); err != nil { + return nil, fmt.Errorf("WorkflowID: %w", err) + } + + // Workflow Name: 10 bytes + if err := writeHex(m.WorkflowName, 10); err != nil { + return nil, fmt.Errorf("WorkflowName: %w", err) + } + + // WorkflowOwner: 20 bytes + if err := writeHex(m.WorkflowOwner, 20); err != nil { + return nil, fmt.Errorf("WorkflowOwner: %w", err) + } + + // ReportID: 2 bytes + if err := writeHex(m.ReportID, 2); err != nil { + return nil, fmt.Errorf("ReportID: %w", err) + } + + return buf.Bytes(), nil +} + +// 1B Version, 32B ExecutionID, 4B Timestamp, 4B DONID, 4B DONConfigVersion, +// 32B WorkflowID, 10B WorkflowName, 20B WorkflowOwner, 2B ReportID +const MetadataLen = 1 + 32 + 4 + 4 + 4 + 32 + 10 + 20 + 2 // =109 + +// Decode parses exactly MetadataLen bytes from raw, returns a Metadata struct +// and any trailing data. +func Decode(raw []byte) (Metadata, []byte, error) { + m := Metadata{} + + if len(raw) < MetadataLen { + return m, nil, fmt.Errorf("metadata: raw too short, want ≥%d, got %d", MetadataLen, len(raw)) + } + + buf := bytes.NewReader(raw[:MetadataLen]) + + // 1) Version (1 byte) + var vb byte + if err := binary.Read(buf, binary.BigEndian, &vb); err != nil { + return m, nil, err + } + m.Version = uint32(vb) + + // helper to read N bytes and hex-decode + readHex := func(n int) (string, error) { + tmp := make([]byte, n) + if _, err := io.ReadFull(buf, tmp); err != nil { + return "", err + } + return hex.EncodeToString(tmp), nil + } + + // 2) ExecutionID (32 bytes hex) + var err error + if m.ExecutionID, err = readHex(32); err != nil { + return m, nil, fmt.Errorf("ExecutionID: %w", err) + } + + // 3) Timestamp, DONID, DONConfigVersion (each 4 bytes BE) + for _, ptr := range []*uint32{&m.Timestamp, &m.DONID, &m.DONConfigVersion} { + if err := binary.Read(buf, binary.BigEndian, ptr); err != nil { + return m, nil, err + } + } + + // 4) WorkflowID (32 bytes hex) + if m.WorkflowID, err = readHex(32); err != nil { + return m, nil, fmt.Errorf("WorkflowID: %w", err) + } + + nameBytes := make([]byte, 10) + if _, err := io.ReadFull(buf, nameBytes); err != nil { + return m, nil, err + } + // hex-encode those 10 bytes into a 20-char string + m.WorkflowName = hex.EncodeToString(nameBytes) + + // 6) WorkflowOwner (20 bytes hex) + if m.WorkflowOwner, err = readHex(20); err != nil { + return m, nil, fmt.Errorf("WorkflowOwner: %w", err) + } + + // 7) ReportID (2 bytes hex) + if m.ReportID, err = readHex(2); err != nil { + return m, nil, fmt.Errorf("ReportID: %w", err) + } + + // strip any stray "0x" prefixes just in case + m.ExecutionID = strings.TrimPrefix(m.ExecutionID, "0x") + m.WorkflowID = strings.TrimPrefix(m.WorkflowID, "0x") + m.WorkflowOwner = strings.TrimPrefix(m.WorkflowOwner, "0x") + m.ReportID = strings.TrimPrefix(m.ReportID, "0x") + + // the rest is payload + tail := raw[MetadataLen:] + return m, tail, nil +} + +func (m Metadata) Length() int { + return MetadataLen +} + // Aggregator is the interface that enables a hook to the Outcome() phase of OCR reporting. type Aggregator interface { // Called by the Outcome() phase of OCR reporting. diff --git a/pkg/capabilities/consensus/ocr3/types/aggregator_test.go b/pkg/capabilities/consensus/ocr3/types/aggregator_test.go index 94660d1033..58948ae14e 100644 --- a/pkg/capabilities/consensus/ocr3/types/aggregator_test.go +++ b/pkg/capabilities/consensus/ocr3/types/aggregator_test.go @@ -1,11 +1,205 @@ package types import ( + "strings" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +func TestMetadata_EncodeDecode(t *testing.T) { + metadata := Metadata{ + Version: 1, + ExecutionID: "1234567890123456789012345678901234567890123456789012345678901234", + Timestamp: 1620000000, + DONID: 1, + DONConfigVersion: 1, + WorkflowID: "1234567890123456789012345678901234567890123456789012345678901234", + WorkflowName: "12", + WorkflowOwner: "1234567890123456789012345678901234567890", + ReportID: "1234", + } + + metadata.padWorkflowName() + + encoded, err := metadata.Encode() + require.NoError(t, err) + + require.Len(t, encoded, 109) + + // append tail to encoded + tail := []byte("tail") + encoded = append(encoded, tail...) + decoded, remaining, err := Decode(encoded) + require.NoError(t, err) + require.Equal(t, metadata.Version, decoded.Version) + require.Equal(t, metadata.ExecutionID, decoded.ExecutionID) + require.Equal(t, metadata.Timestamp, decoded.Timestamp) + require.Equal(t, metadata.DONID, decoded.DONID) + require.Equal(t, metadata.DONConfigVersion, decoded.DONConfigVersion) + require.Equal(t, metadata.WorkflowID, decoded.WorkflowID) + require.Equal(t, metadata.WorkflowName, decoded.WorkflowName) + require.Equal(t, metadata.WorkflowOwner, decoded.WorkflowOwner) + require.Equal(t, metadata.ReportID, decoded.ReportID) + require.Equal(t, tail, remaining) +} + +func TestMetadata_Length(t *testing.T) { + var m Metadata + require.Equal(t, MetadataLen, m.Length()) +} + +func TestPadWorkflowName_NoPadWhenExactLength(t *testing.T) { + // 20 hex characters = 10 bytes, exact length + original := "abcdef0123456789abcd" + m := &Metadata{WorkflowName: original} + m.padWorkflowName() + require.Equal(t, original, m.WorkflowName) +} + +func TestPadWorkflowName_TooLong(t *testing.T) { + // 22 hex characters = 11 bytes, should not be truncated by pad + original := "abcdef0123456789abcd01" + m := &Metadata{WorkflowName: original} + m.padWorkflowName() + require.Equal(t, original, m.WorkflowName) +} + +func TestEncode_InvalidHexFields(t *testing.T) { + m := Metadata{ + Version: 1, + ExecutionID: "zzzz", // invalid hex + Timestamp: 0, + DONID: 0, + DONConfigVersion: 0, + WorkflowID: strings.Repeat("00", 32), + WorkflowName: "00", + WorkflowOwner: strings.Repeat("00", 20), + ReportID: "0000", + } + _, err := m.Encode() + require.Error(t, err) + require.Contains(t, err.Error(), "invalid hex") +} + +func TestEncode_WrongLengthFields(t *testing.T) { + tests := []struct { + name string + m Metadata + }{ + { + name: "short ExecutionID", + m: Metadata{ + Version: 1, + ExecutionID: "00", // too short + Timestamp: 0, + DONID: 0, + DONConfigVersion: 0, + WorkflowID: strings.Repeat("00", 32), + WorkflowName: "00", + WorkflowOwner: strings.Repeat("00", 20), + ReportID: "0000", + }, + }, + { + name: "short WorkflowID", + m: Metadata{ + Version: 1, + ExecutionID: strings.Repeat("00", 32), + Timestamp: 0, + DONID: 0, + DONConfigVersion: 0, + WorkflowID: "00", // too short + WorkflowName: "00", + WorkflowOwner: strings.Repeat("00", 20), + ReportID: "0000", + }, + }, + { + name: "long WorkflowName", + m: Metadata{ + Version: 1, + ExecutionID: strings.Repeat("00", 32), + Timestamp: 0, + DONID: 0, + DONConfigVersion: 0, + WorkflowID: strings.Repeat("00", 32), + WorkflowName: strings.Repeat("01", 11), // 22 chars, >20 + WorkflowOwner: strings.Repeat("00", 20), + ReportID: "0000", + }, + }, + { + name: "short WorkflowOwner", + m: Metadata{ + Version: 1, + ExecutionID: strings.Repeat("00", 32), + Timestamp: 0, + DONID: 0, + DONConfigVersion: 0, + WorkflowID: strings.Repeat("00", 32), + WorkflowName: "00", + WorkflowOwner: "00", // too short + ReportID: "0000", + }, + }, + { + name: "short ReportID", + m: Metadata{ + Version: 1, + ExecutionID: strings.Repeat("00", 32), + Timestamp: 0, + DONID: 0, + DONConfigVersion: 0, + WorkflowID: strings.Repeat("00", 32), + WorkflowName: "00", + WorkflowOwner: strings.Repeat("00", 20), + ReportID: "00", // too short + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := tt.m.Encode() + require.Error(t, err) + require.Contains(t, err.Error(), "wrong length") + }) + } +} + +func TestDecode_RawTooShort(t *testing.T) { + _, _, err := Decode([]byte{0x01, 0x02}) + require.Error(t, err) + require.Contains(t, err.Error(), "raw too short") +} + +func TestDecode_RemainingData(t *testing.T) { + m := Metadata{ + Version: 1, + ExecutionID: strings.Repeat("11", 32), + Timestamp: 2, + DONID: 3, + DONConfigVersion: 4, + WorkflowID: strings.Repeat("22", 32), + WorkflowName: "33", + WorkflowOwner: strings.Repeat("44", 20), + ReportID: "5555", + } + m.padWorkflowName() + + encoded, err := m.Encode() + require.NoError(t, err) + // add extra bytes to simulate payload + extra := []byte("extra") + data := append(encoded, extra...) + + decoded, remaining, err := Decode(data) + require.NoError(t, err) + require.Equal(t, extra, remaining) + require.Equal(t, m, decoded) +} + func TestMetadata_padWorkflowName(t *testing.T) { type fields struct { WorkflowName string @@ -50,7 +244,7 @@ func TestMetadata_padWorkflowName(t *testing.T) { WorkflowName: tt.fields.WorkflowName, } m.padWorkflowName() - assert.Equal(t, tt.want, m.WorkflowName, tt.name) + require.Equal(t, tt.want, m.WorkflowName, tt.name) }) } } diff --git a/pkg/capabilities/events/events.go b/pkg/capabilities/events/events.go index 444f457053..78e28217ee 100644 --- a/pkg/capabilities/events/events.go +++ b/pkg/capabilities/events/events.go @@ -193,9 +193,9 @@ func (e *Emitter) Emit(ctx context.Context, msg Message) error { } attrs := []any{ - "beholder_data_schema", + beholder.AttrKeyDataSchema, "/capabilities-operational-event/versions/1", - "beholder_data_type", + beholder.AttrKeyDataType, "custom_message", } diff --git a/pkg/custmsg/custom_message.go b/pkg/custmsg/custom_message.go index 49ed8c459a..a2f9b7d2ad 100644 --- a/pkg/custmsg/custom_message.go +++ b/pkg/custmsg/custom_message.go @@ -111,9 +111,9 @@ func sendLogAsCustomMessageW(ctx context.Context, msg string, labels map[string] } err = beholder.GetEmitter().Emit(ctx, payloadBytes, - "beholder_data_schema", "/beholder-base-message/versions/1", // required - "beholder_domain", "platform", // required - "beholder_entity", "BaseMessage", // required + beholder.AttrKeyDataSchema, "/beholder-base-message/versions/1", // required + beholder.AttrKeyDomain, "platform", // required + beholder.AttrKeyEntity, "BaseMessage", // required ) if err != nil { return fmt.Errorf("sending custom message failed on emit: %w", err)