Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"@opentelemetry/auto-configuration-propagators": "0.3.2",
"@opentelemetry/auto-instrumentations-node": "0.56.0",
"@opentelemetry/api-events": "0.57.1",
"@opentelemetry/baggage-span-processor": "0.3.1",
"@opentelemetry/sdk-events": "0.57.1",
"@opentelemetry/sdk-logs": "0.57.1",
"@opentelemetry/core": "1.30.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ import { OTLPUdpSpanExporter } from './otlp-udp-exporter';
import { AwsXRayRemoteSampler } from './sampler/aws-xray-remote-sampler';
// This file is generated via `npm run compile`
import { LIB_VERSION } from './version';
import { isAgentObservabilityEnabled } from './utils';
import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor';
import { logs } from '@opentelemetry/api-logs';

const XRAY_OTLP_ENDPOINT_PATTERN = '^https://xray\\.([a-z0-9-]+)\\.amazonaws\\.com/v1/traces$';

Expand Down Expand Up @@ -223,7 +226,48 @@ export class AwsOpentelemetryConfigurator {
return isApplicationSignalsEnabled.toLowerCase() === 'true';
}

static exportUnsampledSpanForAgentObservability(spanProcessors: SpanProcessor[], resource: Resource): void {
if (!isAgentObservabilityEnabled()) {
return;
}

// Get the traces endpoint from environment
const tracesEndpoint = process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT;

if (!tracesEndpoint) {
// No traces endpoint configured, skip unsampled span export
diag.warn('No traces endpoint configured for agent observability unsampled spans');
return;
}

let spanExporter: SpanExporter;
// Create the appropriate span exporter based on the endpoint
if (isXrayOtlpEndpoint(tracesEndpoint)) {
spanExporter = new OTLPAwsSpanExporter(tracesEndpoint, undefined, logs.getLoggerProvider());
} else {
spanExporter = new OTLPAwsSpanExporter(tracesEndpoint);
}

// Add the unsampled span processor
spanProcessors.push(new AwsBatchUnsampledSpanProcessor(spanExporter));
}

