diff --git a/incubator/opentelemetry-sampler-aws-xray/src/aws-xray-sampling-client.ts b/incubator/opentelemetry-sampler-aws-xray/src/aws-xray-sampling-client.ts index 75c299295b..63e90c29fb 100644 --- a/incubator/opentelemetry-sampler-aws-xray/src/aws-xray-sampling-client.ts +++ b/incubator/opentelemetry-sampler-aws-xray/src/aws-xray-sampling-client.ts @@ -45,7 +45,7 @@ export class AWSXRaySamplingClient { this.makeSamplingRequest( this.samplingTargetsEndpoint, callback, - this.samplerDiag.debug, + (message: string) => this.samplerDiag.debug(message), JSON.stringify(requestBody) ); } @@ -56,7 +56,7 @@ export class AWSXRaySamplingClient { this.makeSamplingRequest( this.getSamplingRulesEndpoint, callback, - this.samplerDiag.error + (message: string) => this.samplerDiag.error(message) ); } diff --git a/incubator/opentelemetry-sampler-aws-xray/src/fallback-sampler.ts b/incubator/opentelemetry-sampler-aws-xray/src/fallback-sampler.ts index d61dd8fb60..11aabd0ac7 100644 --- a/incubator/opentelemetry-sampler-aws-xray/src/fallback-sampler.ts +++ b/incubator/opentelemetry-sampler-aws-xray/src/fallback-sampler.ts @@ -21,16 +21,20 @@ import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api'; import { Sampler, + SamplingDecision, SamplingResult, TraceIdRatioBasedSampler, } from '@opentelemetry/sdk-trace-base'; +import { RateLimitingSampler } from './rate-limiting-sampler'; // FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler. export class FallbackSampler implements Sampler { private fixedRateSampler: TraceIdRatioBasedSampler; + private rateLimitingSampler: RateLimitingSampler; - constructor() { - this.fixedRateSampler = new TraceIdRatioBasedSampler(0.05); + constructor(ratio = 0.05, quota = 1) { + this.fixedRateSampler = new TraceIdRatioBasedSampler(ratio); + this.rateLimitingSampler = new RateLimitingSampler(quota); } shouldSample( @@ -41,7 +45,19 @@ export class FallbackSampler implements Sampler { attributes: Attributes, links: Link[] ): SamplingResult { - // TODO: implement and use Rate Limiting Sampler + const samplingResult: SamplingResult = + this.rateLimitingSampler.shouldSample( + context, + traceId, + spanName, + spanKind, + attributes, + links + ); + + if (samplingResult.decision !== SamplingDecision.NOT_RECORD) { + return samplingResult; + } return this.fixedRateSampler.shouldSample(context, traceId); } diff --git a/incubator/opentelemetry-sampler-aws-xray/src/rate-limiter.ts b/incubator/opentelemetry-sampler-aws-xray/src/rate-limiter.ts new file mode 100644 index 0000000000..2c407fce6c --- /dev/null +++ b/incubator/opentelemetry-sampler-aws-xray/src/rate-limiter.ts @@ -0,0 +1,65 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Includes work from: +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/* + * The RateLimiter keeps track of the current reservoir quota balance available (measured via available time) + * If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time) + * A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available. + */ +export class RateLimiter { + // Quota assigned to client to dictate maximum quota balance that can be consumed per second. + private quota: number; + private MAX_BALANCE_MILLIS: number; + // Used to measure current quota balance. + private walletFloorMillis: number; + + constructor(quota: number, maxBalanceInSeconds = 1) { + this.MAX_BALANCE_MILLIS = maxBalanceInSeconds * 1000.0; + this.quota = quota; + this.walletFloorMillis = Date.now(); + // current "balance" would be `ceiling - floor` + } + + public take(cost = 1): boolean { + if (this.quota === 0) { + return false; + } + + // assume divide by zero not possible + const costInMillis: number = (cost * 1000.0) / this.quota; + + const walletCeilingMillis: number = Date.now(); + let currentBalanceMillis: number = + walletCeilingMillis - this.walletFloorMillis; + currentBalanceMillis = Math.min( + currentBalanceMillis, + this.MAX_BALANCE_MILLIS + ); + const pendingRemainingBalanceMillis: number = + currentBalanceMillis - costInMillis; + if (pendingRemainingBalanceMillis >= 0) { + this.walletFloorMillis = + walletCeilingMillis - pendingRemainingBalanceMillis; + return true; + } + // No changes to the wallet state + return false; + } +} diff --git a/incubator/opentelemetry-sampler-aws-xray/src/rate-limiting-sampler.ts b/incubator/opentelemetry-sampler-aws-xray/src/rate-limiting-sampler.ts new file mode 100644 index 0000000000..c384ced22e --- /dev/null +++ b/incubator/opentelemetry-sampler-aws-xray/src/rate-limiting-sampler.ts @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Includes work from: +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api'; +import { + Sampler, + SamplingDecision, + SamplingResult, +} from '@opentelemetry/sdk-trace-base'; +import { RateLimiter } from './rate-limiter'; + +export class RateLimitingSampler implements Sampler { + private quota: number; + private reservoir: RateLimiter; + + constructor(quota: number) { + this.quota = quota; + this.reservoir = new RateLimiter(quota); + } + + shouldSample( + context: Context, + traceId: string, + spanName: string, + spanKind: SpanKind, + attributes: Attributes, + links: Link[] + ): SamplingResult { + if (this.reservoir.take(1)) { + return { + decision: SamplingDecision.RECORD_AND_SAMPLED, + attributes: attributes, + }; + } + return { decision: SamplingDecision.NOT_RECORD, attributes: attributes }; + } + + public toString(): string { + return `RateLimitingSampler{rate limiting sampling with sampling config of ${this.quota} req/sec and 0% of additional requests}`; + } +} diff --git a/incubator/opentelemetry-sampler-aws-xray/src/remote-sampler.ts b/incubator/opentelemetry-sampler-aws-xray/src/remote-sampler.ts index 5cc080ee90..d6876bf60e 100644 --- a/incubator/opentelemetry-sampler-aws-xray/src/remote-sampler.ts +++ b/incubator/opentelemetry-sampler-aws-xray/src/remote-sampler.ts @@ -36,11 +36,19 @@ import { FallbackSampler } from './fallback-sampler'; import { AWSXRayRemoteSamplerConfig, GetSamplingRulesResponse, + GetSamplingTargetsBody, + GetSamplingTargetsResponse, SamplingRuleRecord, + SamplingTargetDocument, + TargetMap, } from './types'; -import { RuleCache } from './rule-cache'; +import { + DEFAULT_TARGET_POLLING_INTERVAL_SECONDS, + RuleCache, +} from './rule-cache'; import { SamplingRuleApplier } from './sampling-rule-applier'; +import { PACKAGE_NAME } from './version'; // 5 minute default sampling rules polling interval const DEFAULT_RULES_POLLING_INTERVAL_SECONDS: number = 5 * 60; @@ -94,17 +102,22 @@ export class AWSXRayRemoteSampler implements Sampler { // Not intended for external use, use Parent-based `AWSXRayRemoteSampler` instead. export class _AWSXRayRemoteSampler implements Sampler { private rulePollingIntervalMillis: number; + private targetPollingInterval: number; private awsProxyEndpoint: string; private ruleCache: RuleCache; private fallbackSampler: FallbackSampler; private samplerDiag: DiagLogger; private rulePoller: NodeJS.Timeout | undefined; + private targetPoller: NodeJS.Timeout | undefined; private clientId: string; private rulePollingJitterMillis: number; + private targetPollingJitterMillis: number; private samplingClient: AWSXRaySamplingClient; constructor(samplerConfig: AWSXRayRemoteSamplerConfig) { - this.samplerDiag = diag; + this.samplerDiag = diag.createComponentLogger({ + namespace: PACKAGE_NAME, + }); if ( samplerConfig.pollingInterval == null || @@ -120,6 +133,8 @@ export class _AWSXRayRemoteSampler implements Sampler { } this.rulePollingJitterMillis = Math.random() * 5 * 1000; + this.targetPollingInterval = this.getDefaultTargetPollingInterval(); + this.targetPollingJitterMillis = (Math.random() / 10) * 1000; this.awsProxyEndpoint = samplerConfig.endpoint ? samplerConfig.endpoint @@ -137,7 +152,12 @@ export class _AWSXRayRemoteSampler implements Sampler { // Start the Sampling Rules poller this.startSamplingRulesPoller(); - // TODO: Start the Sampling Targets poller + // Start the Sampling Targets poller where the first poll occurs after the default interval + this.startSamplingTargetsPoller(); + } + + public getDefaultTargetPollingInterval(): number { + return DEFAULT_TARGET_POLLING_INTERVAL_SECONDS; } public shouldSample( @@ -203,6 +223,7 @@ export class _AWSXRayRemoteSampler implements Sampler { public stopPollers() { clearInterval(this.rulePoller); + clearInterval(this.targetPoller); } private startSamplingRulesPoller(): void { @@ -216,6 +237,27 @@ export class _AWSXRayRemoteSampler implements Sampler { this.rulePoller.unref(); } + private startSamplingTargetsPoller(): void { + // Update sampling targets every targetPollingInterval (usually 10 seconds) + this.targetPoller = setInterval( + () => this.getAndUpdateSamplingTargets(), + this.targetPollingInterval * 1000 + this.targetPollingJitterMillis + ); + this.targetPoller.unref(); + } + + private getAndUpdateSamplingTargets(): void { + const requestBody: GetSamplingTargetsBody = { + SamplingStatisticsDocuments: + this.ruleCache.createSamplingStatisticsDocuments(this.clientId), + }; + + this.samplingClient.fetchSamplingTargets( + requestBody, + this.updateSamplingTargets.bind(this) + ); + } + private getAndUpdateSamplingRules(): void { this.samplingClient.fetchSamplingRules(this.updateSamplingRules.bind(this)); } @@ -242,6 +284,41 @@ export class _AWSXRayRemoteSampler implements Sampler { } } + private updateSamplingTargets( + responseObject: GetSamplingTargetsResponse + ): void { + try { + const targetDocuments: TargetMap = {}; + + // Create Target-Name-to-Target-Map from sampling targets response + responseObject.SamplingTargetDocuments.forEach( + (newTarget: SamplingTargetDocument) => { + targetDocuments[newTarget.RuleName] = newTarget; + } + ); + + // Update targets in the cache + const [refreshSamplingRules, nextPollingInterval]: [boolean, number] = + this.ruleCache.updateTargets( + targetDocuments, + responseObject.LastRuleModification + ); + this.targetPollingInterval = nextPollingInterval; + clearInterval(this.targetPoller); + this.startSamplingTargetsPoller(); + + if (refreshSamplingRules) { + this.samplerDiag.debug( + 'Performing out-of-band sampling rule polling to fetch updated rules.' + ); + clearInterval(this.rulePoller); + this.startSamplingRulesPoller(); + } + } catch (error: unknown) { + this.samplerDiag.debug('Error occurred when updating Sampling Targets'); + } + } + private static generateClientId(): string { const hexChars: string[] = [ '0', diff --git a/incubator/opentelemetry-sampler-aws-xray/src/rule-cache.ts b/incubator/opentelemetry-sampler-aws-xray/src/rule-cache.ts index cf762d454c..05960c3f44 100644 --- a/incubator/opentelemetry-sampler-aws-xray/src/rule-cache.ts +++ b/incubator/opentelemetry-sampler-aws-xray/src/rule-cache.ts @@ -18,13 +18,22 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Attributes } from '@opentelemetry/api'; +import { Attributes, diag } from '@opentelemetry/api'; import { Resource } from '@opentelemetry/resources'; +import { + ISamplingStatistics, + SamplingStatisticsDocument, + SamplingTargetDocument, + TargetMap, +} from './types'; import { SamplingRuleApplier } from './sampling-rule-applier'; // The cache expires 1 hour after the last refresh time. const RULE_CACHE_TTL_MILLIS: number = 60 * 60 * 1000; +// 10 second default sampling targets polling interval +export const DEFAULT_TARGET_POLLING_INTERVAL_SECONDS = 10; + export class RuleCache { private ruleAppliers: SamplingRuleApplier[]; private lastUpdatedEpochMillis: number; @@ -89,4 +98,62 @@ export class RuleCache { this.sortRulesByPriority(); this.lastUpdatedEpochMillis = Date.now(); } + + public createSamplingStatisticsDocuments( + clientId: string + ): SamplingStatisticsDocument[] { + const statisticsDocuments: SamplingStatisticsDocument[] = []; + + this.ruleAppliers.forEach((rule: SamplingRuleApplier) => { + const statistics: ISamplingStatistics = rule.snapshotStatistics(); + const nowInSeconds: number = Math.floor(Date.now() / 1000); + + const samplingStatisticsDoc: SamplingStatisticsDocument = { + ClientID: clientId, + RuleName: rule.samplingRule.RuleName, + Timestamp: nowInSeconds, + RequestCount: statistics.RequestCount, + BorrowCount: statistics.BorrowCount, + SampledCount: statistics.SampleCount, + }; + + statisticsDocuments.push(samplingStatisticsDoc); + }); + return statisticsDocuments; + } + + // Update ruleAppliers based on the targets fetched from X-Ray service + public updateTargets( + targetDocuments: TargetMap, + lastRuleModification: number + ): [boolean, number] { + let minPollingInterval: number | undefined = undefined; + let nextPollingInterval: number = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS; + + for (const [index, rule] of this.ruleAppliers.entries()) { + const target: SamplingTargetDocument = + targetDocuments[rule.samplingRule.RuleName]; + if (target) { + this.ruleAppliers[index] = rule.withTarget(target); + if (typeof target.Interval === 'number') { + if ( + minPollingInterval === undefined || + minPollingInterval > target.Interval + ) { + minPollingInterval = target.Interval; + } + } + } else { + diag.debug('Invalid sampling target: missing rule name'); + } + } + + if (typeof minPollingInterval === 'number') { + nextPollingInterval = minPollingInterval; + } + + const refreshSamplingRules: boolean = + lastRuleModification * 1000 > this.lastUpdatedEpochMillis; + return [refreshSamplingRules, nextPollingInterval]; + } } diff --git a/incubator/opentelemetry-sampler-aws-xray/src/sampling-rule-applier.ts b/incubator/opentelemetry-sampler-aws-xray/src/sampling-rule-applier.ts index a148f2ca98..9a4fd3c1ba 100644 --- a/incubator/opentelemetry-sampler-aws-xray/src/sampling-rule-applier.ts +++ b/incubator/opentelemetry-sampler-aws-xray/src/sampling-rule-applier.ts @@ -54,15 +54,26 @@ import { ATTR_AWS_LAMBDA_INVOKED_ARN, ATTR_CLOUD_RESOURCE_ID, } from './semconv'; -import { ISamplingRule, SamplingTargetDocument } from './types'; +import { RateLimitingSampler } from './rate-limiting-sampler'; +import { + ISamplingRule, + ISamplingStatistics, + SamplingTargetDocument, +} from './types'; import { SamplingRule } from './sampling-rule'; import { Statistics } from './statistics'; import { CLOUD_PLATFORM_MAPPING, attributeMatch, wildcardMatch } from './utils'; +// Max date time in JavaScript +const MAX_DATE_TIME_MILLIS: number = new Date(8_640_000_000_000_000).getTime(); + export class SamplingRuleApplier { public samplingRule: SamplingRule; + private reservoirSampler: RateLimitingSampler; private fixedRateSampler: TraceIdRatioBasedSampler; private statistics: Statistics; + private borrowingEnabled: boolean; + private reservoirExpiryTimeInMillis: number; constructor( samplingRule: ISamplingRule, @@ -74,12 +85,44 @@ export class SamplingRuleApplier { this.fixedRateSampler = new TraceIdRatioBasedSampler( this.samplingRule.FixedRate ); - // TODO: Add Reservoir Sampler (Rate Limiting Sampler) + if (samplingRule.ReservoirSize > 0) { + this.reservoirSampler = new RateLimitingSampler(1); + } else { + this.reservoirSampler = new RateLimitingSampler(0); + } + this.reservoirExpiryTimeInMillis = MAX_DATE_TIME_MILLIS; this.statistics = statistics; this.statistics.resetStatistics(); + this.borrowingEnabled = true; + + if (target) { + this.borrowingEnabled = false; + if (typeof target.ReservoirQuota === 'number') { + this.reservoirSampler = new RateLimitingSampler(target.ReservoirQuota); + } - // TODO: Update Sampling Targets using provided `target` parameter + if (typeof target.ReservoirQuotaTTL === 'number') { + this.reservoirExpiryTimeInMillis = new Date( + target.ReservoirQuotaTTL * 1000 + ).getTime(); + } else { + this.reservoirExpiryTimeInMillis = Date.now(); + } + + if (typeof target.FixedRate === 'number') { + this.fixedRateSampler = new TraceIdRatioBasedSampler(target.FixedRate); + } + } + } + + public withTarget(target: SamplingTargetDocument): SamplingRuleApplier { + const newApplier: SamplingRuleApplier = new SamplingRuleApplier( + this.samplingRule, + this.statistics, + target + ); + return newApplier; } public matches(attributes: Attributes, resource: Resource): boolean { @@ -153,19 +196,45 @@ export class SamplingRuleApplier { attributes: Attributes, links: Link[] ): SamplingResult { - // TODO: Record Sampling Statistics - + let hasBorrowed = false; let result: SamplingResult = { decision: SamplingDecision.NOT_RECORD }; - // TODO: Apply Reservoir Sampling + const nowInMillis: number = Date.now(); + const reservoirExpired: boolean = + nowInMillis >= this.reservoirExpiryTimeInMillis; + + if (!reservoirExpired) { + result = this.reservoirSampler.shouldSample( + context, + traceId, + spanName, + spanKind, + attributes, + links + ); + hasBorrowed = + this.borrowingEnabled && + result.decision !== SamplingDecision.NOT_RECORD; + } if (result.decision === SamplingDecision.NOT_RECORD) { result = this.fixedRateSampler.shouldSample(context, traceId); } + this.statistics.SampleCount += + result.decision !== SamplingDecision.NOT_RECORD ? 1 : 0; + this.statistics.BorrowCount += hasBorrowed ? 1 : 0; + this.statistics.RequestCount += 1; + return result; } + public snapshotStatistics(): ISamplingStatistics { + const statisticsCopy: ISamplingStatistics = { ...this.statistics }; + this.statistics.resetStatistics(); + return statisticsCopy; + } + private getArn( resource: Resource, attributes: Attributes diff --git a/incubator/opentelemetry-sampler-aws-xray/test/fallback-sampler.test.ts b/incubator/opentelemetry-sampler-aws-xray/test/fallback-sampler.test.ts index 06ce23cb55..400bdfb18a 100644 --- a/incubator/opentelemetry-sampler-aws-xray/test/fallback-sampler.test.ts +++ b/incubator/opentelemetry-sampler-aws-xray/test/fallback-sampler.test.ts @@ -18,11 +18,190 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +import { SpanKind, context } from '@opentelemetry/api'; +import { SamplingDecision } from '@opentelemetry/sdk-trace-base'; import { expect } from 'expect'; +import * as sinon from 'sinon'; import { FallbackSampler } from '../src/fallback-sampler'; +import { testTraceId } from './remote-sampler.test'; + +let clock: sinon.SinonFakeTimers; describe('FallBackSampler', () => { - // TODO: Add tests for Fallback sampler when Rate Limiter is implemented + beforeEach(() => { + clock = sinon.useFakeTimers(Date.now()); + }); + afterEach(() => { + try { + clock.restore(); + } catch { + // do nothing + } + }); + it('testShouldSampleWithQuotaOnly', () => { + // Ensure FallbackSampler's internal TraceIdRatioBasedSampler will always return SamplingDecision.NOT_RECORD + const sampler = new FallbackSampler(0); + + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ); + + // 0 seconds passed, 0 quota available + let sampled = 0; + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + // 0.4 seconds passed, 0.4 quota available + sampled = 0; + clock.tick(0.4 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + // 0.8 seconds passed, 0.8 quota available + sampled = 0; + clock.tick(0.4 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + // 1.2 seconds passed, 1 quota consumed, 0 quota available + sampled = 0; + clock.tick(0.4 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(1); + + // 1.6 seconds passed, 0.4 quota available + sampled = 0; + clock.tick(0.4 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + // 2.0 seconds passed, 0.8 quota available + sampled = 0; + clock.tick(0.4 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + // 2.4 seconds passed, one more quota consumed, 0 quota available + sampled = 0; + clock.tick(0.4 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(1); + + // 100 seconds passed, only one quota can be consumed + sampled = 0; + clock.tick(100 * 1000); + for (let i = 0; i < 30; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(1); + }); it('toString()', () => { expect(new FallbackSampler().toString()).toEqual( diff --git a/incubator/opentelemetry-sampler-aws-xray/test/rate-limiter.test.ts b/incubator/opentelemetry-sampler-aws-xray/test/rate-limiter.test.ts new file mode 100644 index 0000000000..3a062dbcb3 --- /dev/null +++ b/incubator/opentelemetry-sampler-aws-xray/test/rate-limiter.test.ts @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Includes work from: +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { expect } from 'expect'; +import * as sinon from 'sinon'; +import { RateLimiter } from '../src/rate-limiter'; + +let clock: sinon.SinonFakeTimers; +describe('RateLimiter', () => { + beforeEach(() => { + clock = sinon.useFakeTimers(Date.now()); + }); + afterEach(() => { + clock.restore(); + }); + it('testTake', () => { + const limiter = new RateLimiter(30, 1); + + let spent = 0; + for (let i = 0; i < 100; i++) { + if (limiter.take(1)) { + spent++; + } + } + expect(spent).toEqual(0); + + spent = 0; + clock.tick(0.5 * 1000); + for (let i = 0; i < 100; i++) { + if (limiter.take(1)) { + spent++; + } + } + expect(spent).toEqual(15); + + spent = 0; + clock.tick(1000 * 1000); + for (let i = 0; i < 100; i++) { + if (limiter.take(1)) { + spent++; + } + } + expect(spent).toEqual(30); + }); +}); diff --git a/incubator/opentelemetry-sampler-aws-xray/test/rate-limiting-sampler.test.ts b/incubator/opentelemetry-sampler-aws-xray/test/rate-limiting-sampler.test.ts new file mode 100644 index 0000000000..73adc55b45 --- /dev/null +++ b/incubator/opentelemetry-sampler-aws-xray/test/rate-limiting-sampler.test.ts @@ -0,0 +1,217 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Includes work from: +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { SpanKind, context } from '@opentelemetry/api'; +import { SamplingDecision } from '@opentelemetry/sdk-trace-base'; +import { expect } from 'expect'; +import * as sinon from 'sinon'; +import { RateLimitingSampler } from '../src/rate-limiting-sampler'; +import { testTraceId } from './remote-sampler.test'; + +let clock: sinon.SinonFakeTimers; + +describe('RateLimitingSampler', () => { + beforeEach(() => { + clock = sinon.useFakeTimers(Date.now()); + }); + afterEach(() => { + clock.restore(); + }); + it('testShouldSample', () => { + const sampler = new RateLimitingSampler(30); + + let sampled = 0; + for (let i = 0; i < 100; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + clock.tick(0.5 * 1000); // Move forward half a second + + sampled = 0; + for (let i = 0; i < 100; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(15); + + clock.tick(1 * 1000); // Move forward 1 second + + sampled = 0; + for (let i = 0; i < 100; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(30); + + clock.tick(2.5 * 1000); // Move forward 2.5 seconds + + sampled = 0; + for (let i = 0; i < 100; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(30); + + clock.tick(1000 * 1000); // Move forward 1000 seconds + + sampled = 0; + for (let i = 0; i < 100; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(30); + }); + + it('testShouldSampleWithQuotaOfOne', () => { + const sampler = new RateLimitingSampler(1); + + let sampled = 0; + for (let i = 0; i < 50; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + clock.tick(0.5 * 1000); // Move forward half a second + + sampled = 0; + for (let i = 0; i < 50; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(0); + + clock.tick(0.5 * 1000); + + sampled = 0; + for (let i = 0; i < 50; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(1); + + clock.tick(1000 * 1000); // Move forward 1000 seconds + + sampled = 0; + for (let i = 0; i < 50; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + {}, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled += 1; + } + } + expect(sampled).toEqual(1); + }); + + it('toString()', () => { + expect(new RateLimitingSampler(123).toString()).toEqual( + 'RateLimitingSampler{rate limiting sampling with sampling config of 123 req/sec and 0% of additional requests}' + ); + }); +}); diff --git a/incubator/opentelemetry-sampler-aws-xray/test/remote-sampler.test.ts b/incubator/opentelemetry-sampler-aws-xray/test/remote-sampler.test.ts index 27fecc0670..48f2e21a09 100644 --- a/incubator/opentelemetry-sampler-aws-xray/test/remote-sampler.test.ts +++ b/incubator/opentelemetry-sampler-aws-xray/test/remote-sampler.test.ts @@ -22,7 +22,16 @@ import { resourceFromAttributes, emptyResource, } from '@opentelemetry/resources'; -import { context, Span, SpanKind, Tracer, trace } from '@opentelemetry/api'; +import { + context, + Span, + SpanKind, + Tracer, + trace, + Attributes, + Link, + Context, +} from '@opentelemetry/api'; import { SamplingDecision } from '@opentelemetry/sdk-trace-base'; import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; import { @@ -40,19 +49,23 @@ import { AWSXRaySamplingClient } from '../src/aws-xray-sampling-client'; const DATA_DIR_SAMPLING_RULES = __dirname + '/data/test-remote-sampler_sampling-rules-response-sample.json'; +const DATA_DIR_SAMPLING_TARGETS = + __dirname + '/data/test-remote-sampler_sampling-targets-response-sample.json'; const TEST_URL = 'http://localhost:2000'; +export const testTraceId = '0af7651916cd43dd8448eb211c80319c'; describe('AWSXRayRemoteSampler', () => { let sampler: AWSXRayRemoteSampler; afterEach(() => { + sinon.restore(); if (sampler != null) { sampler.stopPollers(); } }); it('testCreateRemoteSamplerWithEmptyResource', () => { - const sampler: AWSXRayRemoteSampler = new AWSXRayRemoteSampler({ + sampler = new AWSXRayRemoteSampler({ resource: emptyResource(), }); @@ -114,7 +127,9 @@ describe('AWSXRayRemoteSampler', () => { nock(TEST_URL) .post('/GetSamplingRules') .reply(200, require(DATA_DIR_SAMPLING_RULES)); - + nock(TEST_URL) + .post('/SamplingTargets') + .reply(200, require(DATA_DIR_SAMPLING_TARGETS)); const resource = resourceFromAttributes({ [ATTR_SERVICE_NAME]: 'test-service-name', [SEMRESATTRS_CLOUD_PLATFORM]: 'test-cloud-platform', @@ -132,7 +147,7 @@ describe('AWSXRayRemoteSampler', () => { expect( sampler.shouldSample( context.active(), - '1234', + testTraceId, 'name', SpanKind.CLIENT, { abc: '1234' }, @@ -140,8 +155,188 @@ describe('AWSXRayRemoteSampler', () => { ).decision ).toEqual(SamplingDecision.NOT_RECORD); - // TODO: Run more tests after updating Sampling Targets - done(); + sampler['internalXraySampler']['getAndUpdateSamplingTargets'](); + + setTimeout(() => { + expect( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + { abc: '1234' }, + [] + ).decision + ).toEqual(SamplingDecision.RECORD_AND_SAMPLED); + expect( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + { abc: '1234' }, + [] + ).decision + ).toEqual(SamplingDecision.RECORD_AND_SAMPLED); + expect( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + { abc: '1234' }, + [] + ).decision + ).toEqual(SamplingDecision.RECORD_AND_SAMPLED); + + done(); + }, 50); + }, 50); + }); + + it('testLargeReservoir', done => { + nock(TEST_URL) + .post('/GetSamplingRules') + .reply(200, require(DATA_DIR_SAMPLING_RULES)); + nock(TEST_URL) + .post('/SamplingTargets') + .reply(200, require(DATA_DIR_SAMPLING_TARGETS)); + const resource = resourceFromAttributes({ + [ATTR_SERVICE_NAME]: 'test-service-name', + [SEMRESATTRS_CLOUD_PLATFORM]: 'test-cloud-platform', + }); + const attributes = { abc: '1234' }; + + sampler = new AWSXRayRemoteSampler({ + resource: resource, + }); + sampler['internalXraySampler']['getAndUpdateSamplingRules'](); + + setTimeout(() => { + expect( + sampler['internalXraySampler']['ruleCache']['ruleAppliers'][0] + .samplingRule.RuleName + ).toEqual('test'); + expect( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + attributes, + [] + ).decision + ).toEqual(SamplingDecision.NOT_RECORD); + sampler['internalXraySampler']['getAndUpdateSamplingTargets'](); + + setTimeout(() => { + const clock = sinon.useFakeTimers(Date.now()); + clock.tick(1500); + let sampled = 0; + for (let i = 0; i < 1005; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + attributes, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled++; + } + } + clock.restore(); + + expect( + sampler['internalXraySampler']['ruleCache']['ruleAppliers'][0][ + 'reservoirSampler' + ]['quota'] + ).toEqual(1000); + expect(sampled).toEqual(1000); + done(); + }, 50); + }, 50); + }); + + it('testSomeReservoir', done => { + nock(TEST_URL) + .post('/GetSamplingRules') + .reply(200, require(DATA_DIR_SAMPLING_RULES)); + nock(TEST_URL) + .post('/SamplingTargets') + .reply(200, require(DATA_DIR_SAMPLING_TARGETS)); + const resource = resourceFromAttributes({ + [ATTR_SERVICE_NAME]: 'test-service-name', + [SEMRESATTRS_CLOUD_PLATFORM]: 'test-cloud-platform', + }); + const attributes = { + abc: 'non-matching attribute value, use default rule', + }; + + sampler = new AWSXRayRemoteSampler({ + resource: resource, + }); + sinon + .stub(sampler['internalXraySampler']['fallbackSampler'], 'shouldSample') + .callsFake( + ( + context: Context, + traceId: string, + spanName: string, + spanKind: SpanKind, + attributes: Attributes, + links: Link[] + ) => { + return { + decision: SamplingDecision.NOT_RECORD, + attributes: attributes, + }; + } + ); + sampler['internalXraySampler']['getAndUpdateSamplingRules'](); + + setTimeout(() => { + expect( + sampler['internalXraySampler']['ruleCache']['ruleAppliers'][0] + .samplingRule.RuleName + ).toEqual('test'); + expect( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + attributes, + [] + ).decision + ).toEqual(SamplingDecision.RECORD_AND_SAMPLED); + sampler['internalXraySampler']['getAndUpdateSamplingTargets'](); + + setTimeout(() => { + const clock = sinon.useFakeTimers(Date.now()); + clock.tick(1000); + let sampled = 0; + for (let i = 0; i < 1000; i++) { + if ( + sampler.shouldSample( + context.active(), + testTraceId, + 'name', + SpanKind.CLIENT, + attributes, + [] + ).decision !== SamplingDecision.NOT_RECORD + ) { + sampled++; + } + } + clock.restore(); + + expect(sampled).toEqual(100); + done(); + }, 50); }, 50); }); @@ -159,7 +354,85 @@ describe('AWSXRayRemoteSampler', () => { ); }); - // TODO: Run tests for Reservoir Sampling and Sampling Statistics + it('ParentBased AWSXRayRemoteSampler creates expected Statistics from the 1 Span with no Parent, disregarding 2 Child Spans', done => { + const defaultRuleDir = + __dirname + '/data/get-sampling-rules-response-sample-sample-all.json'; + nock(TEST_URL) + .post('/GetSamplingRules') + .reply(200, require(defaultRuleDir)); + + sampler = new AWSXRayRemoteSampler({ + resource: emptyResource(), + }); + const tracerProvider: NodeTracerProvider = new NodeTracerProvider({ + sampler: sampler, + }); + const tracer: Tracer = tracerProvider.getTracer('test'); + + setTimeout(() => { + const span0 = tracer.startSpan('test0'); + const ctx = trace.setSpan(context.active(), span0); + const span1: Span = tracer.startSpan('test1', {}, ctx); + const span2: Span = tracer.startSpan('test2', {}, ctx); + span2.end(); + span1.end(); + span0.end(); + + // span1 and span2 are child spans of root span0 + // For AWSXRayRemoteSampler (ParentBased), expect only span0 to update statistics + expect( + sampler['internalXraySampler']['ruleCache']['ruleAppliers'][0][ + 'statistics' + ].RequestCount + ).toBe(1); + expect( + sampler['internalXraySampler']['ruleCache']['ruleAppliers'][0][ + 'statistics' + ].SampleCount + ).toBe(1); + done(); + }, 50); + }); + + it('Non-ParentBased _AWSXRayRemoteSampler creates expected Statistics based on all 3 Spans, disregarding Parent Span Sampling Decision', done => { + const defaultRuleDir = + __dirname + '/data/get-sampling-rules-response-sample-sample-all.json'; + nock(TEST_URL) + .post('/GetSamplingRules') + .reply(200, require(defaultRuleDir)); + + sampler = new AWSXRayRemoteSampler({ + resource: emptyResource(), + }); + const internalSampler: _AWSXRayRemoteSampler = + sampler['internalXraySampler']; + const tracerProvider: NodeTracerProvider = new NodeTracerProvider({ + sampler: internalSampler, + }); + const tracer: Tracer = tracerProvider.getTracer('test'); + + setTimeout(() => { + const span0 = tracer.startSpan('test0'); + const ctx = trace.setSpan(context.active(), span0); + const span1: Span = tracer.startSpan('test1', {}, ctx); + const span2: Span = tracer.startSpan('test2', {}, ctx); + span2.end(); + span1.end(); + span0.end(); + + // span1 and span2 are child spans of root span0 + // For _AWSXRayRemoteSampler (Non-ParentBased), expect all 3 spans to update statistics + expect( + internalSampler['ruleCache']['ruleAppliers'][0]['statistics'] + .RequestCount + ).toBe(3); + expect( + internalSampler['ruleCache']['ruleAppliers'][0]['statistics'] + .SampleCount + ).toBe(3); + done(); + }, 50); + }); }); describe('_AWSXRayRemoteSampler', () => { diff --git a/incubator/opentelemetry-sampler-aws-xray/test/rule-cache.test.ts b/incubator/opentelemetry-sampler-aws-xray/test/rule-cache.test.ts index ae3cfdc725..952a0f9982 100644 --- a/incubator/opentelemetry-sampler-aws-xray/test/rule-cache.test.ts +++ b/incubator/opentelemetry-sampler-aws-xray/test/rule-cache.test.ts @@ -143,5 +143,116 @@ describe('RuleCache', () => { ); }); - // TODO: Add tests for updating Sampling Targets and getting statistics + it('testUpdateSamplingTargets', () => { + const rule1 = createRule('default', 10000, 1, 0.05); + const rule2 = createRule('test', 20, 10, 0.2); + const cache = new RuleCache(emptyResource()); + cache.updateRules([rule1, rule2]); + + expect(cache['ruleAppliers'][0]['reservoirSampler']['quota']).toEqual(1); + expect(cache['ruleAppliers'][0]['fixedRateSampler']['_ratio']).toEqual( + rule2.samplingRule.FixedRate + ); + + expect(cache['ruleAppliers'][1]['reservoirSampler']['quota']).toEqual(1); + expect(cache['ruleAppliers'][1]['fixedRateSampler']['_ratio']).toEqual( + rule1.samplingRule.FixedRate + ); + + const time = Date.now() / 1000; + const target1 = { + FixedRate: 0.05, + Interval: 15, + ReservoirQuota: 1, + ReservoirQuotaTTL: time + 10, + RuleName: 'default', + }; + const target2 = { + FixedRate: 0.15, + Interval: 12, + ReservoirQuota: 5, + ReservoirQuotaTTL: time + 10, + RuleName: 'test', + }; + const target3 = { + FixedRate: 0.15, + Interval: 3, + ReservoirQuota: 5, + ReservoirQuotaTTL: time + 10, + RuleName: 'associated rule does not exist', + }; + + const targetMap = { + default: target1, + test: target2, + 'associated rule does not exist': target3, + }; + const [refreshSamplingRules, nextPollingInterval] = cache.updateTargets( + targetMap, + time - 10 + ); + expect(refreshSamplingRules).toEqual(false); + expect(nextPollingInterval).toEqual(target2.Interval); + + // Ensure cache is still of length 2 + expect(cache['ruleAppliers'].length).toEqual(2); + + expect(cache['ruleAppliers'][0]['reservoirSampler']['quota']).toEqual( + target2.ReservoirQuota + ); + expect(cache['ruleAppliers'][0]['fixedRateSampler']['_ratio']).toEqual( + target2.FixedRate + ); + expect(cache['ruleAppliers'][1]['reservoirSampler']['quota']).toEqual( + target1.ReservoirQuota + ); + expect(cache['ruleAppliers'][1]['fixedRateSampler']['_ratio']).toEqual( + target1.FixedRate + ); + + const [refreshSamplingRulesAfter, _] = cache.updateTargets( + targetMap, + time + 1 + ); + expect(refreshSamplingRulesAfter).toBe(true); + }); + + it('testGetAllStatistics', () => { + const time = Date.now(); + const clock = sinon.useFakeTimers(time); + + const rule1 = createRule('test', 4, 2, 2.0); + const rule2 = createRule('default', 5, 5, 5.0); + + const cache = new RuleCache(emptyResource()); + cache.updateRules([rule1, rule2]); + + clock.tick(1); // ms + + const clientId = '12345678901234567890abcd'; + const statistics = cache.createSamplingStatisticsDocuments( + '12345678901234567890abcd' + ); + + // 1 ms should not be big enough to expect a timestamp difference + expect(statistics).toEqual([ + { + ClientID: clientId, + RuleName: 'test', + Timestamp: Math.floor(time / 1000), + RequestCount: 0, + BorrowCount: 0, + SampledCount: 0, + }, + { + ClientID: clientId, + RuleName: 'default', + Timestamp: Math.floor(time / 1000), + RequestCount: 0, + BorrowCount: 0, + SampledCount: 0, + }, + ]); + clock.restore(); + }); });