Skip to content

Commit f08297c

Browse files
authored
Record samplesDropped and samplesSent metrics in HttpSink (#133)
* record samplesSent and samplesDropped metrics in KDB sink
1 parent 31e5bc2 commit f08297c

File tree

13 files changed

+342
-165
lines changed

13 files changed

+342
-165
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2020 Dropbox
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.tsdcore.model;
17+
18+
import com.arpnetworking.commons.builder.OvalBuilder;
19+
import net.sf.oval.constraint.NotNull;
20+
import org.asynchttpclient.Request;
21+
22+
import java.time.Instant;
23+
import java.util.Optional;
24+
25+
/**
26+
* Contains the info for a http request.
27+
*
28+
* @author Qinyan Li (lqy520s at hotmail dot com)
29+
*/
30+
public final class RequestEntry {
31+
public Request getRequest() {
32+
return _request;
33+
}
34+
35+
public Instant getEnterTime() {
36+
return _enterTime;
37+
}
38+
39+
public Optional<Long> getPopulationSize() {
40+
return _populationSize;
41+
}
42+
43+
private RequestEntry(final Builder builder) {
44+
_request = builder._request;
45+
_enterTime = builder._enterTime;
46+
_populationSize = builder._populationSize;
47+
}
48+
49+
private final Request _request;
50+
private final Instant _enterTime;
51+
private final Optional<Long> _populationSize;
52+
53+
/**
54+
* {@link com.arpnetworking.commons.builder.Builder} implementation for
55+
* {@link RequestEntry}.
56+
*
57+
* TODO(ville): Convert RequestEntry.Builder would be a ThreadLocalBuilder
58+
* See comments in HttpPostSink:createRequests
59+
*/
60+
public static final class Builder extends OvalBuilder<RequestEntry> {
61+
62+
/**
63+
* Public constructor.
64+
*/
65+
public Builder() {
66+
super(RequestEntry::new);
67+
}
68+
69+
/**
70+
* Set the request. Required. Cannot be null.
71+
*
72+
* @param value The request.
73+
* @return This {@link Builder} instance.
74+
*/
75+
public Builder setRequest(final Request value) {
76+
_request = value;
77+
return this;
78+
}
79+
80+
/**
81+
* Set the time when the request enter the pending request queue. Required. Cannot be null.
82+
*
83+
* @param value The enter time.
84+
* @return This {@link Builder} instance.
85+
*/
86+
public Builder setEnterTime(final Instant value) {
87+
_enterTime = value;
88+
return this;
89+
}
90+
91+
/**
92+
* Set the population size of the request. Optional. Cannot be null.
93+
*
94+
* @param value The population size.
95+
* @return This {@link Builder} instance.
96+
*/
97+
public Builder setPopulationSize(final Optional<Long> value) {
98+
_populationSize = value;
99+
return this;
100+
}
101+
102+
@NotNull
103+
private Request _request;
104+
@NotNull
105+
private Instant _enterTime;
106+
@NotNull
107+
private Optional<Long> _populationSize = Optional.empty();
108+
}
109+
}
110+

src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Collection;
3838
import java.util.Collections;
3939
import java.util.List;
40+
import java.util.Optional;
4041

4142
/**
4243
* Publishes aggregations to Data Dog. This class is thread safe.
@@ -63,7 +64,7 @@ public Object toLogValue() {
6364
}
6465

6566
@Override
66-
protected Collection<byte[]> serialize(final PeriodicData periodicData) {
67+
protected Collection<SerializedDatum> serialize(final PeriodicData periodicData) {
6768
final String period = periodicData.getPeriod().toString();
6869
final long timestamp = (periodicData.getStart().toInstant().toEpochMilli()
6970
+ periodicData.getPeriod().toMillis()) / 1000;
@@ -94,7 +95,9 @@ protected Collection<byte[]> serialize(final PeriodicData periodicData) {
9495
.log();
9596
return Collections.emptyList();
9697
}
97-
return Collections.singletonList(dataDogDataAsJson.getBytes(Charsets.UTF_8));
98+
return Collections.singletonList(new SerializedDatum(
99+
dataDogDataAsJson.getBytes(Charsets.UTF_8),
100+
Optional.empty()));
98101
}
99102

100103
private static List<String> createTags(final PeriodicData periodicData, final AggregatedData datum) {

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.arpnetworking.steno.Logger;
2727
import com.arpnetworking.steno.LoggerFactory;
2828
import com.arpnetworking.tsdcore.model.PeriodicData;
29+
import com.arpnetworking.tsdcore.model.RequestEntry;
2930
import com.fasterxml.jackson.annotation.JacksonInject;
3031
import com.google.common.collect.Lists;
3132
import net.sf.oval.constraint.CheckWith;
@@ -43,6 +44,7 @@
4344
import java.net.URI;
4445
import java.time.Duration;
4546
import java.util.Collection;
47+
import java.util.Optional;
4648
import java.util.function.Function;
4749

4850
/**
@@ -67,11 +69,6 @@ public void close() {
6769
_sinkActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
6870
}
6971

70-
/**
71-
* Generate a Steno log compatible representation.
72-
*
73-
* @return Steno log compatible representation.
74-
*/
7572
@LogValue
7673
@Override
7774
public Object toLogValue() {
@@ -107,15 +104,21 @@ protected Request createRequest(final AsyncHttpClient client, final byte[] seria
107104
* @param periodicData The {@link PeriodicData} to be serialized.
108105
* @return The {@link Request} instance to execute.
109106
*/
110-
protected Collection<Request> createRequests(
107+
protected Collection<RequestEntry.Builder> createRequests(
111108
final AsyncHttpClient client,
112109
final PeriodicData periodicData) {
113-
final Collection<byte[]> serializedData = serialize(periodicData);
114-
final Collection<Request> requests = Lists.newArrayListWithExpectedSize(serializedData.size());
115-
for (final byte[] serializedDatum : serializedData) {
116-
requests.add(createRequest(client, serializedDatum));
110+
final Collection<SerializedDatum> serializedData = serialize(periodicData);
111+
final Collection<RequestEntry.Builder> requestEntryBuilders = Lists.newArrayListWithExpectedSize(serializedData.size());
112+
for (final SerializedDatum serializedDatum : serializedData) {
113+
// TODO(ville): Convert RequestEntry.Builder would be a ThreadLocalBuilder
114+
// Unfortunately, the split builder logic across HttpPostSink and
115+
// HttpSinkActor does not permit this as-is. The logic would need
116+
// to be refactored to permit the use of a TLB.
117+
requestEntryBuilders.add(new RequestEntry.Builder()
118+
.setRequest(createRequest(client, serializedDatum.getDatum()))
119+
.setPopulationSize(serializedDatum.getPopulationSize()));
117120
}
118-
return requests;
121+
return requestEntryBuilders;
119122
}
120123

121124
/**
@@ -169,7 +172,7 @@ protected Duration getRetryMaximumDelay() {
169172
* @param periodicData The {@link PeriodicData} to be serialized.
170173
* @return The serialized representation of {@link PeriodicData}.
171174
*/
172-
protected abstract Collection<byte[]> serialize(PeriodicData periodicData);
175+
protected abstract Collection<SerializedDatum> serialize(PeriodicData periodicData);
173176

174177
/**
175178
* Protected constructor.
@@ -376,4 +379,22 @@ public boolean isSatisfied(final Object validatedObject, final Object value) {
376379
}
377380
}
378381
}
382+
383+
static final class SerializedDatum {
384+
SerializedDatum(final byte[] datum, final Optional<Long> populationSize) {
385+
_datum = datum;
386+
_populationSize = populationSize;
387+
}
388+
389+
public byte[] getDatum() {
390+
return _datum;
391+
}
392+
393+
public Optional<Long> getPopulationSize() {
394+
return _populationSize;
395+
}
396+
397+
private final byte[] _datum;
398+
private final Optional<Long> _populationSize;
399+
}
379400
}

0 commit comments

Comments
 (0)