Skip to content

Commit 825fc52

Browse files
committed
Add Collector Payload Thrift to/from JSON functions
1 parent 7124e85 commit 825fc52

File tree

4 files changed

+173
-0
lines changed

4 files changed

+173
-0
lines changed

docs/configuration_transformations_docs_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) {
141141
configObject = &transform.SetPkConfig{}
142142
case "spEnrichedToJson":
143143
configObject = &transform.EnrichedToJSONConfig{}
144+
case "spCollectorPayloadThriftToJSON":
145+
configObject = &transform.CollectorPayloadThriftToJSONConfig{}
146+
case "spJSONToCollectorPayloadThrift":
147+
configObject = &transform.jsonToCollectorPayloadThriftConfig{}
144148
case "js":
145149
configObject = &engine.JSEngineConfig{}
146150
case "lua":
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
//
2+
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
3+
//
4+
// This program is licensed to you under the Snowplow Community License Version 1.0,
5+
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
6+
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
7+
8+
package transform
9+
10+
import (
11+
"context"
12+
"errors"
13+
14+
"github.com/snowplow/snowbridge/config"
15+
"github.com/snowplow/snowbridge/pkg/models"
16+
17+
collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
18+
)
19+
20+
// CollectorPayloadThriftToJSONConfig is a configuration object for the spCollectorPayloadThriftToJSON transformation
21+
type CollectorPayloadThriftToJSONConfig struct {
22+
}
23+
24+
type collectorPayloadThriftToJSONAdapter func(i interface{}) (interface{}, error)
25+
26+
// Create implements the ComponentCreator interface.
27+
func (f collectorPayloadThriftToJSONAdapter) Create(i interface{}) (interface{}, error) {
28+
return f(i)
29+
}
30+
31+
// ProvideDefault implements the ComponentConfigurable interface
32+
func (f collectorPayloadThriftToJSONAdapter) ProvideDefault() (interface{}, error) {
33+
// Provide defaults
34+
cfg := &CollectorPayloadThriftToJSONConfig{}
35+
36+
return cfg, nil
37+
}
38+
39+
// adapterGenerator returns a spCollectorPayloadThriftToJSON transformation adapter.
40+
func collectorPayloadThriftToJSONAdapterGenerator(f func(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error)) collectorPayloadThriftToJSONAdapter {
41+
return func(i interface{}) (interface{}, error) {
42+
cfg, ok := i.(*CollectorPayloadThriftToJSONConfig)
43+
if !ok {
44+
return nil, errors.New("invalid input, expected collectorPayloadThriftToJSONConfig")
45+
}
46+
47+
return f(cfg)
48+
}
49+
}
50+
51+
// collectorPayloadThriftToJSONConfigFunction returns an spCollectorPayloadThriftToJSON transformation function, from an collectorPayloadThriftToJSONConfig.
52+
func collectorPayloadThriftToJSONConfigFunction(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error) {
53+
return SpCollectorPayloadThriftToJSON, nil
54+
}
55+
56+
// CollectorPayloadThriftToJSONConfigPair is a configuration pair for the spCollectorPayloadThriftToJSON transformation
57+
var CollectorPayloadThriftToJSONConfigPair = config.ConfigurationPair{
58+
Name: "spCollectorPayloadThriftToJSON",
59+
Handle: collectorPayloadThriftToJSONAdapterGenerator(collectorPayloadThriftToJSONConfigFunction),
60+
}
61+
62+
// SpCollectorPayloadThriftToJSON is a specific transformation implementation to transform a Thrift encoded Collector Payload
63+
// to a JSON string representation.
64+
func SpCollectorPayloadThriftToJSON(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
65+
ctx := context.Background()
66+
67+
// Deserialize the Collector Payload to a struct
68+
res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data)
69+
if deserializeErr != nil {
70+
message.SetError(deserializeErr)
71+
return nil, nil, message, nil
72+
}
73+
74+
// Re-encode as a JSON string to be able to leverage it downstream
75+
resJSON, jsonErr := collectorpayload.ToJSON(res)
76+
if jsonErr != nil {
77+
message.SetError(jsonErr)
78+
return nil, nil, message, nil
79+
}
80+
81+
message.Data = resJSON
82+
return message, nil, nil, intermediateState
83+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//
2+
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
3+
//
4+
// This program is licensed to you under the Snowplow Community License Version 1.0,
5+
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
6+
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
7+
8+
package transform
9+
10+
import (
11+
"context"
12+
"encoding/json"
13+
"errors"
14+
15+
"github.com/snowplow/snowbridge/config"
16+
"github.com/snowplow/snowbridge/pkg/models"
17+
18+
collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
19+
collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1"
20+
)
21+
22+
// JSONToCollectorPayloadThriftConfig is a configuration object for the spJSONToCollectorPayloadThrift transformation
23+
type JSONToCollectorPayloadThriftConfig struct {
24+
}
25+
26+
type jsonToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error)
27+
28+
// Create implements the ComponentCreator interface.
29+
func (f jsonToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) {
30+
return f(i)
31+
}
32+
33+
// ProvideDefault implements the ComponentConfigurable interface
34+
func (f jsonToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) {
35+
// Provide defaults
36+
cfg := &JSONToCollectorPayloadThriftConfig{}
37+
38+
return cfg, nil
39+
}
40+
41+
// adapterGenerator returns a spJSONToCollectorPayloadThrift transformation adapter.
42+
func jsonToCollectorPayloadThriftAdapterGenerator(f func(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error)) jsonToCollectorPayloadThriftAdapter {
43+
return func(i interface{}) (interface{}, error) {
44+
cfg, ok := i.(*JSONToCollectorPayloadThriftConfig)
45+
if !ok {
46+
return nil, errors.New("invalid input, expected jsonToCollectorPayloadThriftConfig")
47+
}
48+
49+
return f(cfg)
50+
}
51+
}
52+
53+
// jsonToCollectorPayloadThriftConfigFunction returns an spJSONToCollectorPayloadThrift transformation function, from an jsonToCollectorPayloadThriftConfig.
54+
func jsonToCollectorPayloadThriftConfigFunction(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error) {
55+
return SpJSONToCollectorPayloadThrift, nil
56+
}
57+
58+
// JSONToCollectorPayloadThriftConfigPair is a configuration pair for the spJSONToCollectorPayloadThrift transformation
59+
var JSONToCollectorPayloadThriftConfigPair = config.ConfigurationPair{
60+
Name: "spJSONToCollectorPayloadThrift",
61+
Handle: jsonToCollectorPayloadThriftAdapterGenerator(jsonToCollectorPayloadThriftConfigFunction),
62+
}
63+
64+
// SpJSONToCollectorPayloadThrift is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload
65+
// so that it can be pushed directly into the egress stream of a Collector.
66+
func SpJSONToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
67+
var p *collectorpayloadmodel1.CollectorPayload
68+
unmarshallErr := json.Unmarshal(message.Data, &p)
69+
if unmarshallErr != nil {
70+
message.SetError(unmarshallErr)
71+
return nil, nil, message, nil
72+
}
73+
74+
ctx := context.Background()
75+
76+
res, serializeErr := collectorpayload.BinarySerializer(ctx, p)
77+
if serializeErr != nil {
78+
message.SetError(serializeErr)
79+
return nil, nil, message, nil
80+
}
81+
82+
message.Data = res
83+
return message, nil, nil, intermediateState
84+
}

pkg/transform/transformconfig/transform_config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ var SupportedTransformations = []config.ConfigurationPair{
2323
filter.ContextFilterConfigPair,
2424
transform.SetPkConfigPair,
2525
transform.EnrichedToJSONConfigPair,
26+
transform.CollectorPayloadThriftToJSONConfigPair,
27+
transform.JSONToCollectorPayloadThriftConfigPair,
2628
engine.LuaConfigPair,
2729
engine.JSConfigPair,
2830
}

0 commit comments

Comments
 (0)