Skip to content

Commit 481753e

Browse files
author
Liudmila Molkova
authored
Onboard ingestion onto sync stack (Azure#33146)
* Onboard ingestion to sync stack
1 parent cae8e85 commit 481753e

16 files changed

+924
-226
lines changed

sdk/monitor/azure-monitor-ingestion/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ System.out.println("Logs uploaded successfully");
128128
### Upload custom logs with max concurrency
129129

130130
If the input logs collection is too large, the client will split the input into multiple smaller requests. These
131-
requests are sent serially, by default, but by configuring the max concurrency in `UploadLogsOptions`, these requests
131+
requests are sent serially, by default, but by configuring the max concurrency in `LogsUploadOptions`, these requests
132132
can be concurrently sent to the service as shown in the example below.
133133

134134
```java readme-sample-uploadLogsWithMaxConcurrency

sdk/monitor/azure-monitor-ingestion/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
<javaModulesSurefireArgLine>
4949
--add-exports com.azure.core/com.azure.core.implementation.jackson=ALL-UNNAMED
5050
--add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
51+
--add-exports com.azure.monitor.ingestion/com.azure.monitor.ingestion.implementation=ALL-UNNAMED
5152
--add-opens com.azure.monitor.ingestion/com.azure.monitor.ingestion=ALL-UNNAMED
5253
</javaModulesSurefireArgLine>
5354
</properties>

sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionAsyncClient.java

Lines changed: 12 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,27 @@
1515
import com.azure.core.http.rest.Response;
1616
import com.azure.core.util.BinaryData;
1717
import com.azure.core.util.Context;
18-
import com.azure.core.util.logging.ClientLogger;
19-
import com.azure.core.util.serializer.JsonSerializer;
20-
import com.azure.core.util.serializer.JsonSerializerProviders;
21-
import com.azure.core.util.serializer.ObjectSerializer;
18+
import com.azure.monitor.ingestion.implementation.Batcher;
2219
import com.azure.monitor.ingestion.implementation.IngestionUsingDataCollectionRulesAsyncClient;
2320
import com.azure.monitor.ingestion.implementation.LogsIngestionRequest;
2421
import com.azure.monitor.ingestion.implementation.UploadLogsResponseHolder;
2522
import com.azure.monitor.ingestion.models.LogsUploadError;
2623
import com.azure.monitor.ingestion.models.LogsUploadException;
2724
import com.azure.monitor.ingestion.models.LogsUploadOptions;
28-
import com.fasterxml.jackson.core.JsonFactory;
29-
import com.fasterxml.jackson.core.JsonGenerator;
30-
import reactor.core.publisher.Flux;
31-
import reactor.core.publisher.FluxSink;
3225
import reactor.core.publisher.Mono;
3326
import reactor.core.publisher.SynchronousSink;
3427

35-
import java.io.ByteArrayOutputStream;
36-
import java.io.IOException;
37-
import java.io.UncheckedIOException;
38-
import java.nio.charset.StandardCharsets;
3928
import java.util.ArrayList;
4029
import java.util.Collections;
41-
import java.util.Iterator;
4230
import java.util.List;
4331
import java.util.Objects;
4432
import java.util.function.Consumer;
45-
import java.util.stream.Collectors;
46-
import java.util.zip.GZIPOutputStream;
4733

4834
import static com.azure.core.util.FluxUtil.withContext;
35+
import static com.azure.monitor.ingestion.implementation.Utils.CONTENT_ENCODING;
36+
import static com.azure.monitor.ingestion.implementation.Utils.GZIP;
37+
import static com.azure.monitor.ingestion.implementation.Utils.getConcurrency;
38+
import static com.azure.monitor.ingestion.implementation.Utils.gzipRequest;
4939

5040
/**
5141
* The asynchronous client for uploading logs to Azure Monitor.
@@ -62,12 +52,6 @@
6252
*/
6353
@ServiceClient(isAsync = true, builder = LogsIngestionClientBuilder.class)
6454
public final class LogsIngestionAsyncClient {
65-
private static final ClientLogger LOGGER = new ClientLogger(LogsIngestionAsyncClient.class);
66-
private static final String CONTENT_ENCODING = "Content-Encoding";
67-
private static final long MAX_REQUEST_PAYLOAD_SIZE = 1024 * 1024; // 1 MB
68-
private static final String GZIP = "gzip";
69-
private static final JsonSerializer DEFAULT_SERIALIZER = JsonSerializerProviders.createInstance(true);
70-
7155
private final IngestionUsingDataCollectionRulesAsyncClient service;
7256

7357
LogsIngestionAsyncClient(IngestionUsingDataCollectionRulesAsyncClient service) {
@@ -201,71 +185,15 @@ Mono<Void> upload(String ruleId, String streamName,
201185
*/
202186
private Mono<Void> splitAndUpload(String ruleId, String streamName, Iterable<Object> logs,
203187
LogsUploadOptions options, Context context) {
204-
// set concurrency to 1 as default
205-
int concurrency = 1;
206-
ObjectSerializer objectSerializer = DEFAULT_SERIALIZER;
207-
if (options != null) {
208-
if (options.getObjectSerializer() != null) {
209-
objectSerializer = options.getObjectSerializer();
210-
}
211-
if (options.getMaxConcurrency() != null) {
212-
concurrency = options.getMaxConcurrency();
213-
}
214-
}
215188

216-
ObjectSerializer serializer = objectSerializer;
217-
Iterator<Object> iterator = logs.iterator();
189+
int concurrency = getConcurrency(options);
218190

219-
return Flux.<LogsIngestionRequest>create(emitter -> createHttpRequest(serializer, iterator, emitter))
220-
.flatMapSequential(request -> uploadToService(ruleId, streamName, context, request), concurrency)
221-
.<LogsUploadException>handle((responseHolder, sink) -> processResponse(options, responseHolder, sink))
222-
.collectList()
223-
.handle((result, sink) -> processExceptions(result, sink));
224-
}
225-
226-
/**
227-
* This method converts raw {@link Object} logs into serialized and gzipped byte array that forms the request
228-
* body of the service call.
229-
* @param serializer The serializer used to serialize logs.
230-
* @param iterator The input logs iterator.
231-
* @param emitter The Flux emitter to which the HTTP request is emitted.
232-
*/
233-
private void createHttpRequest(ObjectSerializer serializer, Iterator<Object> iterator, FluxSink<LogsIngestionRequest> emitter) {
234-
try {
235-
long currentBatchSize = 0;
236-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
237-
JsonGenerator generator = JsonFactory.builder().build().createGenerator(byteArrayOutputStream);
238-
generator.writeStartArray();
239-
List<String> serializedLogs = new ArrayList<>();
240-
List<Object> originalLogsRequest = new ArrayList<>();
241-
while (iterator.hasNext()) {
242-
Object currentLog = iterator.next();
243-
byte[] bytes = serializer.serializeToBytes(currentLog);
244-
int currentLogSize = bytes.length;
245-
currentBatchSize += currentLogSize;
246-
if (currentBatchSize > MAX_REQUEST_PAYLOAD_SIZE) {
247-
writeLogsAndCloseJsonGenerator(generator, serializedLogs);
248-
byte[] zippedRequestBody = gzipRequest(byteArrayOutputStream.toByteArray());
249-
emitter.next(new LogsIngestionRequest(originalLogsRequest, zippedRequestBody));
250-
byteArrayOutputStream = new ByteArrayOutputStream();
251-
generator = JsonFactory.builder().build().createGenerator(byteArrayOutputStream);
252-
generator.writeStartArray();
253-
currentBatchSize = currentLogSize;
254-
originalLogsRequest = new ArrayList<>();
255-
serializedLogs.clear();
256-
}
257-
serializedLogs.add(new String(bytes, StandardCharsets.UTF_8));
258-
originalLogsRequest.add(currentLog);
259-
}
260-
if (currentBatchSize > 0) {
261-
writeLogsAndCloseJsonGenerator(generator, serializedLogs);
262-
byte[] zippedRequestBody = gzipRequest(byteArrayOutputStream.toByteArray());
263-
emitter.next(new LogsIngestionRequest(originalLogsRequest, zippedRequestBody));
264-
}
265-
emitter.complete();
266-
} catch (IOException e) {
267-
emitter.error(e);
268-
}
191+
return new Batcher(options, logs)
192+
.toFlux()
193+
.flatMapSequential(request -> uploadToService(ruleId, streamName, context, request), concurrency)
194+
.<LogsUploadException>handle((responseHolder, sink) -> processResponse(options, responseHolder, sink))
195+
.collectList()
196+
.handle((result, sink) -> processExceptions(result, sink));
269197
}
270198

271199
private void processExceptions(List<LogsUploadException> result, SynchronousSink<Void> sink) {
@@ -311,28 +239,4 @@ private Mono<UploadLogsResponseHolder> uploadToService(String ruleId, String str
311239
.onErrorResume(HttpResponseException.class,
312240
ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, ex)));
313241
}
314-
315-
private void writeLogsAndCloseJsonGenerator(JsonGenerator generator, List<String> serializedLogs) throws IOException {
316-
generator.writeRaw(serializedLogs.stream()
317-
.collect(Collectors.joining(",")));
318-
generator.writeEndArray();
319-
generator.close();
320-
}
321-
322-
/**
323-
* Gzips the input byte array.
324-
* @param bytes The input byte array.
325-
* @return gzipped byte array.
326-
*/
327-
private byte[] gzipRequest(byte[] bytes) {
328-
// This should be moved to azure-core and should be enabled when the client library requests for gzipping the
329-
// request body content.
330-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
331-
try (GZIPOutputStream zip = new GZIPOutputStream(byteArrayOutputStream)) {
332-
zip.write(bytes);
333-
} catch (IOException exception) {
334-
throw LOGGER.logExceptionAsError(new UncheckedIOException(exception));
335-
}
336-
return byteArrayOutputStream.toByteArray();
337-
}
338242
}

sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClient.java

Lines changed: 127 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,35 @@
1010
import com.azure.core.exception.HttpResponseException;
1111
import com.azure.core.exception.ResourceModifiedException;
1212
import com.azure.core.exception.ResourceNotFoundException;
13+
import com.azure.core.http.HttpHeader;
1314
import com.azure.core.http.rest.RequestOptions;
1415
import com.azure.core.http.rest.Response;
1516
import com.azure.core.util.BinaryData;
1617
import com.azure.core.util.Context;
18+
import com.azure.core.util.logging.ClientLogger;
19+
import com.azure.monitor.ingestion.implementation.Batcher;
20+
import com.azure.monitor.ingestion.implementation.IngestionUsingDataCollectionRulesClient;
21+
import com.azure.monitor.ingestion.implementation.LogsIngestionRequest;
22+
import com.azure.monitor.ingestion.implementation.UploadLogsResponseHolder;
23+
import com.azure.monitor.ingestion.models.LogsUploadError;
24+
import com.azure.monitor.ingestion.models.LogsUploadException;
1725
import com.azure.monitor.ingestion.models.LogsUploadOptions;
1826

27+
import java.util.List;
28+
import java.util.Objects;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.function.Consumer;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.Stream;
34+
35+
import static com.azure.monitor.ingestion.implementation.Utils.CONTENT_ENCODING;
36+
import static com.azure.monitor.ingestion.implementation.Utils.GZIP;
37+
import static com.azure.monitor.ingestion.implementation.Utils.createThreadPool;
38+
import static com.azure.monitor.ingestion.implementation.Utils.getConcurrency;
39+
import static com.azure.monitor.ingestion.implementation.Utils.gzipRequest;
40+
import static com.azure.monitor.ingestion.implementation.Utils.registerShutdownHook;
41+
1942
/**
2043
* The synchronous client for uploading logs to Azure Monitor.
2144
*
@@ -30,12 +53,20 @@
3053
* <!-- end com.azure.monitor.ingestion.LogsIngestionClient.instantiation -->
3154
*/
3255
@ServiceClient(builder = LogsIngestionClientBuilder.class)
33-
public final class LogsIngestionClient {
56+
public final class LogsIngestionClient implements AutoCloseable {
57+
private static final ClientLogger LOGGER = new ClientLogger(LogsIngestionClient.class);
58+
private static final String HTTP_REST_PROXY_SYNC_PROXY_ENABLE = "com.azure.core.http.restproxy.syncproxy.enable";
59+
private static final Context ENABLE_SYNC_CONTEXT = new Context(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true);
60+
private final IngestionUsingDataCollectionRulesClient client;
3461

35-
private final LogsIngestionAsyncClient asyncClient;
62+
// dynamic thread pool that scales up and down on demand.
63+
private final ExecutorService threadPool;
64+
private final Thread shutdownHook;
3665

37-
LogsIngestionClient(LogsIngestionAsyncClient asyncClient) {
38-
this.asyncClient = asyncClient;
66+
LogsIngestionClient(IngestionUsingDataCollectionRulesClient client) {
67+
this.client = client;
68+
this.threadPool = createThreadPool();
69+
this.shutdownHook = registerShutdownHook(this.threadPool, 5);
3970
}
4071

4172
/**
@@ -61,7 +92,7 @@ public final class LogsIngestionClient {
6192
*/
6293
@ServiceMethod(returns = ReturnType.SINGLE)
6394
public void upload(String ruleId, String streamName, Iterable<Object> logs) {
64-
asyncClient.upload(ruleId, streamName, logs).block();
95+
upload(ruleId, streamName, logs, null);
6596
}
6697

6798
/**
@@ -90,7 +121,7 @@ public void upload(String ruleId, String streamName, Iterable<Object> logs) {
90121
@ServiceMethod(returns = ReturnType.SINGLE)
91122
public void upload(String ruleId, String streamName,
92123
Iterable<Object> logs, LogsUploadOptions options) {
93-
asyncClient.upload(ruleId, streamName, logs, options, Context.NONE).block();
124+
upload(ruleId, streamName, logs, options, Context.NONE);
94125
}
95126

96127
/**
@@ -111,7 +142,64 @@ public void upload(String ruleId, String streamName,
111142
@ServiceMethod(returns = ReturnType.SINGLE)
112143
public void upload(String ruleId, String streamName,
113144
Iterable<Object> logs, LogsUploadOptions options, Context context) {
114-
asyncClient.upload(ruleId, streamName, logs, options, context).block();
145+
Objects.requireNonNull(ruleId, "'ruleId' cannot be null.");
146+
Objects.requireNonNull(streamName, "'streamName' cannot be null.");
147+
Objects.requireNonNull(logs, "'logs' cannot be null.");
148+
149+
context = enableSync(context);
150+
151+
Consumer<LogsUploadError> uploadLogsErrorConsumer = options == null ? null : options.getLogsUploadErrorConsumer();
152+
153+
RequestOptions requestOptions = new RequestOptions();
154+
requestOptions.addHeader(CONTENT_ENCODING, GZIP);
155+
requestOptions.setContext(context);
156+
157+
Stream<UploadLogsResponseHolder> responses = new Batcher(options, logs)
158+
.toStream()
159+
.map(r -> uploadToService(ruleId, streamName, requestOptions, r));
160+
161+
responses = submit(responses, getConcurrency(options))
162+
.filter(response -> response.getException() != null);
163+
164+
if (uploadLogsErrorConsumer != null) {
165+
responses.forEach(response -> uploadLogsErrorConsumer.accept(new LogsUploadError(response.getException(), response.getRequest().getLogs())));
166+
return;
167+
}
168+
169+
final int[] failedLogCount = new int[1];
170+
List<HttpResponseException> exceptions = responses
171+
.map(response -> {
172+
failedLogCount[0] += response.getRequest().getLogs().size();
173+
return response.getException();
174+
})
175+
.collect(Collectors.toList());
176+
177+
if (exceptions.size() > 0) {
178+
throw LOGGER.logExceptionAsError(new LogsUploadException(exceptions, failedLogCount[0]));
179+
}
180+
}
181+
182+
private Stream<UploadLogsResponseHolder> submit(Stream<UploadLogsResponseHolder> responseStream, int concurrency) {
183+
if (concurrency == 1) {
184+
return responseStream;
185+
}
186+
187+
try {
188+
return threadPool.submit(() -> responseStream).get();
189+
} catch (InterruptedException | ExecutionException e) {
190+
throw LOGGER.logExceptionAsError(new RuntimeException(e));
191+
}
192+
}
193+
194+
private UploadLogsResponseHolder uploadToService(String ruleId, String streamName, RequestOptions requestOptions, LogsIngestionRequest request) {
195+
HttpResponseException exception = null;
196+
try {
197+
client.uploadWithResponse(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()), requestOptions);
198+
} catch (HttpResponseException ex) {
199+
exception = ex;
200+
}
201+
202+
return new UploadLogsResponseHolder(request, exception);
115203
}
116204

117205
/**
@@ -147,6 +235,37 @@ public void upload(String ruleId, String streamName,
147235
@ServiceMethod(returns = ReturnType.SINGLE)
148236
public Response<Void> uploadWithResponse(
149237
String ruleId, String streamName, BinaryData logs, RequestOptions requestOptions) {
150-
return asyncClient.uploadWithResponse(ruleId, streamName, logs, requestOptions).block();
238+
Objects.requireNonNull(ruleId, "'ruleId' cannot be null.");
239+
Objects.requireNonNull(streamName, "'streamName' cannot be null.");
240+
Objects.requireNonNull(logs, "'logs' cannot be null.");
241+
242+
if (requestOptions == null) {
243+
requestOptions = new RequestOptions();
244+
}
245+
246+
requestOptions.setContext(enableSync(requestOptions.getContext()));
247+
requestOptions.addRequestCallback(request -> {
248+
HttpHeader httpHeader = request.getHeaders().get(CONTENT_ENCODING);
249+
if (httpHeader == null) {
250+
BinaryData gzippedRequest = BinaryData.fromBytes(gzipRequest(logs.toBytes()));
251+
request.setBody(gzippedRequest);
252+
request.setHeader(CONTENT_ENCODING, GZIP);
253+
}
254+
});
255+
return client.uploadWithResponse(ruleId, streamName, logs, requestOptions);
256+
}
257+
258+
private static Context enableSync(Context context) {
259+
if (context == null || context == Context.NONE) {
260+
return ENABLE_SYNC_CONTEXT;
261+
}
262+
263+
return context.addData(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true);
264+
}
265+
266+
@Override
267+
public void close() {
268+
threadPool.shutdown();
269+
Runtime.getRuntime().removeShutdownHook(shutdownHook);
151270
}
152271
}

sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClientBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,13 @@ public LogsIngestionClientBuilder serviceVersion(LogsIngestionServiceVersion ser
189189
* @return A synchronous {@link LogsIngestionClient}.
190190
*/
191191
public LogsIngestionClient buildClient() {
192-
return new LogsIngestionClient(buildAsyncClient());
192+
if (endpoint == null) {
193+
throw LOGGER.logExceptionAsError(new IllegalStateException("endpoint is required to build the client."));
194+
}
195+
if (tokenCredential == null) {
196+
throw LOGGER.logExceptionAsError(new IllegalStateException("credential is required to build the client."));
197+
}
198+
return new LogsIngestionClient(innerLogBuilder.buildClient());
193199
}
194200

195201
/**

0 commit comments

Comments
 (0)