Skip to content

Commit 3f8beef

Browse files
Breaking: indicate root struct when establishing gRPC connection (#62)
Resolves #58 This will allow the same gRPC destination to accept STEF data that uses multiple different schemas (one schema per connection). A known use cases is Otel where we want to send metrics, traces, logs, etc. This should go to the same gRPC endpoint for simplicity but the destination needs to know what schema to use for decoding. The root struct is indicated in the first message from gRPC sender to destination, before the STEF stream bytes are sent.
1 parent 9540ee1 commit 3f8beef

File tree

21 files changed

+380
-147
lines changed

21 files changed

+380
-147
lines changed

go/grpc/client.go

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ type ClientCallbacks struct {
2424
OnAck func(ackId uint64) error
2525
}
2626

27+
// Client is a client for communicating over STEF/gRPC protocol.
2728
type Client struct {
2829
grpcClient stef_proto.STEFDestinationClient
2930
stream stef_proto.STEFDestination_StreamClient
3031
callbacks ClientCallbacks
31-
clientSchema *schema.WireSchema
32+
clientSchema ClientSchema
3233
logger types.Logger
3334

3435
// Running state
@@ -84,15 +85,47 @@ func (w *grpcWriter) WriteChunk(header []byte, content []byte) error {
8485

8586
var ErrServerInvalidResponse = errors.New("invalid server response")
8687

88+
// ClientSettings contains configuration settings for creating a Client.
8789
type ClientSettings struct {
90+
// Logger instance used for logging client operations.
8891
Logger types.Logger
92+
8993
// gRPC stream to send data over.
90-
GrpcClient stef_proto.STEFDestinationClient
91-
ClientSchema *schema.WireSchema
92-
Callbacks ClientCallbacks
94+
GrpcClient stef_proto.STEFDestinationClient
95+
96+
// ClientSchema of the client.
97+
ClientSchema ClientSchema
98+
99+
// Callbacks for handling events such as acknowledgments and disconnections.
100+
Callbacks ClientCallbacks
93101
}
94102

95-
func NewClient(settings ClientSettings) *Client {
103+
type ClientSchema struct {
104+
// The name of the root struct of the schema.
105+
RootStructName string
106+
107+
// The wire schema of the client.
108+
WireSchema *schema.WireSchema
109+
}
110+
111+
// NewClient creates a new instance of the Client.
112+
//
113+
// Requirements:
114+
// - The `RootStructName` in `ClientSchema` must not be empty.
115+
// - The `WireSchema` in `ClientSchema` must not be nil.
116+
//
117+
// Example:
118+
//
119+
// clientSettings := stefgrpc.ClientSettings{
120+
// GrpcClient: grpcClient,
121+
// ClientSchema: stefgrpc.ClientSchema{RootStructName: "Metrics", WireSchema: &schema},
122+
// Callbacks: stefgrpc.ClientCallbacks{},
123+
// }
124+
// client, err := stefgrpc.NewClient(clientSettings)
125+
// if err != nil {
126+
// log.Fatalf("Failed to create client: %v", err)
127+
// }
128+
func NewClient(settings ClientSettings) (*Client, error) {
96129
if settings.Logger == nil {
97130
settings.Logger = internal.NopLogger{}
98131
}
@@ -101,6 +134,13 @@ func NewClient(settings ClientSettings) *Client {
101134
settings.Callbacks.OnDisconnect = func(err error) {}
102135
}
103136

137+
if settings.ClientSchema.RootStructName == "" {
138+
return nil, fmt.Errorf("client schema root struct name is empty")
139+
}
140+
if settings.ClientSchema.WireSchema == nil {
141+
return nil, fmt.Errorf("client schema wire schema is nil")
142+
}
143+
104144
client := &Client{
105145
grpcClient: settings.GrpcClient,
106146
callbacks: settings.Callbacks,
@@ -109,7 +149,7 @@ func NewClient(settings ClientSettings) *Client {
109149
waitCh: make(chan struct{}),
110150
}
111151

112-
return client
152+
return client, nil
113153
}
114154

115155
func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOptions, error) {
@@ -137,6 +177,19 @@ func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOption
137177
}
138178
defer closeOnErr()
139179

180+
// Send the first message to the server, include the root struct name.
181+
clientMsg := &stef_proto.STEFClientFirstMessage{
182+
RootStructName: c.clientSchema.RootStructName,
183+
}
184+
err = stream.Send(
185+
&stef_proto.STEFClientMessage{
186+
FirstMessage: clientMsg,
187+
},
188+
)
189+
if err != nil {
190+
return nil, opts, fmt.Errorf("failed to send to server: %w", err)
191+
}
192+
140193
// The server must send capabilities message.
141194
message, err := stream.Recv()
142195
if err != nil {
@@ -162,7 +215,7 @@ func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOption
162215
}
163216

164217
// Check if server schema is backward compatible with client schema.
165-
compatibility, err := serverSchema.Compatible(c.clientSchema)
218+
compatibility, err := serverSchema.Compatible(c.clientSchema.WireSchema)
166219
switch compatibility {
167220
case schema.CompatibilityExact:
168221
// Schemas match exactly, nothing else is needed, can start sending data.
@@ -171,12 +224,12 @@ func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOption
171224
// ServerStream schema is superset of client schema. The client MUST specify its schema
172225
// in the STEF header.
173226
opts.IncludeDescriptor = true
174-
opts.Schema = c.clientSchema
227+
opts.Schema = c.clientSchema.WireSchema
175228

176229
case schema.CompatibilityIncompatible:
177230
// It is neither exact match nor is server schema a superset, but server schema maybe subset.
178231
// Check the opposite direction: if client schema is backward compatible with server schema.
179-
compatibility, err = serverSchema.Compatible(c.clientSchema)
232+
compatibility, err = serverSchema.Compatible(c.clientSchema.WireSchema)
180233

181234
if err != nil || compatibility == schema.CompatibilityIncompatible {
182235
return nil, opts, fmt.Errorf("client and server schemas are incompatble: %w", err)
@@ -185,7 +238,7 @@ func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOption
185238
if compatibility == schema.CompatibilitySuperset {
186239
// Client schema is superset of server schema. The client MUST downgrade its schema.
187240
opts.IncludeDescriptor = true
188-
opts.Schema = c.clientSchema
241+
opts.Schema = c.clientSchema.WireSchema
189242
}
190243
}
191244

go/grpc/proto/destination.proto

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,38 @@ service STEFDestination {
1616
}
1717

1818
message STEFClientMessage {
19+
// The client MUST set first_message field in the first STEFClientMessage sent.
20+
// All other fields MUST be unset when first_message is set.
21+
// All subsequent messages MUST have first_message unset.
22+
STEFClientFirstMessage first_message = 1;
23+
1924
// The bytes of STEF stream. The recipient is responsible for assembling the
2025
// STEF data stream from a sequence of messages in the order the
2126
// messages are received and decoding the STEF data stream.
2227
//
2328
// See specification.md for specification of STEF stream.
24-
bytes stef_bytes = 1;
29+
bytes stef_bytes = 2;
2530

2631
// Indicates that the last byte of tef_bytes is also an end of a chunk (a STEF header or
2732
// STEF frame). This can be used by recipients to accumulates bytes until the end of
2833
// the chunk is encountered and only then start decoding the chunk.
2934
// Clients MUST ensure they mark this field true at least once in a while otherwise
3035
// recipients may never start decoding the data.
31-
bool is_end_of_chunk = 2;
36+
bool is_end_of_chunk = 3;
37+
}
38+
39+
// ClientFirstMessage is the first message sent by the client to the destination.
40+
// The client MUST send this message first. The destination MUST respond with
41+
// STEFDestinationCapabilities message. The client MUST NOT send any other
42+
// messages until it receives STEFDestinationCapabilities message from the
43+
// destination. The client MUST NOT send STEF data until it receives
44+
// STEFDestinationCapabilities message from the destination.
45+
message STEFClientFirstMessage {
46+
// The name of the root struct of the client's schema. This is useful
47+
// for destinations that accept multiple schemas and need to know which schema
48+
// the client is using. The destination will use this information to
49+
// determine the schema to use for decoding the data.
50+
string root_struct_name = 1;
3251
}
3352

3453
message STEFDestinationCapabilities {
@@ -70,6 +89,8 @@ message STEFDictionaryLimits {
7089

7190

7291
message STEFServerMessage {
92+
// TODO: refactor this to avoid using oneof message to reduce
93+
// allocations for the most common case of STEFDataResponse.
7394
oneof message {
7495
STEFDestinationCapabilities capabilities = 1;
7596
STEFDataResponse response = 2;

go/grpc/server.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,27 @@ func NewStreamServer(settings ServerSettings) *StreamServer {
181181
}
182182

183183
func (s *StreamServer) Stream(server stef_proto.STEFDestination_StreamServer) error {
184+
// Receive the first message from the client.
185+
clientMsg, err := server.Recv()
186+
if err != nil {
187+
return fmt.Errorf("failed to receive a message from the client: %w", err)
188+
}
189+
if clientMsg.FirstMessage == nil {
190+
return fmt.Errorf("FirstMessage is nil")
191+
}
192+
if clientMsg.FirstMessage.RootStructName == "" {
193+
return fmt.Errorf("RootStructName is unspecified")
194+
}
195+
196+
// Send capabilities message to the client.
197+
198+
// Prepare the schema bytes.
184199
var schemaBytes bytes.Buffer
185-
err := s.serverSchema.Serialize(&schemaBytes)
200+
err = s.serverSchema.Serialize(&schemaBytes)
186201
if err != nil {
187202
return fmt.Errorf("could not marshal server schema: %w", err)
188203
}
189204

190-
// Send capabilities message to the client.
191205
message := stef_proto.STEFServerMessage{
192206
Message: &stef_proto.STEFServerMessage_Capabilities{
193207
Capabilities: &stef_proto.STEFDestinationCapabilities{

0 commit comments

Comments
 (0)