Skip to content

Commit f2afb66

Browse files
author
Anuraag Agrawal
authored
Fetch and update rules with sampling targets. (#53)
* Fetch and update rules with sampling targets.
1 parent 439cdce commit f2afb66

File tree

12 files changed

+715
-117
lines changed

12 files changed

+715
-117
lines changed

aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,42 +7,52 @@
77
import io.opentelemetry.api.common.Attributes;
88
import io.opentelemetry.api.trace.SpanKind;
99
import io.opentelemetry.context.Context;
10+
import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRuleRecord;
11+
import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument;
12+
import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument;
13+
import io.opentelemetry.sdk.common.Clock;
1014
import io.opentelemetry.sdk.resources.Resource;
1115
import io.opentelemetry.sdk.trace.data.LinkData;
1216
import io.opentelemetry.sdk.trace.samplers.Sampler;
1317
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
1418
import java.io.Closeable;
19+
import java.time.Instant;
20+
import java.util.Date;
1521
import java.util.List;
22+
import java.util.Map;
1623
import java.util.Random;
24+
import java.util.Set;
1725
import java.util.concurrent.Executors;
1826
import java.util.concurrent.ScheduledExecutorService;
1927
import java.util.concurrent.ScheduledFuture;
2028
import java.util.concurrent.TimeUnit;
29+
import java.util.function.Function;
2130
import java.util.logging.Level;
2231
import java.util.logging.Logger;
32+
import java.util.stream.Collectors;
2333
import org.checkerframework.checker.nullness.qual.Nullable;
2434

2535
/** Remote sampler that gets sampling configuration from AWS X-Ray. */
2636
public final class AwsXrayRemoteSampler implements Sampler, Closeable {
2737

28-
private static final Random RANDOM = new Random();
38+
static final long DEFAULT_TARGET_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(10);
2939

40+
private static final Random RANDOM = new Random();
3041
private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName());
3142

32-
private static final String WORKER_THREAD_NAME =
33-
AwsXrayRemoteSampler.class.getSimpleName() + "_WorkerThread";
34-
3543
// Unique per-process client ID, generated as a random string.
3644
private static final String CLIENT_ID = generateClientId();
3745

3846
private final Resource resource;
47+
private final Clock clock;
3948
private final Sampler initialSampler;
4049
private final XraySamplerClient client;
4150
private final ScheduledExecutorService executor;
4251
private final long pollingIntervalNanos;
4352
private final int jitterNanos;
4453

4554
@Nullable private volatile ScheduledFuture<?> pollFuture;
55+
@Nullable private volatile ScheduledFuture<?> fetchTargetsFuture;
4656
@Nullable private volatile GetSamplingRulesResponse previousRulesResponse;
4757
private volatile Sampler sampler;
4858

@@ -57,8 +67,13 @@ public static AwsXrayRemoteSamplerBuilder newBuilder(Resource resource) {
5767
}
5868

5969
AwsXrayRemoteSampler(
60-
Resource resource, String endpoint, Sampler initialSampler, long pollingIntervalNanos) {
70+
Resource resource,
71+
Clock clock,
72+
String endpoint,
73+
Sampler initialSampler,
74+
long pollingIntervalNanos) {
6175
this.resource = resource;
76+
this.clock = clock;
6277
this.initialSampler = initialSampler;
6378
client = new XraySamplerClient(endpoint);
6479
executor =
@@ -107,8 +122,22 @@ private void getAndUpdateSampler() {
107122
client.getSamplingRules(GetSamplingRulesRequest.create(null));
108123
if (!response.equals(previousRulesResponse)) {
109124
sampler =
110-
new XrayRulesSampler(CLIENT_ID, resource, initialSampler, response.getSamplingRules());
125+
new XrayRulesSampler(
126+
CLIENT_ID,
127+
resource,
128+
clock,
129+
initialSampler,
130+
response.getSamplingRules().stream()
131+
.map(SamplingRuleRecord::getRule)
132+
.collect(Collectors.toList()));
111133
previousRulesResponse = response;
134+
ScheduledFuture<?> existingFetchTargetsFuture = fetchTargetsFuture;
135+
if (existingFetchTargetsFuture != null) {
136+
existingFetchTargetsFuture.cancel(false);
137+
}
138+
fetchTargetsFuture =
139+
executor.schedule(
140+
this::fetchTargets, DEFAULT_TARGET_INTERVAL_NANOS, TimeUnit.NANOSECONDS);
112141
}
113142
} catch (Throwable t) {
114143
logger.log(Level.FINE, "Failed to update sampler", t);
@@ -121,6 +150,39 @@ private void scheduleSamplerUpdate() {
121150
pollFuture = executor.schedule(this::getAndUpdateSampler, delay, TimeUnit.NANOSECONDS);
122151
}
123152

153+
private void fetchTargets() {
154+
if (!(sampler instanceof XrayRulesSampler)) {
155+
throw new IllegalStateException("Programming bug.");
156+
}
157+
158+
XrayRulesSampler xrayRulesSampler = (XrayRulesSampler) sampler;
159+
try {
160+
Date now = Date.from(Instant.ofEpochSecond(0, clock.now()));
161+
List<SamplingStatisticsDocument> statistics = xrayRulesSampler.snapshot(now);
162+
Set<String> requestedTargetRuleNames =
163+
statistics.stream()
164+
.map(SamplingStatisticsDocument::getRuleName)
165+
.collect(Collectors.toSet());
166+
167+
GetSamplingTargetsResponse response =
168+
client.getSamplingTargets(GetSamplingTargetsRequest.create(statistics));
169+
Map<String, SamplingTargetDocument> targets =
170+
response.getDocuments().stream()
171+
.collect(Collectors.toMap(SamplingTargetDocument::getRuleName, Function.identity()));
172+
sampler =
173+
xrayRulesSampler = xrayRulesSampler.withTargets(targets, requestedTargetRuleNames, now);
174+
} catch (Throwable t) {
175+
// Might be a transient API failure, try again after a default interval.
176+
executor.schedule(this::fetchTargets, DEFAULT_TARGET_INTERVAL_NANOS, TimeUnit.NANOSECONDS);
177+
return;
178+
}
179+
180+
long nextTargetFetchIntervalNanos =
181+
xrayRulesSampler.nextTargetFetchTimeNanos() - clock.nanoTime();
182+
fetchTargetsFuture =
183+
executor.schedule(this::fetchTargets, nextTargetFetchIntervalNanos, TimeUnit.NANOSECONDS);
184+
}
185+
124186
@Override
125187
public void close() {
126188
ScheduledFuture<?> pollFuture = this.pollFuture;

aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import static java.util.Objects.requireNonNull;
88

9+
import io.opentelemetry.sdk.common.Clock;
910
import io.opentelemetry.sdk.resources.Resource;
1011
import io.opentelemetry.sdk.trace.samplers.Sampler;
1112
import java.time.Duration;
@@ -19,6 +20,7 @@ public final class AwsXrayRemoteSamplerBuilder {
1920

2021
private final Resource resource;
2122

23+
private Clock clock = Clock.getDefault();
2224
private String endpoint = DEFAULT_ENDPOINT;
2325
private Sampler initialSampler = Sampler.parentBased(Sampler.traceIdRatioBased(0.05));
2426
private long pollingIntervalNanos = TimeUnit.SECONDS.toNanos(DEFAULT_POLLING_INTERVAL_SECS);
@@ -70,8 +72,19 @@ public AwsXrayRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) {
7072
return this;
7173
}
7274

75+
/**
76+
* Sets the {@link Clock} used for time measurements for sampling, such as rate limiting or quota
77+
* expiry.
78+
*/
79+
public AwsXrayRemoteSamplerBuilder setClock(Clock clock) {
80+
requireNonNull(clock, "clock");
81+
this.clock = clock;
82+
return this;
83+
}
84+
7385
/** Returns a {@link AwsXrayRemoteSampler} with the configuration of this builder. */
7486
public AwsXrayRemoteSampler build() {
75-
return new AwsXrayRemoteSampler(resource, endpoint, initialSampler, pollingIntervalNanos);
87+
return new AwsXrayRemoteSampler(
88+
resource, clock, endpoint, initialSampler, pollingIntervalNanos);
7689
}
7790
}

aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ abstract static class SamplingTargetDocument {
3535
@JsonCreator
3636
static SamplingTargetDocument create(
3737
@JsonProperty("FixedRate") double fixedRate,
38-
@JsonProperty("Interval") int intervalSecs,
39-
@JsonProperty("ReservoirQuota") int reservoirQuota,
38+
@JsonProperty("Interval") @Nullable Integer intervalSecs,
39+
@JsonProperty("ReservoirQuota") @Nullable Integer reservoirQuota,
4040
@JsonProperty("ReservoirQuotaTTL") @Nullable Date reservoirQuotaTtl,
4141
@JsonProperty("RuleName") String ruleName) {
4242
return new AutoValue_GetSamplingTargetsResponse_SamplingTargetDocument(
@@ -45,10 +45,14 @@ static SamplingTargetDocument create(
4545

4646
abstract double getFixedRate();
4747

48-
abstract int getIntervalSecs();
48+
@Nullable
49+
abstract Integer getIntervalSecs();
4950

50-
abstract int getReservoirQuota();
51+
@Nullable
52+
abstract Integer getReservoirQuota();
5153

54+
// Careful that this is a timestamp when the quota expires, not a duration as we'd normally
55+
// expect for a Time to live.
5256
@Nullable
5357
abstract Date getReservoirQuotaTtl();
5458

aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/RateLimitingSampler.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ final class RateLimitingSampler implements Sampler {
1919
private final RateLimiter limiter;
2020
private final int numPerSecond;
2121

22-
RateLimitingSampler(int numPerSecond) {
23-
this(numPerSecond, Clock.getDefault());
24-
}
25-
26-
// Visible for testing
2722
RateLimitingSampler(int numPerSecond, Clock clock) {
2823
limiter = new RateLimiter(numPerSecond, numPerSecond, clock);
2924
this.numPerSecond = numPerSecond;

0 commit comments

Comments
 (0)