Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions apps/webapp/app/v3/otlpExporter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
import { logger } from "~/services/logger.server";
import { trace, Tracer } from "@opentelemetry/api";
import { startSpan } from "./tracing.server";
import { enrichCreatableEvents } from "./utils/enrichCreatableEvents.server";

export type OTLPExporterConfig = {
batchSize: number;
Expand Down Expand Up @@ -54,14 +55,16 @@ class OTLPExporter {
return convertSpansToCreateableEvents(resourceSpan);
});

this.#logEventsVerbose(events);
const enrichedEvents = enrichCreatableEvents(events);

span.setAttribute("event_count", events.length);
this.#logEventsVerbose(enrichedEvents);

span.setAttribute("event_count", enrichedEvents.length);

if (immediate) {
await this._eventRepository.insertManyImmediate(events);
await this._eventRepository.insertManyImmediate(enrichedEvents);
} else {
await this._eventRepository.insertMany(events);
await this._eventRepository.insertMany(enrichedEvents);
}

return ExportTraceServiceResponse.create();
Expand All @@ -79,14 +82,16 @@ class OTLPExporter {
return convertLogsToCreateableEvents(resourceLog);
});

this.#logEventsVerbose(events);
const enrichedEvents = enrichCreatableEvents(events);

this.#logEventsVerbose(enrichedEvents);

span.setAttribute("event_count", events.length);
span.setAttribute("event_count", enrichedEvents.length);

if (immediate) {
await this._eventRepository.insertManyImmediate(events);
await this._eventRepository.insertManyImmediate(enrichedEvents);
} else {
await this._eventRepository.insertMany(events);
await this._eventRepository.insertMany(enrichedEvents);
}

return ExportLogsServiceResponse.create();
Expand Down Expand Up @@ -135,16 +140,28 @@ class OTLPExporter {
(attribute) => attribute.key === SemanticInternalAttributes.TRIGGER
);

if (!triggerAttribute) {
const executionEnvironmentAttribute = resourceSpan.resource?.attributes.find(
(attribute) => attribute.key === SemanticInternalAttributes.EXECUTION_ENVIRONMENT
);

if (!triggerAttribute && !executionEnvironmentAttribute) {
logger.debug("Skipping resource span without trigger attribute", {
attributes: resourceSpan.resource?.attributes,
spans: resourceSpan.scopeSpans.flatMap((scopeSpan) => scopeSpan.spans),
});

return;
return true; // go ahead and let this resource span through
}

const executionEnvironment = isStringValue(executionEnvironmentAttribute?.value)
? executionEnvironmentAttribute.value.stringValue
: undefined;

if (executionEnvironment === "trigger") {
return true; // go ahead and let this resource span through
}

return isBoolValue(triggerAttribute.value) ? triggerAttribute.value.boolValue : false;
return isBoolValue(triggerAttribute?.value) ? triggerAttribute.value.boolValue : false;
});
}

Expand Down
67 changes: 67 additions & 0 deletions apps/webapp/app/v3/utils/enrichCreatableEvents.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { CreatableEvent } from "../eventRepository.server";

export function enrichCreatableEvents(events: CreatableEvent[]) {
return events.map((event) => {
return enrichCreatableEvent(event);
});
}

function enrichCreatableEvent(event: CreatableEvent): CreatableEvent {
const message = formatPythonStyle(event.message, event.properties);

event.message = message;
event.style = enrichStyle(event);

return event;
}

function enrichStyle(event: CreatableEvent) {
const baseStyle = event.style ?? {};
const props = event.properties;

// Direct property access and early returns
// GenAI System check
const system = props["gen_ai.system"];
if (typeof system === "string") {
return { ...baseStyle, icon: `tabler-brand-${system}` };
}

// Agent workflow check
const name = props["name"];
if (typeof name === "string" && name.includes("Agent workflow")) {
return { ...baseStyle, icon: "tabler-brain" };
}

return baseStyle;
}

function repr(value: any): string {
if (typeof value === "string") {
return `'${value}'`;
}
return String(value);
}

function formatPythonStyle(template: string, values: Record<string, any>): string {
// Early return if template is too long
if (template.length >= 256) {
return template;
}

// Early return if no template variables present
if (!template.includes("{")) {
return template;
}

return template.replace(/\{([^}]+?)(?:!r)?\}/g, (match, key) => {
const hasRepr = match.endsWith("!r}");
const actualKey = hasRepr ? key : key;
const value = values[actualKey];

if (value === undefined) {
return match;
}

return hasRepr ? repr(value) : String(value);
});
}
397 changes: 397 additions & 0 deletions apps/webapp/test/otlpExporter.test.ts

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion packages/core/src/v3/otel/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type Span, SpanStatusCode } from "@opentelemetry/api";
import { type Span, SpanStatusCode, context, propagation } from "@opentelemetry/api";

