Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions pkg/externalfunctions/ansysmaterials.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,32 @@ type DDTags string
const (
workflowTag DDTags = "dd.workflow"
LLMAssistedSelection DDTags = "LLMAssistedSelection"
apiKeyTag DDTags = "dd.api_keyVisible"
userIDTag DDTags = "dd.user_idVisible"
traceIDTag DDTags = "dd.trace_idVisible"
spanIDTag DDTags = "dd.span_idVisible"
parentIDTag DDTags = "dd.parent_idVisible"
totalTokenUsageTag DDTags = "dd.total_token_usageVisible"
inputTokenUsageTag DDTags = "dd.input_token_usageVisible"
outputTokenUsageTag DDTags = "dd.output_token_usageVisible"
llmResponseTimeTag DDTags = "dd.llm_response_time_secondsVisible"
)

// StartTrace generates a new trace ID and span ID for tracing
//
// Tags:
// - @displayName: Start new trace
//
// Parameters:
// - apiKey: the api key used for authentication
//
// Returns:
// - traceID: a 128-bit trace ID in decimal format
// - spanID: a 64-bit span ID in decimal format
func StartTrace(apiKey string) (traceID string, spanID string) {
func StartTrace() (traceID string, spanID string) {
traceID = generateTraceID()
spanID = generateSpanID()
ctx := &logging.ContextMap{}
ctx.Set(logging.ContextKey(workflowTag), LLMAssistedSelection)
ctx.Set(logging.ContextKey(apiKeyTag), apiKey)
ctx.Set(logging.ContextKey(traceIDTag), traceID)
ctx.Set(logging.ContextKey(spanIDTag), spanID)
logging.Log.Infof(ctx, "Starting new trace for ApiKey: %s with trace ID: %s and span ID: %s", apiKey, traceID, spanID)
logging.Log.Infof(ctx, "Starting new trace with trace ID: %s and span ID: %s", traceID, spanID)

return traceID, spanID
}
Expand Down Expand Up @@ -109,8 +109,6 @@ func CreateChildSpan(ctx *logging.ContextMap, traceID string, parentSpanID strin
ctx.Set(logging.ContextKey(spanIDTag), childSpanID)
ctx.Set(logging.ContextKey(parentIDTag), parentSpanID)

logging.Log.Infof(ctx, "Starting child span with trace ID: %s, span ID: %s, and parent span ID: %s", traceID, childSpanID, parentSpanID)

return childSpanID
}

Expand Down Expand Up @@ -317,7 +315,7 @@ func ExtractCriteriaSuggestions(llmResponse string, traceID string, spanID strin
// - tokenCount: the total token count (input tokens × n + combined output tokens)
// - childSpanID: the child span ID created for this operation
func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(input string, history []sharedtypes.HistoricMessage,
systemPrompt string, modelIds []string, tokenCountModelName string, n int, temperature float64, traceID string, spanID string) (uniqueCriterion []sharedtypes.MaterialLlmCriterion, tokenCount int, childSpanID string) {
systemPrompt string, modelIds []string, tokenCountModelName string, n int, temperature float64, traceID string, spanID string, userID string) (uniqueCriterion []sharedtypes.MaterialLlmCriterion, tokenCount int, childSpanID string) {
ctx := &logging.ContextMap{}
childSpanID = CreateChildSpan(ctx, traceID, spanID)

Expand Down Expand Up @@ -351,7 +349,7 @@ func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(inp
logging.Log.Debugf(ctx, "User prompt: %s", input)

// Collect all responses with child span for parallel execution
allResponses := runRequestsInParallel(n, sendRequest, traceID, childSpanID)
allResponses, llmResponseTime := runRequestsInParallel(n, sendRequest, traceID, childSpanID)

// Extract criteria from all responses with child span
var allCriteria []sharedtypes.MaterialLlmCriterion
Expand All @@ -376,8 +374,18 @@ func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(inp
logging.Log.Debugf(ctx, "Output token count: %d", outputTokenCount)
logging.Log.Debugf(ctx, "Total token count: %d", totalTokenCount)

// Set the token count and userID as tag to be able to pick them up from Datadog without requiring a pipeline
var tokenCounterContext = &logging.ContextMap{}
CreateChildSpan(tokenCounterContext, traceID, spanID)
tokenCounterContext.Set(logging.ContextKey(userIDTag), userID)
tokenCounterContext.Set(logging.ContextKey(totalTokenUsageTag), totalTokenCount)
tokenCounterContext.Set(logging.ContextKey(inputTokenUsageTag), inputTokenCount)
tokenCounterContext.Set(logging.ContextKey(outputTokenUsageTag), outputTokenCount)
tokenCounterContext.Set(logging.ContextKey(llmResponseTimeTag), fmt.Sprintf("%.2f", llmResponseTime))
logging.Log.Infof(tokenCounterContext, "Token usage for UserID %s - Input tokens: %d, Output tokens: %d, Total tokens: %d. Response time: %.2f seconds", userID, inputTokenCount, outputTokenCount, totalTokenCount, llmResponseTime)

if len(allCriteria) == 0 {
logging.Log.Debugf(ctx, "No valid criteria found in any response")
logging.Log.Infof(ctx, "No valid criteria found in any response")
return []sharedtypes.MaterialLlmCriterion{}, outputTokenCount, childSpanID
}

Expand All @@ -387,9 +395,12 @@ func PerformMultipleGeneralRequestsAndExtractAttributesWithOpenAiTokenOutput(inp
return uniqueCriterion, totalTokenCount, childSpanID
}

func runRequestsInParallel(n int, sendRequest func() string, traceID string, spanID string) []string {
func runRequestsInParallel(n int, sendRequest func() string, traceID string, spanID string) ([]string, float64) {
ctx := &logging.ContextMap{}
_ = CreateChildSpan(ctx, traceID, spanID)

startTime := time.Now()

responseChan := make(chan string, n)
var wg sync.WaitGroup

Expand Down Expand Up @@ -417,7 +428,10 @@ func runRequestsInParallel(n int, sendRequest func() string, traceID string, spa
logging.Log.Debugf(ctx, "Raw LLM response: %s", response)
allResponses = append(allResponses, response)
}
return allResponses

llmResponseTime := time.Since(startTime).Seconds()

return allResponses, llmResponseTime
}

// getTokenCount gets the token count for the given text using the specified model
Expand Down Expand Up @@ -535,14 +549,14 @@ func LogRequestFailedDebugWithMessage(msg1, msg2 string, traceID string, spanID
// Returns:
// - isAuthenticated: true if the API key is authenticated, false otherwise
// - childSpanID: the child span ID created for this operation
func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spanID string) (isAuthenticated bool, childSpanID string) {
func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spanID string) (isAuthenticated bool, childSpanID string, userID string) {
ctx := &logging.ContextMap{}
childSpanID = CreateChildSpan(ctx, traceID, spanID)

// Check if the API key is empty
if apiKey == "" {
logging.Log.Warnf(ctx, "API key is empty")
return false, childSpanID
logging.Log.Errorf(ctx, "API key is empty")
return false, childSpanID, ""
}

// Check if the API key exists in the KVDB
Expand All @@ -553,7 +567,7 @@ func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spa
}
if !exists {
logging.Log.Warnf(ctx, "API key does not exist in KVDB: %s", apiKey)
return false, childSpanID
return false, childSpanID, ""
}

// Unmarshal the JSON string into materials customer object
Expand All @@ -566,11 +580,13 @@ func CheckApiKeyAuthKvDb(kvdbEndpoint string, apiKey string, traceID string, spa

// Check if customer is denied access
if customer.AccessDenied {
logging.Log.Warnf(ctx, "Access denied for customer: %s", customer.CustomerName)
return false, childSpanID
logging.Log.Infof(ctx, "Access denied for customer: %s", customer.UserID)
return false, childSpanID, customer.UserID
}

return true, childSpanID
logging.Log.Infof(ctx, "Customer with UserID: %s is authenticated. Request trace ID: %s", customer.UserID, traceID)

return true, childSpanID, customer.UserID
}

// UpdateTotalTokenCountForCustomerKvDb updates the total token count for a customer in the KVDB
Expand Down Expand Up @@ -856,7 +872,7 @@ func ResetTokenCountIfNewMonth(kvdbEndpoint string, apiKey string, traceID strin

// Handle case where LastUpdated is 0 (new customer)
if customer.LastUpdated == 0 {
logging.Log.Debugf(ctx, "New customer with key %s. Setting initial timestamp.", customer.ApiKey)
logging.Log.Debugf(ctx, "New customer with key %s. Setting initial timestamp.", apiKey)
// Don't save any history for initial setup, just set the timestamp
customer.LastUpdated = now.Unix()

Expand All @@ -875,7 +891,7 @@ func ResetTokenCountIfNewMonth(kvdbEndpoint string, apiKey string, traceID strin
// Check if last updated is from a different month or year
if now.Year() != lastUpdated.Year() || now.Month() != lastUpdated.Month() {
logging.Log.Debugf(ctx, "Token count reset for customer %s. Last updated: %v, Current time: %v",
customer.ApiKey, lastUpdated, now)
apiKey, lastUpdated, now)

historyEntry := materialsCustomerHistoryObject{
TotalTokenCount: customer.TotalTokenCount,
Expand All @@ -884,7 +900,7 @@ func ResetTokenCountIfNewMonth(kvdbEndpoint string, apiKey string, traceID strin
}
customer.UsageHistory = append(customer.UsageHistory, historyEntry)
logging.Log.Debugf(ctx, "Saved usage history for customer %s: %d tokens (limit: %d) at timestamp %d",
customer.ApiKey, customer.TotalTokenCount, customer.TokenLimit, customer.LastUpdated)
apiKey, customer.TotalTokenCount, customer.TokenLimit, customer.LastUpdated)

// Reset token count to 0 and update timestamp
customer.TotalTokenCount = 0
Expand Down
2 changes: 1 addition & 1 deletion pkg/externalfunctions/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ type kvdbErrorResponse struct {
}

type materialsCustomerObject struct {
ApiKey string `json:"api_key"`
UserID string `json:"user_id"`
CustomerName string `json:"customer_name"`
AccessDenied bool `json:"access_denied"`
TotalTokenCount int `json:"total_token_usage"`
Expand Down