Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ export class FallbackSampler implements Sampler {
private fixedRateSampler: TraceIdRatioBasedSampler;
private rateLimitingSampler: RateLimitingSampler;

constructor() {
this.fixedRateSampler = new TraceIdRatioBasedSampler(0.05);
this.rateLimitingSampler = new RateLimitingSampler(1);
constructor(ratio: number = 0.05, quota: number = 1) {
this.fixedRateSampler = new TraceIdRatioBasedSampler(ratio);
this.rateLimitingSampler = new RateLimitingSampler(quota);
}

shouldSample(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,24 @@ export class RuleCache {

// Update ruleAppliers based on the targets fetched from X-Ray service
public updateTargets(targetDocuments: TargetMap, lastRuleModification: number): [boolean, number] {
let minPollingInteral: number | undefined = undefined;
let minPollingInterval: number | undefined = undefined;
let nextPollingInterval: number = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS;
this.ruleAppliers.forEach((rule: SamplingRuleApplier, index: number) => {
const target: SamplingTargetDocument = targetDocuments[rule.samplingRule.RuleName];
if (target) {
this.ruleAppliers[index] = rule.withTarget(target);
if (target.Interval) {
if (minPollingInteral === undefined || minPollingInteral > target.Interval) {
minPollingInteral = target.Interval;
if (typeof target.Interval === 'number') {
if (minPollingInterval === undefined || minPollingInterval > target.Interval) {
minPollingInterval = target.Interval;
}
}
} else {
diag.debug('Invalid sampling target: missing rule name');
}
});

if (minPollingInteral) {
nextPollingInterval = minPollingInteral;
if (typeof minPollingInterval === 'number') {
nextPollingInterval = minPollingInterval;
}

const refreshSamplingRules: boolean = lastRuleModification * 1000 > this.lastUpdatedEpochMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ export class SamplingRuleApplier {

if (target) {
this.borrowingEnabled = false;
if (target.ReservoirQuota) {
if (typeof target.ReservoirQuota === 'number') {
this.reservoirSampler = new RateLimitingSampler(target.ReservoirQuota);
}

if (target.ReservoirQuotaTTL) {
if (typeof target.ReservoirQuotaTTL === 'number') {
this.reservoirExpiryTimeInMillis = new Date(target.ReservoirQuotaTTL * 1000).getTime();
} else {
this.reservoirExpiryTimeInMillis = Date.now();
}

if (target.FixedRate) {
if (typeof target.FixedRate === 'number') {
this.fixedRateSampler = new TraceIdRatioBasedSampler(target.FixedRate);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { context, Span, SpanKind } from '@opentelemetry/api';
import { Attributes, Context, context, Link, Span, SpanKind } from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';
import { SamplingDecision, Tracer } from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
Expand All @@ -10,12 +10,18 @@ import { expect } from 'expect';
import * as nock from 'nock';
import * as sinon from 'sinon';
import { _AwsXRayRemoteSampler, AwsXRayRemoteSampler } from '../../src/sampler/aws-xray-remote-sampler';
import { FallbackSampler } from '../../src/sampler/fallback-sampler';

const DATA_DIR_SAMPLING_RULES = __dirname + '/data/test-remote-sampler_sampling-rules-response-sample.json';
const DATA_DIR_SAMPLING_TARGETS = __dirname + '/data/test-remote-sampler_sampling-targets-response-sample.json';
const TEST_URL = 'http://localhost:2000';
export const testTraceId = '0af7651916cd43dd8448eb211c80319c';

describe('AwsXrayRemoteSampler', () => {
afterEach(() => {
sinon.restore();
});

it('testCreateRemoteSamplerWithEmptyResource', () => {
const sampler: AwsXRayRemoteSampler = new AwsXRayRemoteSampler({ resource: Resource.EMPTY });

Expand Down Expand Up @@ -81,21 +87,21 @@ describe('AwsXrayRemoteSampler', () => {
setTimeout(() => {
expect(((sampler as any)._root._root.ruleCache as any).ruleAppliers[0].samplingRule.RuleName).toEqual('test');
expect(
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
).toEqual(SamplingDecision.NOT_RECORD);

setTimeout(() => {
// restore function
(_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval = tmp;

expect(
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
).toEqual(SamplingDecision.RECORD_AND_SAMPLED);
expect(
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
).toEqual(SamplingDecision.RECORD_AND_SAMPLED);
expect(
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, { abc: '1234' }, []).decision
).toEqual(SamplingDecision.RECORD_AND_SAMPLED);

done();
Expand All @@ -112,40 +118,38 @@ describe('AwsXrayRemoteSampler', () => {
});
const attributes = { abc: '1234' };

// Patch default target polling interval
const tmp = (_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval;
(_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval = () => {
return 0.2; // seconds
};
const sampler = new AwsXRayRemoteSampler({
resource: resource,
});
const internalXraySampler = sampler['_root']['_root'] as _AwsXRayRemoteSampler;
internalXraySampler['getAndUpdateSamplingRules']();

setTimeout(() => {
expect(((sampler as any)._root._root.ruleCache as any).ruleAppliers[0].samplingRule.RuleName).toEqual('test');
expect(sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, attributes, []).decision).toEqual(
SamplingDecision.NOT_RECORD
);
expect(
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, attributes, []).decision
).toEqual(SamplingDecision.NOT_RECORD);
internalXraySampler['getAndUpdateSamplingTargets']();

setTimeout(() => {
let sampled = 0;
for (let i = 0; i < 100000; i++) {
const clock = sinon.useFakeTimers(Date.now());
clock.tick(1000);
for (let i = 0; i < 1000; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, attributes, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, attributes, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled++;
}
}
clock.restore();

// restore function
(_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval = tmp;

expect((sampler as any)._root._root.ruleCache.ruleAppliers[0].reservoirSampler.quota).toEqual(100000);
expect(sampled).toEqual(100000);
expect((sampler as any)._root._root.ruleCache.ruleAppliers[0].reservoirSampler.quota).toEqual(1000);
expect(sampled).toEqual(1000);
done();
}, 2000);
}, 100);
}, 300);
}, 300);
});

it('testSomeReservoir', done => {
Expand All @@ -157,39 +161,54 @@ describe('AwsXrayRemoteSampler', () => {
});
const attributes = { abc: 'non-matching attribute value, use default rule' };

// Patch default target polling interval
const tmp = (_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval;
(_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval = () => {
return 2; // seconds
};
const sampler = new AwsXRayRemoteSampler({
resource: resource,
});
const internalXraySampler = sampler['_root']['_root'] as _AwsXRayRemoteSampler;
sinon
.stub(sampler['_root']['_root'].fallbackSampler as FallbackSampler, 'shouldSample')
.callsFake(
(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
) => {
return {
decision: SamplingDecision.NOT_RECORD,
attributes: attributes,
};
}
);

internalXraySampler['getAndUpdateSamplingRules']();
setTimeout(() => {
expect(((sampler as any)._root._root.ruleCache as any).ruleAppliers[0].samplingRule.RuleName).toEqual('test');
expect(sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, attributes, []).decision).toEqual(
SamplingDecision.NOT_RECORD
);
expect(
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, attributes, []).decision
).toEqual(SamplingDecision.RECORD_AND_SAMPLED);

internalXraySampler['getAndUpdateSamplingTargets']();

setTimeout(() => {
const clock = sinon.useFakeTimers(Date.now());
clock.tick(2000);
clock.tick(1000);
let sampled = 0;
for (let i = 0; i < 100000; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, attributes, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, attributes, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled++;
}
}
clock.restore();
// restore function
(_AwsXRayRemoteSampler.prototype as any).getDefaultTargetPollingInterval = tmp;

expect(sampled).toEqual(100);
done();
}, 2000);
}, 300);
}, 300);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
"FixedRate": 0.0,
"Interval": 100000,
"ReservoirQuota": 100000,
"ReservoirQuota": 1000,
"ReservoirQuotaTTL": 9999999999.0,
"RuleName": "test"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,33 @@ import { SamplingDecision } from '@opentelemetry/sdk-trace-base';
import { expect } from 'expect';
import * as sinon from 'sinon';
import { FallbackSampler } from '../../src/sampler/fallback-sampler';
import { testTraceId } from './aws-xray-remote-sampler.test';

let clock: sinon.SinonFakeTimers;
describe('FallBackSampler', () => {
beforeEach(() => {
clock = sinon.useFakeTimers(Date.now());
});

afterEach(() => {
try {
clock.restore();
} catch {
// do nothing
}
});
it('testShouldSample', () => {
const sampler = new FallbackSampler();

sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []);
it('testShouldSampleWithQuotaOnly', () => {
// Ensure FallbackSampler's internal TraceIdRatioBasedSampler will always return SamplingDecision.NOT_RECORD
const sampler = new FallbackSampler(0);

sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []);

// 0 seconds passed, 0 quota available
let sampled = 0;
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -41,7 +45,7 @@ describe('FallBackSampler', () => {
clock.tick(0.4 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -54,7 +58,7 @@ describe('FallBackSampler', () => {
clock.tick(0.4 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -67,7 +71,7 @@ describe('FallBackSampler', () => {
clock.tick(0.4 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -80,7 +84,7 @@ describe('FallBackSampler', () => {
clock.tick(0.4 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -93,7 +97,7 @@ describe('FallBackSampler', () => {
clock.tick(0.4 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -106,7 +110,7 @@ describe('FallBackSampler', () => {
clock.tick(0.4 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand All @@ -119,7 +123,7 @@ describe('FallBackSampler', () => {
clock.tick(100 * 1000);
for (let i = 0; i < 30; i++) {
if (
sampler.shouldSample(context.active(), '1234', 'name', SpanKind.CLIENT, {}, []).decision !==
sampler.shouldSample(context.active(), testTraceId, 'name', SpanKind.CLIENT, {}, []).decision !==
SamplingDecision.NOT_RECORD
) {
sampled += 1;
Expand Down
Loading
Loading