export function recordSpanException(span: Span, error: unknown) {
if (error instanceof Error) {
Expand All @@ -20,3 +20,10 @@ function sanitizeSpanError(error: Error) {

return sanitizedError;
}

export function carrierFromContext(): Record<string, string> {
const carrier = {};
propagation.inject(context.active(), carrier);

return carrier;
}
1 change: 1 addition & 0 deletions packages/core/src/v3/semanticInternalAttributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ export const SemanticInternalAttributes = {
RATE_LIMIT_RESET: "response.rateLimit.reset",
SPAN_ATTEMPT: "$span.attempt",
METRIC_EVENTS: "$metrics.events",
EXECUTION_ENVIRONMENT: "exec_env",
};
7 changes: 6 additions & 1 deletion packages/core/src/v3/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ export { PreciseWallClock as DurableClock } from "../clock/preciseWallClock.js";
export { getEnvVar, getNumberEnvVar } from "../utils/getEnv.js";
export { OtelTaskLogger, logLevels } from "../logger/taskLogger.js";
export { ConsoleInterceptor } from "../consoleInterceptor.js";
export { TracingSDK, type TracingDiagnosticLogLevel, recordSpanException } from "../otel/index.js";
export {
TracingSDK,
type TracingDiagnosticLogLevel,
recordSpanException,
carrierFromContext,
} from "../otel/index.js";
export { StandardResourceCatalog } from "../resource-catalog/standardResourceCatalog.js";
export {
TaskContextSpanProcessor,
Expand Down
65 changes: 58 additions & 7 deletions packages/python/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import {
AsyncIterableStream,
createAsyncIterableStreamFromAsyncIterable,
SemanticInternalAttributes,
taskContext,
} from "@trigger.dev/core/v3";
import { logger } from "@trigger.dev/sdk/v3";
import { carrierFromContext } from "@trigger.dev/core/v3/otel";
import assert from "node:assert";
import fs from "node:fs";
import { Result, x, Options as XOptions } from "tinyexec";
Expand All @@ -17,6 +19,8 @@ export const python = {
async run(scriptArgs: string[] = [], options: PythonExecOptions = {}): Promise<Result> {
const pythonBin = process.env.PYTHON_BIN_PATH || "python";

const carrier = carrierFromContext();

return await logger.trace(
"python.run()",
async (span) => {
Expand All @@ -27,6 +31,12 @@ export const python = {
env: {
...process.env,
...options.env,
TRACEPARENT: carrier["traceparent"],
OTEL_RESOURCE_ATTRIBUTES: `${
SemanticInternalAttributes.EXECUTION_ENVIRONMENT
}=trigger,${Object.entries(taskContext.attributes)
.map(([key, value]) => `${key}=${value}`)
.join(",")}`,
},
},
throwOnError: false, // Ensure errors are handled manually
Expand All @@ -50,7 +60,7 @@ export const python = {
attributes: {
pythonBin,
args: scriptArgs.join(" "),
[SemanticInternalAttributes.STYLE_ICON]: "brand-python",
[SemanticInternalAttributes.STYLE_ICON]: "python",
},
}
);
Expand All @@ -69,6 +79,8 @@ export const python = {
async (span) => {
span.setAttribute("scriptPath", scriptPath);

const carrier = carrierFromContext();

const result = await x(
process.env.PYTHON_BIN_PATH || "python",
[scriptPath, ...scriptArgs],
Expand All @@ -79,6 +91,13 @@ export const python = {
env: {
...process.env,
...options.env,
TRACEPARENT: carrier["traceparent"],
OTEL_RESOURCE_ATTRIBUTES: `${
SemanticInternalAttributes.EXECUTION_ENVIRONMENT
}=trigger,${Object.entries(taskContext.attributes)
.map(([key, value]) => `${key}=${value}`)
.join(",")}`,
OTEL_LOG_LEVEL: "DEBUG",
},
},
throwOnError: false,
Expand All @@ -93,7 +112,7 @@ export const python = {
throw new Error(
`${scriptPath} ${scriptArgs.join(" ")} exited with a non-zero code ${
result.exitCode
}:\n${result.stderr}`
}:\n${result.stdout}\n${result.stderr}`
);
}

Expand All @@ -104,7 +123,7 @@ export const python = {
pythonBin: process.env.PYTHON_BIN_PATH || "python",
scriptPath,
args: scriptArgs.join(" "),
[SemanticInternalAttributes.STYLE_ICON]: "brand-python",
[SemanticInternalAttributes.STYLE_ICON]: "python",
},
}
);
Expand All @@ -124,6 +143,8 @@ export const python = {
async (tempFilePath) => {
span.setAttribute("tempFilePath", tempFilePath);

const carrier = carrierFromContext();

const pythonBin = process.env.PYTHON_BIN_PATH || "python";
const result = await x(pythonBin, [tempFilePath], {
...options,
Expand All @@ -132,6 +153,12 @@ export const python = {
env: {
...process.env,
...options.env,
TRACEPARENT: carrier["traceparent"],
OTEL_RESOURCE_ATTRIBUTES: `${
SemanticInternalAttributes.EXECUTION_ENVIRONMENT
}=trigger,${Object.entries(taskContext.attributes)
.map(([key, value]) => `${key}=${value}`)
.join(",")}`,
},
},
throwOnError: false,
Expand All @@ -157,7 +184,7 @@ export const python = {
pythonBin: process.env.PYTHON_BIN_PATH || "python",
contentPreview:
scriptContent.substring(0, 100) + (scriptContent.length > 100 ? "..." : ""),
[SemanticInternalAttributes.STYLE_ICON]: "brand-python",
[SemanticInternalAttributes.STYLE_ICON]: "python",
},
}
);
Expand All @@ -167,13 +194,21 @@ export const python = {
run(scriptArgs: string[] = [], options: PythonExecOptions = {}): AsyncIterableStream<string> {
const pythonBin = process.env.PYTHON_BIN_PATH || "python";

const carrier = carrierFromContext();

const pythonProcess = x(pythonBin, scriptArgs, {
...options,
nodeOptions: {
...(options.nodeOptions || {}),
env: {
...process.env,
...options.env,
TRACEPARENT: carrier["traceparent"],
OTEL_RESOURCE_ATTRIBUTES: `${
SemanticInternalAttributes.EXECUTION_ENVIRONMENT
}=trigger,${Object.entries(taskContext.attributes)
.map(([key, value]) => `${key}=${value}`)
.join(",")}`,
},
},
throwOnError: false,
Expand All @@ -183,7 +218,7 @@ export const python = {
attributes: {
pythonBin,
args: scriptArgs.join(" "),
[SemanticInternalAttributes.STYLE_ICON]: "brand-python",
[SemanticInternalAttributes.STYLE_ICON]: "python",
},
});

Expand All @@ -206,13 +241,21 @@ export const python = {

const pythonBin = process.env.PYTHON_BIN_PATH || "python";

const carrier = carrierFromContext();

const pythonProcess = x(pythonBin, [scriptPath, ...scriptArgs], {
...options,
nodeOptions: {
...(options.nodeOptions || {}),
env: {
...process.env,
...options.env,
TRACEPARENT: carrier["traceparent"],
OTEL_RESOURCE_ATTRIBUTES: `${
SemanticInternalAttributes.EXECUTION_ENVIRONMENT
}=trigger,${Object.entries(taskContext.attributes)
.map(([key, value]) => `${key}=${value}`)
.join(",")}`,
},
},
throwOnError: false,
Expand All @@ -223,7 +266,7 @@ export const python = {
pythonBin,
scriptPath,
args: scriptArgs.join(" "),
[SemanticInternalAttributes.STYLE_ICON]: "brand-python",
[SemanticInternalAttributes.STYLE_ICON]: "python",
},
});

Expand All @@ -243,13 +286,21 @@ export const python = {

const pythonScriptPath = createTempFileSync(`script_${Date.now()}.py`, scriptContent);

const carrier = carrierFromContext();

const pythonProcess = x(pythonBin, [pythonScriptPath], {
...options,
nodeOptions: {
...(options.nodeOptions || {}),
env: {
...process.env,
...options.env,
TRACEPARENT: carrier["traceparent"],
OTEL_RESOURCE_ATTRIBUTES: `${
SemanticInternalAttributes.EXECUTION_ENVIRONMENT
}=trigger,${Object.entries(taskContext.attributes)
.map(([key, value]) => `${key}=${value}`)
.join(",")}`,
},
},
throwOnError: false,
Expand All @@ -260,7 +311,7 @@ export const python = {
pythonBin,
contentPreview:
scriptContent.substring(0, 100) + (scriptContent.length > 100 ? "..." : ""),
[SemanticInternalAttributes.STYLE_ICON]: "brand-python",
[SemanticInternalAttributes.STYLE_ICON]: "python",
},
});

Expand Down
Loading