Skip to content

Commit 73e7378

Browse files
authored
feat(webapp): add ecs otel resource attributes (#2454)
* add attributes to runlock dequeue span * feat(webapp): add ecs otel resource attributes
1 parent b0b0df6 commit 73e7378

File tree

4 files changed

+297
-13
lines changed

4 files changed

+297
-13
lines changed
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import { z } from "zod";
2+
import type { ResourceAttributes } from "@opentelemetry/resources";
3+
import {
4+
SEMRESATTRS_AWS_ECS_CLUSTER_ARN,
5+
SEMRESATTRS_AWS_ECS_CONTAINER_ARN,
6+
SEMRESATTRS_AWS_ECS_LAUNCHTYPE,
7+
SEMRESATTRS_AWS_ECS_TASK_ARN,
8+
SEMRESATTRS_AWS_ECS_TASK_FAMILY,
9+
SEMRESATTRS_AWS_ECS_TASK_REVISION,
10+
SEMRESATTRS_AWS_LOG_GROUP_NAMES,
11+
SEMRESATTRS_AWS_LOG_STREAM_NAMES,
12+
SEMRESATTRS_CLOUD_AVAILABILITY_ZONE,
13+
SEMRESATTRS_CLOUD_PLATFORM,
14+
SEMRESATTRS_CLOUD_PROVIDER,
15+
SEMRESATTRS_CLOUD_REGION,
16+
SEMRESATTRS_CONTAINER_NAME,
17+
SEMRESATTRS_CONTAINER_ID,
18+
SEMRESATTRS_CONTAINER_IMAGE_NAME,
19+
SEMRESATTRS_CONTAINER_IMAGE_TAG,
20+
CLOUDPLATFORMVALUES_AWS_ECS,
21+
CLOUDPROVIDERVALUES_AWS,
22+
} from "@opentelemetry/semantic-conventions";
23+
import { tryCatch } from "@trigger.dev/core/utils";
24+
import { logger } from "~/services/logger.server";
25+
26+
// Minimal schema for ECS task metadata
27+
const ECSTaskMetadataSchema = z.object({
28+
Cluster: z.string().optional(),
29+
TaskARN: z.string().optional(),
30+
Family: z.string().optional(),
31+
Revision: z.string().optional(),
32+
AvailabilityZone: z.string().optional(),
33+
LaunchType: z.string().optional(),
34+
ServiceName: z.string().optional(),
35+
});
36+
37+
const ECSLogOptions = z.object({
38+
"awslogs-group": z.string().optional(),
39+
"awslogs-region": z.string().optional(),
40+
"awslogs-stream": z.string().optional(),
41+
mode: z.string().optional(),
42+
});
43+
44+
// Minimal schema for container metadata
45+
const ECSContainerMetadataSchema = z.object({
46+
DockerId: z.string().optional(),
47+
Name: z.string().optional(),
48+
Image: z.string().optional(),
49+
ImageID: z.string().optional(),
50+
ContainerARN: z.string().optional(),
51+
LogOptions: ECSLogOptions.optional(),
52+
});
53+
54+
// Cache for ECS metadata to avoid repeated fetches
55+
let ecsMetadataCache: ResourceAttributes | null = null;
56+
57+
/**
58+
* Fetches ECS task metadata from the v4 endpoint
59+
*/
60+
async function fetchECSTaskMetadata(metadataUri: string): Promise<ResourceAttributes> {
61+
const [error, response] = await tryCatch(
62+
fetch(`${metadataUri}/task`, {
63+
signal: AbortSignal.timeout(5000),
64+
})
65+
);
66+
67+
if (error) {
68+
logger.warn("Failed to fetch ECS task metadata", { error });
69+
return {};
70+
}
71+
72+
if (!response.ok) {
73+
logger.warn("ECS task metadata fetch failed", { status: response.status });
74+
return {};
75+
}
76+
77+
const [jsonError, taskJson] = await tryCatch(response.json());
78+
if (jsonError) {
79+
logger.warn("Failed to parse ECS task metadata", { error: jsonError });
80+
return {};
81+
}
82+
83+
const parseResult = ECSTaskMetadataSchema.safeParse(taskJson);
84+
if (!parseResult.success) {
85+
logger.warn("ECS task metadata validation issues", { issues: parseResult.error.issues });
86+
return {};
87+
}
88+
89+
const taskData = parseResult.data;
90+
const attributes: ResourceAttributes = {};
91+
92+
if (taskData.TaskARN) {
93+
attributes[SEMRESATTRS_AWS_ECS_TASK_ARN] = taskData.TaskARN;
94+
}
95+
96+
if (taskData.Cluster) {
97+
attributes[SEMRESATTRS_AWS_ECS_CLUSTER_ARN] = taskData.Cluster;
98+
}
99+
100+
if (taskData.LaunchType) {
101+
attributes[SEMRESATTRS_AWS_ECS_LAUNCHTYPE] = taskData.LaunchType;
102+
}
103+
104+
if (taskData.Family) {
105+
attributes[SEMRESATTRS_AWS_ECS_TASK_FAMILY] = taskData.Family;
106+
}
107+
108+
if (taskData.Revision) {
109+
attributes[SEMRESATTRS_AWS_ECS_TASK_REVISION] = taskData.Revision;
110+
}
111+
112+
if (taskData.AvailabilityZone) {
113+
attributes[SEMRESATTRS_CLOUD_AVAILABILITY_ZONE] = taskData.AvailabilityZone;
114+
}
115+
116+
if (taskData.ServiceName) {
117+
// Custom attribute for ECS service name
118+
attributes["aws.ecs.service.name"] = taskData.ServiceName;
119+
}
120+
121+
return attributes;
122+
}
123+
124+
/**
125+
* Fetches ECS container metadata from the v4 endpoint
126+
*/
127+
async function fetchECSContainerMetadata(metadataUri: string): Promise<ResourceAttributes> {
128+
const [error, response] = await tryCatch(
129+
fetch(metadataUri, {
130+
signal: AbortSignal.timeout(5000),
131+
})
132+
);
133+
134+
if (error) {
135+
logger.warn("Failed to fetch ECS container metadata", { error });
136+
return {};
137+
}
138+
139+
if (!response.ok) {
140+
logger.warn("ECS container metadata fetch failed", { status: response.status });
141+
return {};
142+
}
143+
144+
const [jsonError, containerJson] = await tryCatch(response.json());
145+
if (jsonError) {
146+
logger.warn("Failed to parse ECS container metadata", { error: jsonError });
147+
return {};
148+
}
149+
150+
const parseResult = ECSContainerMetadataSchema.safeParse(containerJson);
151+
if (!parseResult.success) {
152+
logger.warn("ECS container metadata validation issues", { issues: parseResult.error.issues });
153+
return {};
154+
}
155+
156+
const containerData = parseResult.data;
157+
const attributes: ResourceAttributes = {};
158+
159+
if (containerData.Name) {
160+
attributes[SEMRESATTRS_CONTAINER_NAME] = containerData.Name;
161+
}
162+
163+
if (containerData.DockerId) {
164+
attributes[SEMRESATTRS_CONTAINER_ID] = containerData.DockerId;
165+
}
166+
167+
if (containerData.Image) {
168+
const [name, tag] = containerData.Image.split(":");
169+
170+
if (name) {
171+
attributes[SEMRESATTRS_CONTAINER_IMAGE_NAME] = name;
172+
}
173+
174+
if (tag) {
175+
attributes[SEMRESATTRS_CONTAINER_IMAGE_TAG] = tag;
176+
}
177+
}
178+
179+
if (containerData.ImageID) {
180+
// Custom attribute for image ID
181+
attributes["container.image.id"] = containerData.ImageID;
182+
}
183+
184+
if (containerData.ContainerARN) {
185+
attributes[SEMRESATTRS_AWS_ECS_CONTAINER_ARN] = containerData.ContainerARN;
186+
}
187+
188+
const logOptions = containerData.LogOptions;
189+
if (logOptions?.["awslogs-group"]) {
190+
attributes[SEMRESATTRS_AWS_LOG_GROUP_NAMES] = [logOptions["awslogs-group"]];
191+
}
192+
if (logOptions?.["awslogs-stream"]) {
193+
attributes[SEMRESATTRS_AWS_LOG_STREAM_NAMES] = [logOptions["awslogs-stream"]];
194+
}
195+
if (logOptions?.mode) {
196+
// Custom attribute for log mode
197+
attributes["aws.log.mode"] = [logOptions.mode];
198+
}
199+
200+
return attributes;
201+
}
202+
203+
/**
204+
* Fetches ECS metadata from the Task Metadata Endpoint V4
205+
* Returns resource attributes for OpenTelemetry
206+
*/
207+
async function fetchECSMetadata(metadataUri: string): Promise<ResourceAttributes> {
208+
// Return cached metadata if available
209+
if (ecsMetadataCache !== null) {
210+
return ecsMetadataCache;
211+
}
212+
213+
if (!metadataUri) {
214+
// Not running in ECS
215+
ecsMetadataCache = {};
216+
return ecsMetadataCache;
217+
}
218+
219+
// Fetch task metadata and CloudWatch logs config in parallel
220+
const [taskAttributes, containerAttributes] = await Promise.all([
221+
fetchECSTaskMetadata(metadataUri),
222+
fetchECSContainerMetadata(metadataUri),
223+
]);
224+
225+
const attributes: ResourceAttributes = {
226+
[SEMRESATTRS_CLOUD_PROVIDER]: CLOUDPROVIDERVALUES_AWS,
227+
[SEMRESATTRS_CLOUD_PLATFORM]: CLOUDPLATFORMVALUES_AWS_ECS,
228+
...taskAttributes,
229+
...containerAttributes,
230+
};
231+
232+
const region = process.env.AWS_REGION || process.env.AWS_DEFAULT_REGION;
233+
if (region) {
234+
attributes[SEMRESATTRS_CLOUD_REGION] = region;
235+
}
236+
237+
logger.info("🔦 Fetched ECS metadata", { attributes });
238+
239+
ecsMetadataCache = attributes;
240+
return attributes;
241+
}
242+
243+
/**
244+
* Fetches async resource attributes
245+
* Designed to be used with the Resource constructor's asyncAttributesPromise parameter
246+
*
247+
* Usage:
248+
* ```
249+
* new Resource(
250+
* { [SEMRESATTRS_SERVICE_NAME]: 'my-service' },
251+
* getAsyncResourceAttributes()
252+
* )
253+
* ```
254+
*/
255+
export async function getAsyncResourceAttributes(): Promise<ResourceAttributes> {
256+
const metadataUri = process.env.ECS_CONTAINER_METADATA_URI_V4;
257+
258+
if (!metadataUri) {
259+
return {};
260+
}
261+
262+
return fetchECSMetadata(metadataUri);
263+
}

apps/webapp/app/v3/tracer.server.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
4141
import {
4242
SEMRESATTRS_SERVICE_INSTANCE_ID,
4343
SEMRESATTRS_SERVICE_NAME,
44+
SEMRESATTRS_HOST_NAME,
4445
} from "@opentelemetry/semantic-conventions";
4546
import { PrismaInstrumentation } from "@prisma/instrumentation";
4647
import { env } from "~/env.server";
@@ -51,6 +52,8 @@ import { logger } from "~/services/logger.server";
5152
import { flattenAttributes } from "@trigger.dev/core/v3";
5253
import { randomUUID } from "node:crypto";
5354
import { prisma } from "~/db.server";
55+
import { hostname } from "node:os";
56+
import { getAsyncResourceAttributes } from "./telemetry/asyncResourceAttributes.server";
5457

5558
export const SEMINTATTRS_FORCE_RECORDING = "forceRecording";
5659

@@ -168,6 +171,17 @@ export async function emitWarnLog(message: string, params: Record<string, unknow
168171
});
169172
}
170173

