Skip to content

Commit 5bfd592

Browse files
authored
feat(opentelemetry-sampler-aws-xray): Add Rate Limiter and Sampling Targets Poller Logic (#2924)
1 parent bf302a5 commit 5bfd592

12 files changed

+1218
-24
lines changed

incubator/opentelemetry-sampler-aws-xray/src/aws-xray-sampling-client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export class AWSXRaySamplingClient {
4545
this.makeSamplingRequest<GetSamplingTargetsResponse>(
4646
this.samplingTargetsEndpoint,
4747
callback,
48-
this.samplerDiag.debug,
48+
(message: string) => this.samplerDiag.debug(message),
4949
JSON.stringify(requestBody)
5050
);
5151
}
@@ -56,7 +56,7 @@ export class AWSXRaySamplingClient {
5656
this.makeSamplingRequest<GetSamplingRulesResponse>(
5757
this.getSamplingRulesEndpoint,
5858
callback,
59-
this.samplerDiag.error
59+
(message: string) => this.samplerDiag.error(message)
6060
);
6161
}
6262

incubator/opentelemetry-sampler-aws-xray/src/fallback-sampler.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@
2121
import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
2222
import {
2323
Sampler,
24+
SamplingDecision,
2425
SamplingResult,
2526
TraceIdRatioBasedSampler,
2627
} from '@opentelemetry/sdk-trace-base';
28+
import { RateLimitingSampler } from './rate-limiting-sampler';
2729

2830
// FallbackSampler samples `quota` req/sec (default 1) via a RateLimitingSampler, plus an additional
// `ratio` of the remaining requests (default 5%) via a TraceIdRatioBasedSampler.
2931
export class FallbackSampler implements Sampler {
3032
private fixedRateSampler: TraceIdRatioBasedSampler;
33+
private rateLimitingSampler: RateLimitingSampler;
3134

32-
constructor() {
33-
this.fixedRateSampler = new TraceIdRatioBasedSampler(0.05);
35+
/**
 * Builds the two samplers this fallback delegates to.
 *
 * @param ratio passed to `TraceIdRatioBasedSampler`; defaults to 0.05.
 * @param quota passed to `RateLimitingSampler`; defaults to 1.
 */
constructor(ratio = 0.05, quota = 1) {
  // The two samplers are independent of each other, so construction order does not matter.
  this.rateLimitingSampler = new RateLimitingSampler(quota);
  this.fixedRateSampler = new TraceIdRatioBasedSampler(ratio);
}
3539

3640
shouldSample(
@@ -41,7 +45,19 @@ export class FallbackSampler implements Sampler {
4145
attributes: Attributes,
4246
links: Link[]
4347
): SamplingResult {
44-
// TODO: implement and use Rate Limiting Sampler
48+
const samplingResult: SamplingResult =
49+
this.rateLimitingSampler.shouldSample(
50+
context,
51+
traceId,
52+
spanName,
53+
spanKind,
54+
attributes,
55+
links
56+
);
57+
58+
if (samplingResult.decision !== SamplingDecision.NOT_RECORD) {
59+
return samplingResult;
60+
}
4561

4662
return this.fixedRateSampler.shouldSample(context, traceId);
4763
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Includes work from:
18+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
19+
// SPDX-License-Identifier: Apache-2.0
20+
21+
/*
22+
* The RateLimiter keeps track of the current reservoir quota balance available (measured via available time)
23+
* If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time)
24+
* A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
25+
*/
26+
/**
 * Time-based rate limiter.
 *
 * The available balance is expressed in milliseconds: it is the time elapsed
 * since `walletFloorMillis`, capped at `MAX_BALANCE_MILLIS`. Each `take()`
 * costs `cost * 1000 / quota` milliseconds of that balance. A freshly
 * constructed limiter has its floor set to "now" and therefore starts with
 * no balance.
 */
export class RateLimiter {
  // Quota assigned to client to dictate maximum quota balance that can be consumed per second.
  private readonly quota: number;
  // Upper bound on the balance that can accumulate, in milliseconds.
  private readonly MAX_BALANCE_MILLIS: number;
  // Used to measure current quota balance: balance = now - walletFloorMillis.
  private walletFloorMillis: number;

  /**
   * @param quota number of units that may be taken per second.
   * @param maxBalanceInSeconds cap on the accumulated balance, in seconds; defaults to 1.
   */
  constructor(quota: number, maxBalanceInSeconds = 1) {
    this.MAX_BALANCE_MILLIS = maxBalanceInSeconds * 1000.0;
    this.quota = quota;
    this.walletFloorMillis = Date.now();
    // current "balance" would be `ceiling - floor`
  }

  /**
   * Attempts to consume `cost` units from the current balance.
   *
   * @param cost number of units to consume; defaults to 1.
   * @returns `true` if the balance covered the cost (and was debited),
   *   `false` otherwise, in which case the wallet state is left untouched.
   */
  public take(cost = 1): boolean {
    // A zero quota can never be satisfied and would divide by zero below.
    // A negative quota would produce a negative cost, which always "fits"
    // in the balance and would grant unlimited takes. NaN fails this
    // comparison too. All of these mean "no quota available".
    if (!(this.quota > 0)) {
      return false;
    }

    const costInMillis: number = (cost * 1000.0) / this.quota;

    const walletCeilingMillis: number = Date.now();
    const currentBalanceMillis: number = Math.min(
      walletCeilingMillis - this.walletFloorMillis,
      this.MAX_BALANCE_MILLIS
    );
    const pendingRemainingBalanceMillis: number =
      currentBalanceMillis - costInMillis;

    if (pendingRemainingBalanceMillis >= 0) {
      // Move the floor so that `now - floor` equals the remaining balance.
      this.walletFloorMillis =
        walletCeilingMillis - pendingRemainingBalanceMillis;
      return true;
    }

    // No changes to the wallet state
    return false;
  }
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Includes work from:
18+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
19+
// SPDX-License-Identifier: Apache-2.0
20+
21+
import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
22+
import {
23+
Sampler,
24+
SamplingDecision,
25+
SamplingResult,
26+
} from '@opentelemetry/sdk-trace-base';
27+
import { RateLimiter } from './rate-limiter';
28+
29+
/**
 * Sampler that samples a span only when its `RateLimiter` reservoir has
 * enough balance for one more unit; otherwise the span is not recorded.
 */
export class RateLimitingSampler implements Sampler {
  // Kept only for the human-readable description in toString().
  private quota: number;
  // Rate limiter that decides whether one more span may be sampled.
  private reservoir: RateLimiter;

  /**
   * @param quota handed to the underlying `RateLimiter` and reported by `toString()`.
   */
  constructor(quota: number) {
    this.reservoir = new RateLimiter(quota);
    this.quota = quota;
  }

  /**
   * Takes one unit from the reservoir and maps the outcome to a decision.
   * Only `attributes` is used; the other parameters are accepted to satisfy
   * the `Sampler` interface.
   *
   * @returns `RECORD_AND_SAMPLED` when the reservoir allowed the take,
   *   `NOT_RECORD` otherwise; the incoming `attributes` are echoed back in both cases.
   */
  shouldSample(
    context: Context,
    traceId: string,
    spanName: string,
    spanKind: SpanKind,
    attributes: Attributes,
    links: Link[]
  ): SamplingResult {
    const decision = this.reservoir.take(1)
      ? SamplingDecision.RECORD_AND_SAMPLED
      : SamplingDecision.NOT_RECORD;
    return { decision, attributes };
  }

  /** Describes this sampler, including its configured quota. */
  public toString(): string {
    return `RateLimitingSampler{rate limiting sampling with sampling config of ${this.quota} req/sec and 0% of additional requests}`;
  }
}

incubator/opentelemetry-sampler-aws-xray/src/remote-sampler.ts

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,19 @@ import { FallbackSampler } from './fallback-sampler';
3636
import {
3737
AWSXRayRemoteSamplerConfig,
3838
GetSamplingRulesResponse,
39+
GetSamplingTargetsBody,
40+
GetSamplingTargetsResponse,
3941
SamplingRuleRecord,
42+
SamplingTargetDocument,
43+
TargetMap,
4044
} from './types';
41-
import { RuleCache } from './rule-cache';
45+
import {
46+
DEFAULT_TARGET_POLLING_INTERVAL_SECONDS,
47+
RuleCache,
48+
} from './rule-cache';
4249

4350
import { SamplingRuleApplier } from './sampling-rule-applier';
51+
import { PACKAGE_NAME } from './version';
4452

4553
// 5 minute default sampling rules polling interval
4654
const DEFAULT_RULES_POLLING_INTERVAL_SECONDS: number = 5 * 60;
@@ -94,17 +102,22 @@ export class AWSXRayRemoteSampler implements Sampler {
94102
// Not intended for external use, use Parent-based `AWSXRayRemoteSampler` instead.
95103
export class _AWSXRayRemoteSampler implements Sampler {
96104
private rulePollingIntervalMillis: number;
105+
private targetPollingInterval: number;
97106
private awsProxyEndpoint: string;
98107
private ruleCache: RuleCache;
99108
private fallbackSampler: FallbackSampler;
100109
private samplerDiag: DiagLogger;
101110
private rulePoller: NodeJS.Timeout | undefined;
111+
private targetPoller: NodeJS.Timeout | undefined;
102112
private clientId: string;
103113
private rulePollingJitterMillis: number;
114+
private targetPollingJitterMillis: number;
104115
private samplingClient: AWSXRaySamplingClient;
105116

106117
constructor(samplerConfig: AWSXRayRemoteSamplerConfig) {
107-
this.samplerDiag = diag;
118+
this.samplerDiag = diag.createComponentLogger({
119+
namespace: PACKAGE_NAME,
120+
});
108121

109122
if (
110123
samplerConfig.pollingInterval == null ||
@@ -120,6 +133,8 @@ export class _AWSXRayRemoteSampler implements Sampler {
120133
}
121134

122135
this.rulePollingJitterMillis = Math.random() * 5 * 1000;
136+
this.targetPollingInterval = this.getDefaultTargetPollingInterval();
137+
this.targetPollingJitterMillis = (Math.random() / 10) * 1000;
123138

124139
this.awsProxyEndpoint = samplerConfig.endpoint
125140
? samplerConfig.endpoint
@@ -137,7 +152,12 @@ export class _AWSXRayRemoteSampler implements Sampler {
137152
// Start the Sampling Rules poller
138153
this.startSamplingRulesPoller();
139154

140-
// TODO: Start the Sampling Targets poller
155+
// Start the Sampling Targets poller where the first poll occurs after the default interval
156+
this.startSamplingTargetsPoller();
157+
}
158+
159+
/**
 * @returns the default sampling-targets polling interval, in seconds
 *   (the value of `DEFAULT_TARGET_POLLING_INTERVAL_SECONDS`).
 */
public getDefaultTargetPollingInterval(): number {
  const defaultIntervalSeconds: number =
    DEFAULT_TARGET_POLLING_INTERVAL_SECONDS;
  return defaultIntervalSeconds;
}
142162

143163
public shouldSample(
@@ -203,6 +223,7 @@ export class _AWSXRayRemoteSampler implements Sampler {
203223

204224
/** Cancels the sampling-rules and sampling-targets interval timers. */
public stopPollers() {
  for (const poller of [this.rulePoller, this.targetPoller]) {
    clearInterval(poller);
  }
}
207228

208229
private startSamplingRulesPoller(): void {
@@ -216,6 +237,27 @@ export class _AWSXRayRemoteSampler implements Sampler {
216237
this.rulePoller.unref();
217238
}
218239

240+
/**
 * Schedules the recurring sampling-targets poll and stores the timer in
 * `targetPoller`. The period is `targetPollingInterval` seconds (usually 10)
 * plus the jitter in `targetPollingJitterMillis`.
 */
private startSamplingTargetsPoller(): void {
  const periodMillis =
    this.targetPollingInterval * 1000 + this.targetPollingJitterMillis;
  const poll = (): void => this.getAndUpdateSamplingTargets();

  this.targetPoller = setInterval(poll, periodMillis);
  // unref() so this timer alone does not keep the Node.js process alive.
  this.targetPoller.unref();
}
248+
249+
/**
 * Builds a GetSamplingTargets request from the rule cache's statistics
 * documents for this client and sends it; the response is handed to
 * `updateSamplingTargets`.
 */
private getAndUpdateSamplingTargets(): void {
  const statisticsDocuments =
    this.ruleCache.createSamplingStatisticsDocuments(this.clientId);
  const requestBody: GetSamplingTargetsBody = {
    SamplingStatisticsDocuments: statisticsDocuments,
  };
  const onTargetsFetched = this.updateSamplingTargets.bind(this);

  this.samplingClient.fetchSamplingTargets(requestBody, onTargetsFetched);
}
260+
219261
/** Fetches sampling rules and hands the response to `updateSamplingRules`. */
private getAndUpdateSamplingRules(): void {
  const onRulesFetched = this.updateSamplingRules.bind(this);
  this.samplingClient.fetchSamplingRules(onRulesFetched);
}
@@ -242,6 +284,41 @@ export class _AWSXRayRemoteSampler implements Sampler {
242284
}
243285
}
244286

287+
/**
 * Callback for a GetSamplingTargets response.
 *
 * Indexes the returned target documents by rule name, pushes them into the
 * rule cache, and restarts the targets poller with the interval the cache
 * reports. When the cache signals that the rules need refreshing, the rules
 * poller is restarted as well.
 *
 * Any error raised while processing the response is caught and logged at
 * debug level, together with the error itself, so a bad response never
 * propagates out of this callback.
 *
 * @param responseObject parsed GetSamplingTargets response.
 */
private updateSamplingTargets(
  responseObject: GetSamplingTargetsResponse
): void {
  try {
    const targetDocuments: TargetMap = {};

    // Create Target-Name-to-Target-Map from sampling targets response
    responseObject.SamplingTargetDocuments.forEach(
      (newTarget: SamplingTargetDocument) => {
        targetDocuments[newTarget.RuleName] = newTarget;
      }
    );

    // Update targets in the cache
    const [refreshSamplingRules, nextPollingInterval]: [boolean, number] =
      this.ruleCache.updateTargets(
        targetDocuments,
        responseObject.LastRuleModification
      );

    // Re-arm the targets poller so the new interval takes effect.
    this.targetPollingInterval = nextPollingInterval;
    clearInterval(this.targetPoller);
    this.startSamplingTargetsPoller();

    if (refreshSamplingRules) {
      this.samplerDiag.debug(
        'Performing out-of-band sampling rule polling to fetch updated rules.'
      );
      // NOTE(review): presumably startSamplingRulesPoller() fetches rules
      // right away before scheduling; confirm against its implementation.
      clearInterval(this.rulePoller);
      this.startSamplingRulesPoller();
    }
  } catch (error: unknown) {
    // Pass the caught error along so the failure can be diagnosed from logs.
    this.samplerDiag.debug(
      'Error occurred when updating Sampling Targets',
      error
    );
  }
}
321+
245322
private static generateClientId(): string {
246323
const hexChars: string[] = [
247324
'0',

0 commit comments

Comments
 (0)