42
42
import akka .stream .javadsl .Zip ;
43
43
import akka .util .ByteString ;
44
44
import com .arpnetworking .clusteraggregator .configuration .ClusterAggregatorConfiguration ;
45
+ import com .arpnetworking .clusteraggregator .http .Routes ;
46
+ import com .arpnetworking .clusteraggregator .models .AggregationMode ;
45
47
import com .arpnetworking .clusteraggregator .models .CombinedMetricData ;
46
48
import com .arpnetworking .metrics .aggregation .protocol .Messages ;
47
49
import com .arpnetworking .steno .Logger ;
48
50
import com .arpnetworking .steno .LoggerFactory ;
49
51
import com .arpnetworking .tsdcore .model .AggregatedData ;
50
52
import com .arpnetworking .tsdcore .model .AggregationMessage ;
53
+ import com .arpnetworking .tsdcore .model .AggregationRequest ;
51
54
import com .arpnetworking .tsdcore .model .FQDSN ;
52
55
import com .arpnetworking .tsdcore .model .PeriodicData ;
53
56
import com .arpnetworking .tsdcore .statistics .Statistic ;
61
64
import java .io .IOException ;
62
65
import java .time .Duration ;
63
66
import java .util .Collections ;
64
- import java .util .List ;
65
67
import java .util .Map ;
66
68
import java .util .Optional ;
67
69
import java .util .concurrent .CompletionStage ;
@@ -100,18 +102,27 @@ public HttpSourceActor(
100
102
@ Named ("host-emitter" ) final ActorRef emitter ,
101
103
final ClusterAggregatorConfiguration configuration ) {
102
104
105
+ _calculateAggregates = configuration .getCalculateClusterAggregations ();
106
+
103
107
final ActorRef self = self ();
104
- _sink = Sink .foreach (msg -> {
105
- final GeneratedMessageV3 generatedMessageV3 = msg .getMessage ();
106
- if (generatedMessageV3 instanceof Messages .StatisticSetRecord ) {
107
- final Messages .StatisticSetRecord statisticSetRecord =
108
- (Messages .StatisticSetRecord ) msg .getMessage ();
109
- if (configuration .getCalculateClusterAggregations ()) {
110
- shardRegion .tell (statisticSetRecord , self );
111
- }
112
- final Optional <PeriodicData > periodicData = buildPeriodicData (statisticSetRecord );
113
- if (periodicData .isPresent ()) {
114
- emitter .tell (periodicData .get (), self );
108
+ _sink = Sink .foreach (aggregationRequest -> {
109
+ final AggregationMode aggregationMode = aggregationRequest .getAggregationMode ();
110
+ for (final AggregationMessage aggregationMessage : aggregationRequest .getAggregationMessages ()) {
111
+ final GeneratedMessageV3 generatedMessageV3 = aggregationMessage .getMessage ();
112
+ if (generatedMessageV3 instanceof Messages .StatisticSetRecord ) {
113
+ final Messages .StatisticSetRecord statisticSetRecord =
114
+ (Messages .StatisticSetRecord ) aggregationMessage .getMessage ();
115
+
116
+ if (aggregationMode .shouldReaggregate ()) {
117
+ shardRegion .tell (statisticSetRecord , self );
118
+ }
119
+
120
+ if (aggregationMode .shouldPersist ()) {
121
+ final Optional <PeriodicData > periodicData = buildPeriodicData (statisticSetRecord );
122
+ if (periodicData .isPresent ()) {
123
+ emitter .tell (periodicData .get (), self );
124
+ }
125
+ }
115
126
}
116
127
}
117
128
});
@@ -130,33 +141,27 @@ public HttpSourceActor(
130
141
.reduce (ByteString ::concat )
131
142
.named ("getBody" );
132
143
133
- final Flow <HttpRequest , ImmutableMultimap <String , String >, NotUsed > getHeadersFlow = Flow .<HttpRequest >create ()
134
- .map (HttpRequest ::getHeaders )
135
- .map (HttpSourceActor ::createHeaderMultimap ) // Transform to array form
136
- .named ("getHeaders" );
137
-
138
- final Flow <Pair <ByteString , ImmutableMultimap <String , String >>, AggregationMessage , NotUsed > createAndParseFlow =
139
- Flow .<Pair <ByteString , ImmutableMultimap <String , String >>>create ()
144
+ final Flow <Pair <ByteString , HttpRequest >, AggregationRequest , NotUsed > createAndParseFlow =
145
+ Flow .<Pair <ByteString , HttpRequest >>create ()
140
146
.map (HttpSourceActor ::mapModel )
141
- .mapConcat (this ::parseRecords ) // Parse the json string into a record builder
147
+ .map (this ::parseRecords ) // Parse the json string into a record builder
142
148
// NOTE: this should be _parser::parse, but aspectj NPEs with that currently
143
149
.named ("createAndParseRequest" );
144
150
145
151
// Shapes
146
152
final UniformFanOutShape <HttpRequest , HttpRequest > split = builder .add (Broadcast .create (2 ));
147
153
148
154
final FlowShape <HttpRequest , ByteString > getBody = builder .add (getBodyFlow );
149
- final FlowShape <HttpRequest , ImmutableMultimap <String , String >> getHeaders = builder .add (getHeadersFlow );
150
155
final FanInShape2 <
151
156
ByteString ,
152
- ImmutableMultimap < String , String > ,
153
- Pair <ByteString , ImmutableMultimap < String , String > >> join = builder .add (Zip .create ());
154
- final FlowShape <Pair <ByteString , ImmutableMultimap < String , String >>, AggregationMessage > createRequest =
157
+ HttpRequest ,
158
+ Pair <ByteString , HttpRequest >> join = builder .add (Zip .create ());
159
+ final FlowShape <Pair <ByteString , HttpRequest >, AggregationRequest > createRequest =
155
160
builder .add (createAndParseFlow );
156
161
157
162
// Wire the shapes
158
163
builder .from (split .out (0 )).via (getBody ).toInlet (join .in0 ()); // Split to get the body bytes
159
- builder .from (split .out (1 )).via ( getHeaders ). toInlet (join .in1 ()); // Split to get the headers
164
+ builder .from (split .out (1 )).toInlet (join .in1 ()); // Pass the Akka HTTP request through
160
165
builder .from (join .out ()).toInlet (createRequest .in ()); // Join to create the Request and parse it
161
166
162
167
return FlowShape .of (split .in (), createRequest .out ());
@@ -201,22 +206,30 @@ public Receive createReceive() {
201
206
.build ();
202
207
}
203
208
204
- private static ImmutableMultimap <String , String > createHeaderMultimap (final Iterable <HttpHeader > headers ) {
209
+ private static com .arpnetworking .clusteraggregator .models .HttpRequest mapModel (
210
+ final Pair <ByteString , HttpRequest > pair ) {
205
211
final ImmutableMultimap .Builder <String , String > headersBuilder = ImmutableMultimap .builder ();
206
212
207
- for (final HttpHeader httpHeader : headers ) {
213
+ for (final HttpHeader httpHeader : pair . second (). getHeaders () ) {
208
214
headersBuilder .put (httpHeader .lowercaseName (), httpHeader .value ());
209
215
}
210
216
211
- return headersBuilder .build ();
217
+ return new com .arpnetworking .clusteraggregator .models .HttpRequest (
218
+ pair .second ().getUri ().path (),
219
+ headersBuilder .build (),
220
+ pair .first ());
212
221
}
213
222
214
- private static com .arpnetworking .clusteraggregator .models .HttpRequest mapModel (
215
- final Pair <ByteString , ImmutableMultimap <String , String >> pair ) {
216
- return new com .arpnetworking .clusteraggregator .models .HttpRequest (pair .second (), pair .first ());
217
- }
223
+ private AggregationRequest parseRecords (final com .arpnetworking .clusteraggregator .models .HttpRequest request ) throws IOException {
224
+ final AggregationMode aggregationMode ;
225
+ if (Routes .INCOMING_DATA_REAGGREGATE_V1_PATH .equals (request .getPath ())) {
226
+ aggregationMode = AggregationMode .REAGGREGATE ;
227
+ } else if (Routes .INCOMING_DATA_PERSIST_V1_PATH .equals (request .getPath ())) {
228
+ aggregationMode = AggregationMode .PERSIST ;
229
+ } else {
230
+ aggregationMode = _calculateAggregates ? AggregationMode .PERSIST_AND_REAGGREGATE : AggregationMode .PERSIST ;
231
+ }
218
232
219
- private List <AggregationMessage > parseRecords (final com .arpnetworking .clusteraggregator .models .HttpRequest request ) throws IOException {
220
233
final ImmutableList .Builder <AggregationMessage > recordsBuilder = ImmutableList .builder ();
221
234
ByteString current = request .getBody ();
222
235
Optional <AggregationMessage > messageOptional = AggregationMessage .deserialize (current );
@@ -234,7 +247,10 @@ private List<AggregationMessage> parseRecords(final com.arpnetworking.clusteragg
234
247
if (records .size () == 0 ) {
235
248
throw new NoRecordsException ();
236
249
}
237
- return records ;
250
+ return new AggregationRequest .Builder ()
251
+ .setAggregationMode (aggregationMode )
252
+ .setAggregationMessages (recordsBuilder .build ())
253
+ .build ();
238
254
}
239
255
240
256
private Optional <PeriodicData > buildPeriodicData (final Messages .StatisticSetRecord setRecord ) {
@@ -299,13 +315,13 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
299
315
.build ());
300
316
}
301
317
318
+ private final boolean _calculateAggregates ;
302
319
private final Materializer _materializer ;
303
- private final Sink <AggregationMessage , CompletionStage <Done >> _sink ;
304
- private final Graph <FlowShape <HttpRequest , AggregationMessage >, NotUsed > _processGraph ;
320
+ private final Sink <AggregationRequest , CompletionStage <Done >> _sink ;
321
+ private final Graph <FlowShape <HttpRequest , AggregationRequest >, NotUsed > _processGraph ;
305
322
306
323
private static final Logger BAD_REQUEST_LOGGER =
307
324
LoggerFactory .getRateLimitLogger (HttpSourceActor .class , Duration .ofSeconds (30 ));
308
- private static final Logger LOGGER = LoggerFactory .getLogger (HttpSourceActor .class );
309
325
310
326
311
327
private static class NoRecordsException extends IOException {
0 commit comments