2626import com .azure .core .http .HttpMethod ;
2727import com .azure .core .http .HttpPipeline ;
2828import com .azure .core .http .HttpRequest ;
29+ import com .azure .core .http .HttpResponse ;
2930import com .azure .core .util .Context ;
3031import com .azure .core .util .tracing .Tracer ;
3132import com .fasterxml .jackson .annotation .JsonInclude ;
3233import com .fasterxml .jackson .core .JsonGenerator ;
34+ import com .fasterxml .jackson .core .JsonProcessingException ;
3335import com .fasterxml .jackson .core .io .SerializedString ;
36+ import com .fasterxml .jackson .databind .JsonNode ;
3437import com .fasterxml .jackson .databind .ObjectMapper ;
3538import com .fasterxml .jackson .databind .SerializationFeature ;
3639import com .microsoft .applicationinsights .agent .internal .common .NetworkFriendlyExceptions ;
5356import java .util .List ;
5457import java .util .Map ;
5558import java .util .concurrent .atomic .AtomicBoolean ;
59+ import java .util .function .Consumer ;
5660import java .util .zip .GZIPOutputStream ;
5761import javax .annotation .Nullable ;
5862import org .slf4j .Logger ;
5963import org .slf4j .LoggerFactory ;
6064import reactor .core .publisher .Flux ;
65+ import reactor .core .publisher .Mono ;
6166
6267// TODO performance testing
6368public class TelemetryChannel {
@@ -68,14 +73,17 @@ public class TelemetryChannel {
6873
6974 private static final AppInsightsByteBufferPool byteBufferPool = new AppInsightsByteBufferPool ();
7075
76+ // TODO (heya) should we suppress logging statsbeat telemetry ingestion issues?
7177 private static final OperationLogger operationLogger =
78+ new OperationLogger (TelemetryChannel .class , "Sending telemetry to the ingestion service" );
79+
80+ private static final OperationLogger retryOperationLogger =
7281 new OperationLogger (
73- TelemetryChannel .class ,
74- "Sending telemetry to the ingestion service (telemetry will be stored to disk on failure and retried later)" );
82+ TelemetryChannel .class , "Sending telemetry to the ingestion service (retry)" );
7583
7684 // TODO (kryalama) do we still need this AtomicBoolean, or can we use throttling built in to the
7785 // operationLogger?
78- private static final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean ();
86+ private final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean ();
7987
8088 @ SuppressWarnings ("CatchAndPrintStackTrace" )
8189 private static ObjectMapper createObjectMapper () {
@@ -91,7 +99,7 @@ private static ObjectMapper createObjectMapper() {
9199
92100 private final HttpPipeline pipeline ;
93101 private final URL endpointUrl ;
94- @ Nullable private final LocalFileWriter localFileWriter ;
102+ private final LocalFileWriter localFileWriter ;
95103 private final StatsbeatModule statsbeatModule ;
96104 private final boolean isStatsbeat ;
97105
@@ -107,8 +115,13 @@ public static TelemetryChannel create(
107115 httpPipeline , endpointUrl , localFileWriter , statsbeatModule , isStatsbeat );
108116 }
109117
110- public CompletableResultCode sendRawBytes (ByteBuffer buffer , String instrumentationKey ) {
111- return internalSend (singletonList (buffer ), instrumentationKey , true );
118+ public CompletableResultCode sendRawBytes (
119+ ByteBuffer buffer ,
120+ String instrumentationKey ,
121+ Runnable onSuccess ,
122+ Consumer <Boolean > onFailure ) {
123+ return internalSend (
124+ singletonList (buffer ), instrumentationKey , onSuccess , onFailure , retryOperationLogger );
112125 }
113126
114127 // used by tests only
@@ -153,7 +166,15 @@ public CompletableResultCode internalSendByInstrumentationKey(
153166 return CompletableResultCode .ofFailure ();
154167 }
155168 try {
156- return internalSend (byteBuffers , instrumentationKey , false );
169+ return internalSend (
170+ byteBuffers ,
171+ instrumentationKey ,
172+ () -> byteBufferPool .offer (byteBuffers ),
173+ retryable -> {
174+ localFileWriter .writeToDisk (byteBuffers , instrumentationKey );
175+ byteBufferPool .offer (byteBuffers );
176+ },
177+ operationLogger );
157178 } catch (Throwable t ) {
158179 operationLogger .recordFailure ("Error sending telemetry items: " + t .getMessage (), t );
159180 return CompletableResultCode .ofFailure ();
@@ -201,7 +222,11 @@ private static void writeTelemetryItems(JsonGenerator jg, List<TelemetryItem> te
201222 * sent as {@code List<ByteBuffer>}. Persisted telemetries will be sent as byte[]
202223 */
203224 private CompletableResultCode internalSend (
204- List <ByteBuffer > byteBuffers , String instrumentationKey , boolean persisted ) {
225+ List <ByteBuffer > byteBuffers ,
226+ String instrumentationKey ,
227+ Runnable onSuccess ,
228+ Consumer <Boolean > onFailure ,
229+ OperationLogger operationLogger ) {
205230 HttpRequest request = new HttpRequest (HttpMethod .POST , endpointUrl );
206231
207232 request .setBody (Flux .fromIterable (byteBuffers ));
@@ -234,115 +259,141 @@ private CompletableResultCode internalSend(
234259 pipeline
235260 .send (request , Context .of (contextKeyValues ))
236261 .subscribe (
237- response -> {
238- parseResponseCode (
239- response .getStatusCode (), instrumentationKey , byteBuffers , persisted );
240- LazyHttpClient .consumeResponseBody (response );
241- if (!isStatsbeat ) {
242- if (response .getStatusCode () == 200 ) {
243- statsbeatModule
244- .getNetworkStatsbeat ()
245- .incrementRequestSuccessCount (
246- System .currentTimeMillis () - startTime , instrumentationKey );
247- } else {
248- statsbeatModule
249- .getNetworkStatsbeat ()
250- .incrementRequestFailureCount (instrumentationKey );
251- }
252- }
253- if (!persisted ) {
254- // persisted byte buffers don't come from the pool so shouldn't go back to the pool
255- byteBufferPool .offer (byteBuffers );
256- }
257- if (response .getStatusCode () == 200 ) {
258- result .succeed ();
259- } else {
260- result .fail ();
261- }
262- },
263- error -> {
264- // AMPLS
265- if (isStatsbeat && error instanceof UnknownHostException ) {
266- // when sending a Statsbeat request and server returns an UnknownHostException, it's
267- // likely that
268- // it's using a virtual network. In that case, we use the kill-switch to turn off
269- // Statsbeat.
270- statsbeatModule .shutdown ();
271- } else {
272- if (!NetworkFriendlyExceptions .logSpecialOneTimeFriendlyException (
273- error , endpointUrl .toString (), friendlyExceptionThrown , logger )) {
274- operationLogger .recordFailure (
275- "Error sending telemetry items: " + error .getMessage (), error );
276- }
277-
278- if (!isStatsbeat ) {
279- statsbeatModule
280- .getNetworkStatsbeat ()
281- .incrementRequestFailureCount (instrumentationKey );
282- }
283- // no need to write to disk again when failing to send raw bytes from the persisted
284- // file
285- if (!persisted ) {
286- writeToDiskOnFailure (byteBuffers , instrumentationKey );
287- }
288- }
289-
290- if (!persisted ) {
291- // persisted byte buffers don't come from the pool so shouldn't go back to the pool
292- byteBufferPool .offer (byteBuffers );
293- }
294- result .fail ();
295- });
262+ responseHandler (
263+ instrumentationKey ,
264+ startTime ,
265+ () -> {
266+ onSuccess .run ();
267+ result .succeed ();
268+ },
269+ retryable -> {
270+ onFailure .accept (retryable );
271+ result .fail ();
272+ },
273+ operationLogger ),
274+ errorHandler (
275+ instrumentationKey ,
276+ retryable -> {
277+ onFailure .accept (retryable );
278+ result .fail ();
279+ },
280+ operationLogger ));
296281 return result ;
297282 }
298283
299- private void writeToDiskOnFailure (List <ByteBuffer > byteBuffers , String instrumentationKey ) {
300- if (localFileWriter != null ) {
301- localFileWriter .writeToDisk (byteBuffers , instrumentationKey );
284+ private Consumer <HttpResponse > responseHandler (
285+ String instrumentationKey ,
286+ long startTime ,
287+ Runnable onSuccess ,
288+ Consumer <Boolean > onFailure ,
289+ OperationLogger operationLogger ) {
290+
291+ return response ->
292+ response
293+ .getBodyAsString ()
294+ .switchIfEmpty (Mono .just ("" ))
295+ .subscribe (
296+ body -> {
297+ int statusCode = response .getStatusCode ();
298+ switch (statusCode ) {
299+ case 200 : // SUCCESS
300+ operationLogger .recordSuccess ();
301+ onSuccess .run ();
302+ break ;
303+ case 206 : // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS
304+ operationLogger .recordFailure (
305+ getErrorMessageFromPartialSuccessResponse (body ));
306+ onFailure .accept (false );
307+ break ;
308+ case 401 : // breeze returns if aad enabled and no authentication token provided
309+ case 403 : // breeze returns if aad enabled or disabled (both cases) and
310+ // wrong/expired credentials provided
311+ case 408 : // REQUEST TIMEOUT
312+ case 429 : // TOO MANY REQUESTS
313+ case 500 : // INTERNAL SERVER ERROR
314+ case 503 : // SERVICE UNAVAILABLE
315+ operationLogger .recordFailure (
316+ "received response code "
317+ + statusCode
318+ + " (telemetry will be stored to disk and retried later)" );
319+ onFailure .accept (true );
320+ break ;
321+ case 439 : // Breeze-specific: THROTTLED OVER EXTENDED TIME
322+ // TODO handle throttling
323+ operationLogger .recordFailure (
324+ "received response code 439 (throttled over extended time)" );
325+ onFailure .accept (false );
326+ break ;
327+ default :
328+ operationLogger .recordFailure ("received response code: " + statusCode );
329+ onFailure .accept (false );
330+ }
331+ if (!isStatsbeat ) {
332+ handleStatsbeatOnResponse (instrumentationKey , startTime , statusCode );
333+ }
334+ },
335+ exception -> {
336+ operationLogger .recordFailure ("exception retrieving response body" , exception );
337+ onFailure .accept (false );
338+ });
339+ }
340+
341+ private void handleStatsbeatOnResponse (
342+ String instrumentationKey , long startTime , int statusCode ) {
343+ if (statusCode == 200 ) {
344+ statsbeatModule
345+ .getNetworkStatsbeat ()
346+ .incrementRequestSuccessCount (System .currentTimeMillis () - startTime , instrumentationKey );
347+ } else {
348+ statsbeatModule .getNetworkStatsbeat ().incrementRequestFailureCount (instrumentationKey );
349+ }
350+ if (statusCode == 439 ) {
351+ statsbeatModule .getNetworkStatsbeat ().incrementThrottlingCount (instrumentationKey );
302352 }
303353 }
304354
305- private void parseResponseCode (
306- int statusCode , String instrumentationKey , List < ByteBuffer > byteBuffers , boolean persisted ) {
307- switch ( statusCode ) {
308- case 401 : // UNAUTHORIZED
309- case 403 : // FORBIDDEN
310- logger . warn (
311- "Failed to send telemetry with status code:{}, please check your credentials" ,
312- statusCode );
313- // no need to write to disk again when failing to send raw bytes from the persisted file
314- if (! persisted ) {
315- writeToDiskOnFailure ( byteBuffers , instrumentationKey );
316- }
317- break ;
318- case 408 : // REQUEST TIMEOUT
319- case 500 : // INTERNAL SERVER ERROR
320- case 503 : // SERVICE UNAVAILABLE
321- case 429 : // TOO MANY REQUESTS
322- case 439 : // Breeze-specific: THROTTLED OVER EXTENDED TIME
323- // TODO handle throttling
324- // TODO (heya) track throttling count via Statsbeat
325- // instrumentationKey is null when sending persisted file's raw bytes.
326- if (! isStatsbeat ) {
327- statsbeatModule . getNetworkStatsbeat (). incrementThrottlingCount ( instrumentationKey );
328- }
329- break ;
330- case 200 : // SUCCESS
331- operationLogger . recordSuccess ();
332- break ;
333- case 206 : // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS
334- // TODO handle partial success
335- break ;
336- case 0 : // client-side exception
337- // TODO exponential backoff and retry to a limit
338- // TODO (heya) track failure count via Statsbeat
339- // instrumentationKey is null when sending persisted file's raw bytes.
340- if (! isStatsbeat ) {
341- statsbeatModule . getNetworkStatsbeat (). incrementRetryCount ( instrumentationKey );
342- }
343- break ;
344- default :
345- // ok
355+ private Consumer < Throwable > errorHandler (
356+ String instrumentationKey , Consumer < Boolean > onFailure , OperationLogger operationLogger ) {
357+
358+ return error -> {
359+ if ( isStatsbeat && error instanceof UnknownHostException ) {
360+ // when sending a Statsbeat request and server returns an UnknownHostException, it's
361+ // likely that it's using AMPLS. In that case, we use the kill-switch to turn off Statsbeat.
362+ statsbeatModule . shutdown ( );
363+ onFailure . accept ( false );
364+ return ;
365+ }
366+
367+ // TODO (trask) only log one-time friendly exception if no prior successes
368+ if (! NetworkFriendlyExceptions . logSpecialOneTimeFriendlyException (
369+ error , endpointUrl . toString (), friendlyExceptionThrown , logger )) {
370+ operationLogger . recordFailure (
371+ "Error sending telemetry items: " + error . getMessage (), error );
372+ }
373+
374+ if (! isStatsbeat ) {
375+ statsbeatModule . getNetworkStatsbeat (). incrementRequestFailureCount ( instrumentationKey );
376+ }
377+
378+ onFailure . accept ( true );
379+ } ;
380+ }
381+
382+ private static String getErrorMessageFromPartialSuccessResponse ( String body ) {
383+ JsonNode jsonNode ;
384+ try {
385+ jsonNode = new ObjectMapper (). readTree ( body ) ;
386+ } catch ( JsonProcessingException e ) {
387+ return "ingestion service returned 206, but could not parse response as json: " + body ;
388+ }
389+ List < JsonNode > errors = new ArrayList <>();
390+ jsonNode . get ( "errors" ). forEach ( errors :: add );
391+ StringBuilder message = new StringBuilder ( );
392+ message . append ( errors . get ( 0 ). get ( "message" ). asText ());
393+ int moreErrors = errors . size () - 1 ;
394+ if ( moreErrors > 0 ) {
395+ message . append ( " (and " ). append ( moreErrors ). append ( " more)" );
346396 }
397+ return message .toString ();
347398 }
348399}
0 commit comments