55
66package io .opentelemetry .sdk .logs .export ;
77
8- import io .opentelemetry .api .common .AttributeKey ;
9- import io .opentelemetry .api .common .Attributes ;
10- import io .opentelemetry .api .metrics .LongCounter ;
11- import io .opentelemetry .api .metrics .Meter ;
128import io .opentelemetry .api .metrics .MeterProvider ;
139import io .opentelemetry .context .Context ;
1410import io .opentelemetry .sdk .common .CompletableResultCode ;
11+ import io .opentelemetry .sdk .common .InternalTelemetryVersion ;
12+ import io .opentelemetry .sdk .internal .ComponentId ;
1513import io .opentelemetry .sdk .internal .DaemonThreadFactory ;
1614import io .opentelemetry .sdk .logs .LogRecordProcessor ;
1715import io .opentelemetry .sdk .logs .ReadWriteLogRecord ;
2624import java .util .concurrent .atomic .AtomicBoolean ;
2725import java .util .concurrent .atomic .AtomicInteger ;
2826import java .util .concurrent .atomic .AtomicReference ;
27+ import java .util .function .Supplier ;
2928import java .util .logging .Level ;
3029import java .util .logging .Logger ;
3130
4241 */
4342public final class BatchLogRecordProcessor implements LogRecordProcessor {
4443
44+ private static final ComponentId COMPONENT_ID =
45+ ComponentId .generateLazy ("batching_log_processor" );
46+
4547 private static final String WORKER_THREAD_NAME =
4648 BatchLogRecordProcessor .class .getSimpleName () + "_WorkerThread" ;
47- private static final AttributeKey <String > LOG_RECORD_PROCESSOR_TYPE_LABEL =
48- AttributeKey .stringKey ("processorType" );
49- private static final AttributeKey <Boolean > LOG_RECORD_PROCESSOR_DROPPED_LABEL =
50- AttributeKey .booleanKey ("dropped" );
51- private static final String LOG_RECORD_PROCESSOR_TYPE_VALUE =
52- BatchLogRecordProcessor .class .getSimpleName ();
5349
5450 private final Worker worker ;
5551 private final AtomicBoolean isShutdown = new AtomicBoolean (false );
@@ -67,7 +63,8 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
6763
6864 BatchLogRecordProcessor (
6965 LogRecordExporter logRecordExporter ,
70- MeterProvider meterProvider ,
66+ Supplier <MeterProvider > meterProvider ,
67+ InternalTelemetryVersion telemetryVersion ,
7168 long scheduleDelayNanos ,
7269 int maxQueueSize ,
7370 int maxExportBatchSize ,
@@ -76,10 +73,12 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
7673 new Worker (
7774 logRecordExporter ,
7875 meterProvider ,
76+ telemetryVersion ,
7977 scheduleDelayNanos ,
8078 maxExportBatchSize ,
8179 exporterTimeoutNanos ,
82- new ArrayBlockingQueue <>(maxQueueSize )); // TODO: use JcTools.newFixedSizeQueue(..)
80+ new ArrayBlockingQueue <>(maxQueueSize ),
81+ maxQueueSize ); // TODO: use JcTools.newFixedSizeQueue(..)
8382 Thread workerThread = new DaemonThreadFactory (WORKER_THREAD_NAME ).newThread (worker );
8483 workerThread .start ();
8584 }
@@ -140,9 +139,7 @@ private static final class Worker implements Runnable {
140139
141140 private static final Logger logger = Logger .getLogger (Worker .class .getName ());
142141
143- private final LongCounter processedLogsCounter ;
144- private final Attributes droppedAttrs ;
145- private final Attributes exportedAttrs ;
142+ private final LogRecordProcessorInstrumentation logProcessorInstrumentation ;
146143
147144 private final LogRecordExporter logRecordExporter ;
148145 private final long scheduleDelayNanos ;
@@ -163,59 +160,34 @@ private static final class Worker implements Runnable {
163160 private final AtomicReference <CompletableResultCode > flushRequested = new AtomicReference <>();
164161 private volatile boolean continueWork = true ;
165162 private final ArrayList <LogRecordData > batch ;
163+ private final long maxQueueSize ;
166164
167165 private Worker (
168166 LogRecordExporter logRecordExporter ,
169- MeterProvider meterProvider ,
167+ Supplier <MeterProvider > meterProvider ,
168+ InternalTelemetryVersion telemetryVersion ,
170169 long scheduleDelayNanos ,
171170 int maxExportBatchSize ,
172171 long exporterTimeoutNanos ,
173- Queue <ReadWriteLogRecord > queue ) {
172+ Queue <ReadWriteLogRecord > queue ,
173+ long maxQueueSize ) {
174174 this .logRecordExporter = logRecordExporter ;
175175 this .scheduleDelayNanos = scheduleDelayNanos ;
176176 this .maxExportBatchSize = maxExportBatchSize ;
177177 this .exporterTimeoutNanos = exporterTimeoutNanos ;
178178 this .queue = queue ;
179179 this .signal = new ArrayBlockingQueue <>(1 );
180- Meter meter = meterProvider .meterBuilder ("io.opentelemetry.sdk.logs" ).build ();
181- meter
182- .gaugeBuilder ("queueSize" )
183- .ofLongs ()
184- .setDescription ("The number of items queued" )
185- .setUnit ("1" )
186- .buildWithCallback (
187- result ->
188- result .record (
189- queue .size (),
190- Attributes .of (
191- LOG_RECORD_PROCESSOR_TYPE_LABEL , LOG_RECORD_PROCESSOR_TYPE_VALUE )));
192- processedLogsCounter =
193- meter
194- .counterBuilder ("processedLogs" )
195- .setUnit ("1" )
196- .setDescription (
197- "The number of logs processed by the BatchLogRecordProcessor. "
198- + "[dropped=true if they were dropped due to high throughput]" )
199- .build ();
200- droppedAttrs =
201- Attributes .of (
202- LOG_RECORD_PROCESSOR_TYPE_LABEL ,
203- LOG_RECORD_PROCESSOR_TYPE_VALUE ,
204- LOG_RECORD_PROCESSOR_DROPPED_LABEL ,
205- true );
206- exportedAttrs =
207- Attributes .of (
208- LOG_RECORD_PROCESSOR_TYPE_LABEL ,
209- LOG_RECORD_PROCESSOR_TYPE_VALUE ,
210- LOG_RECORD_PROCESSOR_DROPPED_LABEL ,
211- false );
180+ logProcessorInstrumentation =
181+ LogRecordProcessorInstrumentation .get (telemetryVersion , COMPONENT_ID , meterProvider );
182+ this .maxQueueSize = maxQueueSize ;
212183
213184 this .batch = new ArrayList <>(this .maxExportBatchSize );
214185 }
215186
216187 private void addLog (ReadWriteLogRecord logData ) {
188+ logProcessorInstrumentation .buildQueueMetricsOnce (maxQueueSize , queue ::size );
217189 if (!queue .offer (logData )) {
218- processedLogsCounter . add ( 1 , droppedAttrs );
190+ logProcessorInstrumentation . dropLogs ( 1 );
219191 } else {
220192 if (queue .size () >= logsNeeded .get ()) {
221193 signal .offer (true );
@@ -316,18 +288,24 @@ private void exportCurrentBatch() {
316288 return ;
317289 }
318290
291+ String error = null ;
319292 try {
320293 CompletableResultCode result =
321294 logRecordExporter .export (Collections .unmodifiableList (batch ));
322295 result .join (exporterTimeoutNanos , TimeUnit .NANOSECONDS );
323- if (result .isSuccess ()) {
324- processedLogsCounter .add (batch .size (), exportedAttrs );
325- } else {
296+ if (!result .isSuccess ()) {
326297 logger .log (Level .FINE , "Exporter failed" );
298+ if (result .getFailureThrowable () != null ) {
299+ error = result .getFailureThrowable ().getClass ().getName ();
300+ } else {
301+ error = "export_failed" ;
302+ }
327303 }
328304 } catch (RuntimeException e ) {
329305 logger .log (Level .WARNING , "Exporter threw an Exception" , e );
306+ error = e .getClass ().getName ();
330307 } finally {
308+ logProcessorInstrumentation .finishLogs (batch .size (), error );
331309 batch .clear ();
332310 }
333311 }
0 commit comments