174+
function getResource() {
175+
return new Resource(
176+
{
177+
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
178+
[SEMRESATTRS_SERVICE_INSTANCE_ID]: SERVICE_INSTANCE_ID,
179+
[SEMRESATTRS_HOST_NAME]: hostname(),
180+
},
181+
getAsyncResourceAttributes()
182+
);
183+
}
184+
171185
function setupTelemetry() {
172186
if (env.INTERNAL_OTEL_TRACE_DISABLED === "1") {
173187
console.log(`🔦 Tracer disabled, returning a noop tracer`);
@@ -186,10 +200,7 @@ function setupTelemetry() {
186200

187201
const provider = new NodeTracerProvider({
188202
forceFlushTimeoutMillis: 15_000,
189-
resource: new Resource({
190-
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
191-
[SEMRESATTRS_SERVICE_INSTANCE_ID]: SERVICE_INSTANCE_ID,
192-
}),
203+
resource: getResource(),
193204
sampler: new ParentBasedSampler({
194205
root: new CustomWebappSampler(new TraceIdRatioBasedSampler(samplingRate)),
195206
}),
@@ -239,9 +250,7 @@ function setupTelemetry() {
239250
});
240251

241252
const loggerProvider = new LoggerProvider({
242-
resource: new Resource({
243-
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
244-
}),
253+
resource: getResource(),
245254
logRecordLimits: {
246255
attributeCountLimit: 1000,
247256
},
@@ -296,10 +305,7 @@ function setupMetrics() {
296305
const exporter = createMetricsExporter();
297306

298307
const meterProvider = new MeterProvider({
299-
resource: new Resource({
300-
[SEMRESATTRS_SERVICE_NAME]: env.SERVICE_NAME,
301-
[SEMRESATTRS_SERVICE_INSTANCE_ID]: SERVICE_INSTANCE_ID,
302-
}),
308+
resource: getResource(),
303309
readers: [
304310
new PeriodicExportingMetricReader({
305311
exporter,

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,12 @@ export class RunLocker {
144144
}
145145

146146
/** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
147-
async lock<T>(name: string, resources: string[], routine: () => Promise<T>): Promise<T> {
147+
async lock<T>(
148+
name: string,
149+
resources: string[],
150+
routine: () => Promise<T>,
151+
attributes?: Attributes
152+
): Promise<T> {
148153
const currentContext = this.asyncLocalStorage.getStore();
149154
const joinedResources = [...resources].sort().join(",");
150155

@@ -187,7 +192,7 @@ export class RunLocker {
187192
return result;
188193
},
189194
{
190-
attributes: { name, resources, timeout: this.duration },
195+
attributes: { name, resources, timeout: this.duration, ...attributes },
191196
}
192197
);
193198
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,16 @@ export class DequeueSystem {
573573
},
574574
placementTags: [placementTag("paid", isPaying ? "true" : "false")],
575575
} satisfies DequeuedMessage;
576+
},
577+
{
578+
run_id: runId,
579+
org_id: orgId,
580+
environment_id: message.message.environmentId,
581+
environment_type: message.message.environmentType,
582+
worker_queue_length: message.workerQueueLength ?? 0,
583+
consumer_id: consumerId,
584+
worker_queue: workerQueue,
585+
blocking_pop: blockingPop ?? true,
576586
}
577587
);
578588

0 commit comments

Comments
 (0)