static customizeSpanProcessors(spanProcessors: SpanProcessor[], resource: Resource): void {
if (isAgentObservabilityEnabled()) {
// We always send 100% spans to Genesis platform for agent observability because
// AI applications typically have low throughput traffic patterns and require
// comprehensive monitoring to catch subtle failure modes like hallucinations
// and quality degradation that sampling could miss.
this.exportUnsampledSpanForAgentObservability(spanProcessors, resource);

// Add session.id baggage attribute to span attributes to support AI Agent use cases
// enabling session ID tracking in spans.
const sessionIdPredicate = (baggageKey: string) => {
return baggageKey === 'session.id';
};
spanProcessors.push(new BaggageSpanProcessor(sessionIdPredicate));
}

if (!AwsOpentelemetryConfigurator.isApplicationSignalsEnabled()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import { setAwsDefaultEnvironmentVariables } from '../src/register';
import { AwsXRayRemoteSampler } from '../src/sampler/aws-xray-remote-sampler';
import { AwsXraySamplingClient } from '../src/sampler/aws-xray-sampling-client';
import { GetSamplingRulesResponse } from '../src/sampler/remote-sampler.types';
import { OTLPAwsSpanExporter } from '../src/otlp-aws-span-exporter';
import { BaggageSpanProcessor } from '@opentelemetry/baggage-span-processor';

// Tests AwsOpenTelemetryConfigurator after running Environment Variable setup in register.ts
describe('AwsOpenTelemetryConfiguratorTest', () => {
Expand Down Expand Up @@ -276,7 +278,10 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {

it('CustomizeSpanProcessorsTest', () => {
delete process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED;
const spanProcessors: SpanProcessor[] = [];
delete process.env.AGENT_OBSERVABILITY_ENABLED;

// Test application signals only
let spanProcessors: SpanProcessor[] = [];
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessors, Resource.empty());
expect(spanProcessors.length).toEqual(0);

Expand Down Expand Up @@ -310,6 +315,81 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {
spanProcessors.forEach(spanProcessor => {
spanProcessor.shutdown();
});

// Reset spanProcessors list for next set of tests
spanProcessors = [];

process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED = 'True';
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessors, Resource.empty());
expect(spanProcessors.length).toEqual(3);

// Verify processors are added in the expected order
expect(spanProcessors[0]).toBeInstanceOf(BaggageSpanProcessor);
expect(spanProcessors[1]).toBeInstanceOf(AttributePropagatingSpanProcessor);
expect(spanProcessors[2]).toBeInstanceOf(AwsSpanMetricsProcessor);

// shut down exporters for test cleanup
spanProcessors.forEach(spanProcessor => {
spanProcessor.shutdown();
});
delete process.env.AGENT_OBSERVABILITY_ENABLED;
delete process.env.OTEL_AWS_APPLICATION_SIGNALS_ENABLED;
});

it('CustomizeSpanProcessorsWithAgentObservabilityTest', () => {
const spanProcessorsToTest: SpanProcessor[] = [];

// Test that BaggageSpanProcessor is not added when agent observability is disabled
delete process.env.AGENT_OBSERVABILITY_ENABLED;
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessorsToTest, Resource.empty());
expect(spanProcessorsToTest).toEqual([]);

// Test that BaggageSpanProcessor is added when agent observability is enabled
process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessorsToTest, Resource.empty());
expect(spanProcessorsToTest.length).toEqual(1);

// Verify the added processor is BaggageSpanProcessor
const addedProcessor = spanProcessorsToTest[0];
expect(addedProcessor).toBeInstanceOf(BaggageSpanProcessor);

// Clean up
delete process.env.AGENT_OBSERVABILITY_ENABLED;
});

it('BaggageSpanProcessorSessionIdFilteringTest', () => {
// Set up agent observability
process.env.AGENT_OBSERVABILITY_ENABLED = 'true';

// Create a SpanProcessor list for this test
const spanProcessorsToTest: SpanProcessor[] = [];

// Add our span processors
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessorsToTest, Resource.empty());

// Verify that the BaggageSpanProcessor was added
const baggageProcessors = spanProcessorsToTest.filter(
processor => processor.constructor.name === 'BaggageSpanProcessor'
);
expect(baggageProcessors.length).toBe(1);

// Verify the predicate function only accepts session.id
const baggageProcessor = baggageProcessors[0];
expect(baggageProcessor).toBeInstanceOf(BaggageSpanProcessor);
const predicate = (baggageProcessor as BaggageSpanProcessor)['_keyPredicate'].bind(baggageProcessor);

// Test the predicate function directly
expect(predicate('session.id')).toBeTruthy();
expect(predicate('user.id')).toBeFalsy();
expect(predicate('request.id')).toBeFalsy();
expect(predicate('other.key')).toBeFalsy();
expect(predicate('')).toBeFalsy();
expect(predicate('session')).toBeFalsy();
expect(predicate('id')).toBeFalsy();

// Clean up
delete process.env.AGENT_OBSERVABILITY_ENABLED;
});

it('ApplicationSignalsExporterProviderTest', () => {
Expand Down Expand Up @@ -667,4 +747,75 @@ describe('AwsOpenTelemetryConfiguratorTest', () => {
// Cleanup
delete process.env.OTEL_EXPORTER_OTLP_TRACES_PROTOCOL;
});

it('ExportUnsampledSpanForAgentObservabilityTest', () => {
const spanProcessorsToTest: SpanProcessor[] = [];

// Test with agent observability disabled
AwsOpentelemetryConfigurator.exportUnsampledSpanForAgentObservability(spanProcessorsToTest, Resource.empty());
expect(spanProcessorsToTest).toEqual([]);

// Test with agent observability enabled
process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = 'https://xray.us-east-1.amazonaws.com/v1/traces';

AwsOpentelemetryConfigurator.exportUnsampledSpanForAgentObservability(spanProcessorsToTest, Resource.empty());
expect(spanProcessorsToTest.length).toEqual(1);

const processor = spanProcessorsToTest[0];
expect(processor).toBeInstanceOf(AwsBatchUnsampledSpanProcessor);

// Cleanup
delete process.env.AGENT_OBSERVABILITY_ENABLED;
delete process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT;
});

it('ExportUnsampledSpanForAgentObservabilityUsesOtlpAwsSpanExporterTest', () => {
const spanProcessorsToTest: SpanProcessor[] = [];

process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = 'https://xray.us-east-1.amazonaws.com/v1/traces';

AwsOpentelemetryConfigurator.exportUnsampledSpanForAgentObservability(spanProcessorsToTest, Resource.empty());

// Verify AwsBatchUnsampledSpanProcessor was created with the AWS exporter
expect(spanProcessorsToTest[0]).toBeInstanceOf(AwsBatchUnsampledSpanProcessor);
const otlpAwsSpanExporter = (spanProcessorsToTest[0] as AwsBatchUnsampledSpanProcessor)['_exporter'];

// Verify OTLPAwsSpanExporter was created with correct parameters
expect(otlpAwsSpanExporter).toBeInstanceOf(OTLPAwsSpanExporter);
expect(otlpAwsSpanExporter['endpoint']).toEqual('https://xray.us-east-1.amazonaws.com/v1/traces');
expect(otlpAwsSpanExporter['loggerProvider']).toBeDefined();

// Cleanup environment variables
delete process.env.AGENT_OBSERVABILITY_ENABLED;
delete process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT;
});

it('CustomizeSpanProcessorsCallsExportUnsampledSpanTest', () => {
const spanProcessorsToTest: SpanProcessor[] = [];

// Create spy for exportUnsampledSpanForAgentObservability
const exportUnsampledSpanSpy = sinon.spy(AwsOpentelemetryConfigurator, 'exportUnsampledSpanForAgentObservability');

try {
// Test that function is NOT called when agent observability is disabled
delete process.env.AGENT_OBSERVABILITY_ENABLED;
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessorsToTest, Resource.empty());
expect(exportUnsampledSpanSpy.called).toBeFalsy();

// Test that function is called when agent observability is enabled
exportUnsampledSpanSpy.resetHistory();
process.env.AGENT_OBSERVABILITY_ENABLED = 'true';
AwsOpentelemetryConfigurator.customizeSpanProcessors(spanProcessorsToTest, Resource.empty());
expect(exportUnsampledSpanSpy.calledOnce).toBeTruthy();
expect(exportUnsampledSpanSpy.calledWith(spanProcessorsToTest, Resource.empty())).toBeTruthy();
} finally {
// Restore original implementation
exportUnsampledSpanSpy.restore();

// Cleanup
delete process.env.AGENT_OBSERVABILITY_ENABLED;
}
});
});
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading