From 44775bfc3f9e563107f0ba9453a5e93ff55a6faa Mon Sep 17 00:00:00 2001 From: David Cha Date: Thu, 11 Sep 2025 14:09:01 +0100 Subject: [PATCH 1/2] MatBU: Changes to allow data be picked up from datadog; --- pkg/externalfunctions/ansysmaterials.go | 64 +++++++++++++++---------- pkg/externalfunctions/types.go | 2 +- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/pkg/externalfunctions/ansysmaterials.go b/pkg/externalfunctions/ansysmaterials.go index 3b1c3fc4..9af9c431 100644 --- a/pkg/externalfunctions/ansysmaterials.go +++ b/pkg/externalfunctions/ansysmaterials.go @@ -53,10 +53,14 @@ type DDTags string const ( workflowTag DDTags = "dd.workflow" LLMAssistedSelection DDTags = "LLMAssistedSelection" - apiKeyTag DDTags = "dd.api_keyVisible" + userIDTag DDTags = "dd.user_idVisible" traceIDTag DDTags = "dd.trace_idVisible" spanIDTag DDTags = "dd.span_idVisible" parentIDTag DDTags = "dd.parent_idVisible" + totalTokenUsageTag DDTags = "dd.total_token_usageVisible" + inputTokenUsageTag DDTags = "dd.input_token_usageVisible" + outputTokenUsageTag DDTags = "dd.output_token_usageVisible" + llmResponseTimeTag DDTags = "dd.llm_response_time_secondsVisible" ) // StartTrace generates a new trace ID and span ID for tracing @@ -64,21 +68,17 @@ const ( // Tags: // - @displayName: Start new trace // -// Parameters: -// - apiKey: the api key used for authentication -// // Returns: // - traceID: a 128-bit trace ID in decimal format // - spanID: a 64-bit span ID in decimal format -func StartTrace(apiKey string) (traceID string, spanID string) { +func StartTrace() (traceID string, spanID string) { traceID = generateTraceID() spanID = generateSpanID() ctx := &logging.ContextMap{} ctx.Set(logging.ContextKey(workflowTag), LLMAssistedSelection) - ctx.Set(logging.ContextKey(apiKeyTag), apiKey) ctx.Set(logging.ContextKey(traceIDTag), traceID) ctx.Set(logging.ContextKey(spanIDTag), spanID) - logging.Log.Infof(ctx, "Starting new trace for ApiKey: %s with trace ID: %s and span ID: %s", apiKey, traceID, spanID) + logging.Log.Infof(ctx, "Starting new trace with trace ID: %s and span ID: %s", traceID, spanID) return traceID, spanID } @@ -109,8 +109,6 @@ func CreateChildSpan(ctx *logging.ContextMap, traceID string, parentSpanID strin ctx.Set(logging.ContextKey(spanIDTag), childSpanID) ctx.Set(logging.ContextKey(parentIDTag), parentSpanID) - logging.Log.Infof(ctx, "Starting child span with trace ID: %s, span ID: %s, and parent span ID: %s", traceID, childSpanID, parentSpanID) - return childSpanID } @@ -317,7 +315,7 @@ func ExtractCriteriaSuggestions(llmResponse string, traceID string, spanID strin // - tokenCount: the total token count (input tokens × n + combined output tokens) // - childSpanID: the child span ID created for this operation func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(input string, history []sharedtypes.HistoricMessage, - systemPrompt string, modelIds []string, tokenCountModelName string, n int, temperature float64, traceID string, spanID string) (uniqueCriterion []sharedtypes.MaterialLlmCriterion, tokenCount int, childSpanID string) { + systemPrompt string, modelIds []string, tokenCountModelName string, n int, temperature float64, traceID string, spanID string, userID string) (uniqueCriterion []sharedtypes.MaterialLlmCriterion, tokenCount int, childSpanID string) { ctx := &logging.ContextMap{} childSpanID = CreateChildSpan(ctx, traceID, spanID) @@ -351,7 +349,7 @@ func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(inp logging.Log.Debugf(ctx, "User prompt: %s", input) // Collect all responses with child span for parallel execution - allResponses := runRequestsInParallel(n, sendRequest, traceID, childSpanID) + allResponses, llmResponseTime := runRequestsInParallel(n, sendRequest, traceID, childSpanID) // Extract criteria from all responses with child span var allCriteria []sharedtypes.MaterialLlmCriterion @@ -376,8 +374,18 @@ func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(inp logging.Log.Debugf(ctx, "Output token count: %d", outputTokenCount) logging.Log.Debugf(ctx, "Total token count: %d", totalTokenCount) + // Set the token count and userID as tag to be able to pick them up from Datadog without requiring a pipeline + var tokenCounterContext = &logging.ContextMap{} + CreateChildSpan(tokenCounterContext, traceID, spanID) + tokenCounterContext.Set(logging.ContextKey(userIDTag), userID) + tokenCounterContext.Set(logging.ContextKey(totalTokenUsageTag), totalTokenCount) + tokenCounterContext.Set(logging.ContextKey(inputTokenUsageTag), inputTokenCount) + tokenCounterContext.Set(logging.ContextKey(outputTokenUsageTag), outputTokenCount) + tokenCounterContext.Set(logging.ContextKey(llmResponseTimeTag), fmt.Sprintf("%.2f", llmResponseTime)) + logging.Log.Infof(tokenCounterContext, "Token usage for UserID %s - Input tokens: %d, Output tokens: %d, Total tokens: %d. Response time: %.2f seconds", userID, inputTokenCount, outputTokenCount, totalTokenCount, llmResponseTime) + if len(allCriteria) == 0 { - logging.Log.Debugf(ctx, "No valid criteria found in any response") + logging.Log.Infof(ctx, "No valid criteria found in any response") return []sharedtypes.MaterialLlmCriterion{}, outputTokenCount, childSpanID } @@ -387,9 +395,12 @@ func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(inp return uniqueCriterion, totalTokenCount, childSpanID } -func runRequestsInParallel(n int, sendRequest func() string, traceID string, spanID string) []string { +func runRequestsInParallel(n int, sendRequest func() string, traceID string, spanID string) ([]string, float64) { ctx := &logging.ContextMap{} _ = CreateChildSpan(ctx, traceID, spanID) + + startTime := time.Now() + responseChan := make(chan string, n) var wg sync.WaitGroup @@ -417,7 +428,10 @@ func runRequestsInParallel(n int, sendRequest func() string, traceID string, spa logging.Log.Debugf(ctx, "Raw LLM response: %s", response) allResponses = append(allResponses, response) } - return allResponses + + llmResponseTime := time.Since(startTime).Seconds() + + return allResponses, llmResponseTime } // getTokenCount gets the token count for the given text using the specified model @@ -535,14 +549,14 @@ func LogRequestFailedDebugWithMessage(msg1, msg2 string, traceID string, spanID // Returns: // - isAuthenticated: true if the API key is authenticated, false otherwise // - childSpanID: the child span ID created for this operation -func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spanID string) (isAuthenticated bool, childSpanID string) { +func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spanID string) (isAuthenticated bool, childSpanID string, userID string) { ctx := &logging.ContextMap{} childSpanID = CreateChildSpan(ctx, traceID, spanID) // Check if the API key is empty if apiKey == "" { - logging.Log.Warnf(ctx, "API key is empty") - return false, childSpanID + logging.Log.Errorf(ctx, "API key is empty") + return false, childSpanID, "" } // Check if the API key exists in the KVDB @@ -553,7 +567,7 @@ func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spa } if !exists { logging.Log.Warnf(ctx, "API key does not exist in KVDB: %s", apiKey) - return false, childSpanID + return false, childSpanID, "" } // Unmarshal the JSON string into materials customer object @@ -566,11 +580,13 @@ func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spa // Check if customer is denied access if customer.AccessDenied { - logging.Log.Warnf(ctx, "Access denied for customer: %s", customer.CustomerName) - return false, childSpanID + logging.Log.Info(ctx, "Access denied for customer: %s", customer.UserID) + return false, childSpanID, customer.UserID } - return true, childSpanID + logging.Log.Infof(ctx, "Customer with UserID: %s is authenticated. Request trace ID: %s", customer.UserID, traceID) + + return true, childSpanID, customer.UserID } // UpdateTotalTokenCountForCustomerKvDb updates the total token count for a customer in the KVDB @@ -856,7 +872,7 @@ func ResetTokenCountIfNewMonth(kvdbEndpoint string, apiKey string, traceID strin // Handle case where LastUpdated is 0 (new customer) if customer.LastUpdated == 0 { - logging.Log.Debugf(ctx, "New customer with key %s. Setting initial timestamp.", customer.ApiKey) + logging.Log.Debugf(ctx, "New customer with key %s. Setting initial timestamp.", apiKey) // Don't save any history for initial setup, just set the timestamp customer.LastUpdated = now.Unix() @@ -875,7 +891,7 @@ func ResetTokenCountIfNewMonth(kvdbEndpoint string, apiKey string, traceID strin // Check if last updated is from a different month or year if now.Year() != lastUpdated.Year() || now.Month() != lastUpdated.Month() { logging.Log.Debugf(ctx, "Token count reset for customer %s. Last updated: %v, Current time: %v", - customer.ApiKey, lastUpdated, now) + apiKey, lastUpdated, now) historyEntry := materialsCustomerHistoryObject{ TotalTokenCount: customer.TotalTokenCount, @@ -884,7 +900,7 @@ func ResetTokenCountIfNewMonth(kvdbEndpoint string, apiKey string, traceID strin } customer.UsageHistory = append(customer.UsageHistory, historyEntry) logging.Log.Debugf(ctx, "Saved usage history for customer %s: %d tokens (limit: %d) at timestamp %d", - customer.ApiKey, customer.TotalTokenCount, customer.TokenLimit, customer.LastUpdated) + apiKey, customer.TotalTokenCount, customer.TokenLimit, customer.LastUpdated) // Reset token count to 0 and update timestamp customer.TotalTokenCount = 0 diff --git a/pkg/externalfunctions/types.go b/pkg/externalfunctions/types.go index d21f0ccc..09ad82e7 100644 --- a/pkg/externalfunctions/types.go +++ b/pkg/externalfunctions/types.go @@ -326,7 +326,7 @@ type kvdbErrorResponse struct { } type materialsCustomerObject struct { - ApiKey string `json:"api_key"` + UserID string `json:"user_id"` CustomerName string `json:"customer_name"` AccessDenied bool `json:"access_denied"` TotalTokenCount int `json:"total_token_usage"` From ef4e4145aa5541600039a7adee8442b7491db2c1 Mon Sep 17 00:00:00 2001 From: David Cha Date: Tue, 16 Sep 2025 11:57:09 +0100 Subject: [PATCH 2/2] MatBU: Fix .Info call has possible Printf formatting; --- pkg/externalfunctions/ansysmaterials.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/externalfunctions/ansysmaterials.go b/pkg/externalfunctions/ansysmaterials.go index 9af9c431..e5c7b403 100644 --- a/pkg/externalfunctions/ansysmaterials.go +++ b/pkg/externalfunctions/ansysmaterials.go @@ -580,7 +580,7 @@ func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spa // Check if customer is denied access if customer.AccessDenied { - logging.Log.Info(ctx, "Access denied for customer: %s", customer.UserID) + logging.Log.Infof(ctx, "Access denied for customer: %s", customer.UserID) return false, childSpanID, customer.UserID }