Skip to content

Commit 982323d

Browse files
Add tracing objects (#222)
Co-authored-by: pyansys-ci-bot <[email protected]>
1 parent cb6bce7 commit 982323d

File tree

5 files changed

+186
-5
lines changed

5 files changed

+186
-5
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
golang.org/x/mod v0.17.0
1818
google.golang.org/grpc v1.72.0
1919
google.golang.org/protobuf v1.36.6
20+
nhooyr.io/websocket v1.8.17
2021
)
2122

2223
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,3 +234,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
234234
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
235235
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
236236
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
237+
nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y=
238+
nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=

pkg/clients/flowkitclient/flowkitclient.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func ListFunctionsAndSaveToInteralStates(url string, apiKey string) (err error)
206206
// Returns:
207207
// - map[string]sharedtypes.FilledInputOutput: the outputs of the function
208208
// - error: an error message if the gRPC call fails
209-
func RunFunction(functionName string, inputs map[string]sharedtypes.FilledInputOutput) (outputs map[string]sharedtypes.FilledInputOutput, err error) {
209+
func RunFunction(ctx *logging.ContextMap, functionName string, inputs map[string]sharedtypes.FilledInputOutput) (outputs map[string]sharedtypes.FilledInputOutput, err error) {
210210
defer func() {
211211
r := recover()
212212
if r != nil {
@@ -231,6 +231,12 @@ func RunFunction(functionName string, inputs map[string]sharedtypes.FilledInputO
231231
ctxWithCancel, cancel := context.WithCancel(context.Background())
232232
defer cancel()
233233

234+
// get logging metadata from context
235+
ctxWithMetadata, err := logging.CreateMetaDataFromCtx(ctx, ctxWithCancel)
236+
if err != nil {
237+
return nil, fmt.Errorf("error adding metadata: %v", err)
238+
}
239+
234240
// Convert inputs to gRPC format based on order from function definition
235241
grpcInputs := []*aaliflowkitgrpc.FunctionInput{}
236242
for _, inputDef := range functionDef.Inputs {
@@ -260,7 +266,7 @@ func RunFunction(functionName string, inputs map[string]sharedtypes.FilledInputO
260266
}
261267

262268
// Call RunFunction
263-
runResp, err := c.RunFunction(ctxWithCancel, &aaliflowkitgrpc.FunctionInputs{
269+
runResp, err := c.RunFunction(ctxWithMetadata, &aaliflowkitgrpc.FunctionInputs{
264270
Name: functionName,
265271
Inputs: grpcInputs,
266272
})
@@ -321,6 +327,14 @@ func StreamFunction(ctx *logging.ContextMap, functionName string, inputs map[str
321327
// Create a context with a cancel
322328
ctxWithCancel, cancel := context.WithCancel(context.Background())
323329

330+
// get logging metadata from context
331+
ctxWithMetadata, err := logging.CreateMetaDataFromCtx(ctx, ctxWithCancel)
332+
if err != nil {
333+
conn.Close()
334+
cancel()
335+
return nil, fmt.Errorf("error adding metadata: %v", err)
336+
}
337+
324338
// Convert inputs to gRPC format based on order from function definition
325339
grpcInputs := []*aaliflowkitgrpc.FunctionInput{}
326340
for _, inputDef := range functionDef.Inputs {
@@ -352,7 +366,7 @@ func StreamFunction(ctx *logging.ContextMap, functionName string, inputs map[str
352366
}
353367

354368
// Call StreamFunction
355-
stream, err := c.StreamFunction(ctxWithCancel, &aaliflowkitgrpc.FunctionInputs{
369+
stream, err := c.StreamFunction(ctxWithMetadata, &aaliflowkitgrpc.FunctionInputs{
356370
Name: functionName,
357371
Inputs: grpcInputs,
358372
})
@@ -491,8 +505,20 @@ func apiKeyInterceptor(apiKey string) grpc.UnaryClientInterceptor {
491505
invoker grpc.UnaryInvoker,
492506
opts ...grpc.CallOption,
493507
) error {
494-
// Add API key to the context metadata
495-
md := metadata.Pairs("x-api-key", apiKey)
508+
// Get existing metadata from context (if any)
509+
md, ok := metadata.FromOutgoingContext(ctx)
510+
if !ok {
511+
// No existing metadata, create new
512+
md = metadata.MD{}
513+
} else {
514+
// Copy the metadata to avoid modifying the original
515+
md = md.Copy()
516+
}
517+
518+
// Add API key to the existing metadata (this preserves other keys)
519+
md.Set("x-api-key", apiKey)
520+
521+
// Create new context with MERGED metadata
496522
ctx = metadata.NewOutgoingContext(ctx, md)
497523

498524
// Invoke the RPC with the modified context

pkg/logging/logging.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@ package logging
2424

2525
import (
2626
"bytes"
27+
"context"
2728
"encoding/json"
2829
"fmt"
2930
"net/http"
3031
"os"
3132
"time"
3233

34+
"nhooyr.io/websocket"
35+
3336
"github.com/ansys/aali-sharedtypes/pkg/config"
3437
"go.uber.org/zap"
3538
"go.uber.org/zap/zapcore"
39+
"google.golang.org/grpc/metadata"
3640
)
3741

3842
///////////////////////////////////
@@ -765,3 +769,148 @@ func writeStringToFile(filename string, data string) error {
765769
_, err = fmt.Fprintln(file, data)
766770
return err
767771
}
772+
773+
///////////////////////////////////
774+
// Log Context metadata functions
775+
///////////////////////////////////
776+
777+
// CreateMetaDataFromCtx creates gRPC metadata from the given ContextMap and attaches it to the provided context.
778+
//
779+
// Parameters:
780+
// - ctx: the logging context map containing metadata values
781+
// - ctxWithCancel: the gRPC context to which the metadata will be attached
782+
//
783+
// Returns:
784+
// - ctxWithMetaData: the new gRPC context with the attached metadata
785+
// - err: an error if the metadata creation or attachment fails
786+
func CreateMetaDataFromCtx(ctx *ContextMap, ctxWithCancel context.Context) (ctxWithMetaData context.Context, err error) {
787+
// Append body with context
788+
body := []map[string]interface{}{
789+
{},
790+
}
791+
ctx.data.Range(func(key, value interface{}) bool {
792+
body[0][string(key.(ContextKey))] = value
793+
return true
794+
})
795+
796+
// Serialize struct to JSON
797+
jsonData, err := json.Marshal(&body)
798+
if err != nil {
799+
return nil, fmt.Errorf("error serializing metadata struct to JSON: %v", err)
800+
}
801+
802+
// Attach metadata to gRPC context
803+
md := metadata.Pairs(
804+
"aali-logging-context", string(jsonData),
805+
)
806+
return metadata.NewOutgoingContext(ctxWithCancel, md), nil
807+
}
808+
809+
// CreateCtxFromMetaData creates a ContextMap from gRPC metadata in the provided context.
810+
//
811+
// Parameters:
812+
// - ctxWithMetaData: the gRPC context containing the metadata
813+
//
814+
// Returns:
815+
// - ctx: the logging context map created from the metadata
816+
// - err: an error if the metadata extraction or deserialization fails
817+
func CreateCtxFromMetaData(ctxWithMetaData context.Context) (ctx *ContextMap, err error) {
818+
// Create new ContextMap
819+
ctx = &ContextMap{}
820+
821+
// Extract metadata from incoming context
822+
md, ok := metadata.FromIncomingContext(ctxWithMetaData)
823+
if !ok {
824+
return ctx, nil
825+
}
826+
827+
// Get the aali-logging-context value
828+
metadataValues := md.Get("aali-logging-context")
829+
if len(metadataValues) == 0 {
830+
return ctx, nil
831+
}
832+
833+
// Take the first value (there should only be one)
834+
jsonData := metadataValues[0]
835+
836+
// Deserialize JSON to body
837+
var body []map[string]interface{}
838+
err = json.Unmarshal([]byte(jsonData), &body)
839+
if err != nil {
840+
return nil, fmt.Errorf("error deserializing JSON to metadata: %v", err)
841+
}
842+
843+
// Populate the ContextMap with data from body
844+
if len(body) > 0 && body[0] != nil {
845+
for key, value := range body[0] {
846+
ctx.data.Store(ContextKey(key), value)
847+
}
848+
}
849+
850+
return ctx, nil
851+
}
852+
853+
// CreateDialOptionsFromCtx creates websocket dial options from the given ContextMap.
854+
//
855+
// Parameters:
856+
// - ctx: the logging context map containing metadata values
857+
//
858+
// Returns:
859+
// - opts: the websocket dial options with the attached metadata
860+
// - err: an error if the metadata creation fails
861+
func CreateDialOptionsFromCtx(ctx *ContextMap) (opts *websocket.DialOptions, err error) {
862+
// Append body with context
863+
body := []map[string]interface{}{
864+
{},
865+
}
866+
ctx.data.Range(func(key, value interface{}) bool {
867+
body[0][string(key.(ContextKey))] = value
868+
return true
869+
})
870+
871+
// Serialize struct to JSON
872+
jsonData, err := json.Marshal(&body)
873+
if err != nil {
874+
return nil, fmt.Errorf("error serializing metadata struct to JSON: %v", err)
875+
}
876+
opts = &websocket.DialOptions{
877+
HTTPHeader: http.Header{
878+
"aali-logging-context": []string{string(jsonData)},
879+
},
880+
}
881+
return opts, nil
882+
}
883+
884+
// CreateCtxFromHeader creates a ContextMap from HTTP request headers.
885+
//
886+
// Parameters:
887+
// - request: the HTTP request containing the headers
888+
//
889+
// Returns:
890+
// - ctx: the logging context map created from the headers
891+
// - err: an error if the header extraction or deserialization fails
892+
func CreateCtxFromHeader(request *http.Request) (ctx *ContextMap, err error) {
893+
// Create new ContextMap
894+
ctx = &ContextMap{}
895+
896+
// Get the aali-logging-context value
897+
meta := request.Header.Get("aali-logging-context")
898+
if meta == "" {
899+
return ctx, nil
900+
}
901+
902+
// Deserialize JSON to body
903+
var body []map[string]interface{}
904+
err = json.Unmarshal([]byte(meta), &body)
905+
if err != nil {
906+
return nil, fmt.Errorf("error deserializing JSON to metadata: %v", err)
907+
}
908+
909+
// Populate the ContextMap with data from body
910+
if len(body) > 0 && body[0] != nil {
911+
for key, value := range body[0] {
912+
ctx.data.Store(ContextKey(key), value)
913+
}
914+
}
915+
return ctx, nil
916+
}

pkg/logging/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ type ContextKey string
3333

3434
const (
3535
InstructionGuid ContextKey = "instructionGuid"
36+
WorkflowId ContextKey = "workflowId"
37+
WorkflowRunId ContextKey = "workflowRunId"
38+
UserId ContextKey = "userId"
3639
AdapterType ContextKey = "adapterType"
3740
WatchFolderPath ContextKey = "watchFolderPath"
3841
WatchFilePath ContextKey = "watchFilePath"

0 commit comments

Comments
 (0)