|
5 | 5 |
|
6 | 6 | package io.opentelemetry.contrib.disk.buffering; |
7 | 7 |
|
8 | | -import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| 8 | +import static java.lang.Thread.sleep; |
9 | 9 | import static org.junit.jupiter.api.Assertions.assertEquals; |
10 | | -import static org.junit.jupiter.api.Assertions.assertFalse; |
11 | | -import static org.junit.jupiter.api.Assertions.assertTrue; |
| 10 | +import static org.mockito.Mockito.clearInvocations; |
12 | 11 | import static org.mockito.Mockito.mock; |
13 | | -import static org.mockito.Mockito.when; |
| 12 | +import static org.mockito.Mockito.verify; |
| 13 | +import static org.mockito.Mockito.verifyNoMoreInteractions; |
14 | 14 |
|
15 | 15 | import io.opentelemetry.api.logs.Logger; |
16 | 16 | import io.opentelemetry.api.metrics.Meter; |
17 | 17 | import io.opentelemetry.api.trace.Span; |
18 | 18 | import io.opentelemetry.api.trace.Tracer; |
19 | | -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; |
20 | | -import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterBuilder; |
21 | | -import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; |
22 | | -import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; |
23 | | -import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; |
24 | | -import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; |
25 | | -import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; |
26 | | -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; |
27 | | -import io.opentelemetry.sdk.common.Clock; |
28 | | -import io.opentelemetry.sdk.common.CompletableResultCode; |
| 19 | +import io.opentelemetry.contrib.disk.buffering.exporters.LogRecordToDiskExporter; |
| 20 | +import io.opentelemetry.contrib.disk.buffering.exporters.MetricToDiskExporter; |
| 21 | +import io.opentelemetry.contrib.disk.buffering.exporters.SpanToDiskExporter; |
| 22 | +import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback; |
| 23 | +import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; |
| 24 | +import io.opentelemetry.contrib.disk.buffering.storage.impl.FileLogRecordStorage; |
| 25 | +import io.opentelemetry.contrib.disk.buffering.storage.impl.FileMetricStorage; |
| 26 | +import io.opentelemetry.contrib.disk.buffering.storage.impl.FileSpanStorage; |
| 27 | +import io.opentelemetry.contrib.disk.buffering.storage.impl.FileStorageConfiguration; |
29 | 28 | import io.opentelemetry.sdk.logs.SdkLoggerProvider; |
30 | 29 | import io.opentelemetry.sdk.logs.data.LogRecordData; |
31 | 30 | import io.opentelemetry.sdk.logs.export.LogRecordExporter; |
|
34 | 33 | import io.opentelemetry.sdk.metrics.data.MetricData; |
35 | 34 | import io.opentelemetry.sdk.metrics.export.MetricExporter; |
36 | 35 | import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; |
37 | | -import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; |
38 | | -import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter; |
39 | | -import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; |
40 | 36 | import io.opentelemetry.sdk.trace.SdkTracerProvider; |
41 | 37 | import io.opentelemetry.sdk.trace.data.SpanData; |
42 | 38 | import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; |
43 | 39 | import io.opentelemetry.sdk.trace.export.SpanExporter; |
44 | 40 | import java.io.File; |
45 | 41 | import java.io.IOException; |
46 | | -import java.util.Collection; |
47 | | -import java.util.concurrent.TimeUnit; |
48 | | -import java.util.function.Function; |
49 | | -import java.util.function.Supplier; |
50 | | -import org.jetbrains.annotations.NotNull; |
| 42 | +import java.util.ArrayList; |
| 43 | +import java.util.List; |
51 | 44 | import org.junit.jupiter.api.AfterEach; |
52 | 45 | import org.junit.jupiter.api.BeforeEach; |
53 | 46 | import org.junit.jupiter.api.Test; |
54 | 47 | import org.junit.jupiter.api.io.TempDir; |
55 | 48 |
|
56 | 49 | public class IntegrationTest { |
57 | | - private InMemorySpanExporter memorySpanExporter; |
58 | 50 | private Tracer tracer; |
59 | | - private InMemoryMetricExporter memoryMetricExporter; |
60 | 51 | private SdkMeterProvider meterProvider; |
61 | 52 | private Meter meter; |
62 | | - private InMemoryLogRecordExporter memoryLogRecordExporter; |
63 | 53 | private Logger logger; |
64 | | - private Clock clock; |
65 | | - @TempDir File rootDir; |
66 | | - private static final long INITIAL_TIME_IN_MILLIS = 1000; |
67 | | - private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); |
68 | | - private StorageConfiguration storageConfig; |
69 | | - private Storage spanStorage; |
| 54 | + private SignalStorage.Span spanStorage; |
| 55 | + private SignalStorage.LogRecord logStorage; |
| 56 | + private SignalStorage.Metric metricStorage; |
| 57 | + private SpanToDiskExporter spanToDiskExporter; |
| 58 | + private MetricToDiskExporter metricToDiskExporter; |
| 59 | + private LogRecordToDiskExporter logToDiskExporter; |
| 60 | + private ExporterCallback callback; |
| 61 | + @TempDir private File rootDir; |
| 62 | + private static final long DELAY_BEFORE_READING_MILLIS = 500; |
70 | 63 |
|
71 | 64 | @BeforeEach |
72 | | - void setUp() throws IOException { |
73 | | - clock = mock(); |
74 | | - storageConfig = StorageConfiguration.getDefault(rootDir); |
75 | | - spanStorage = |
76 | | - Storage.builder(SignalTypes.spans) |
77 | | - .setStorageConfiguration(storageConfig) |
78 | | - .setStorageClock(clock) |
| 65 | + void setUp() { |
| 66 | + callback = mock(); |
| 67 | + FileStorageConfiguration storageConfig = |
| 68 | + FileStorageConfiguration.builder() |
| 69 | + .setMaxFileAgeForWriteMillis(DELAY_BEFORE_READING_MILLIS - 1) |
| 70 | + .setMinFileAgeForReadMillis(DELAY_BEFORE_READING_MILLIS) |
79 | 71 | .build(); |
80 | 72 |
|
81 | | - when(clock.now()).thenReturn(NOW_NANOS); |
82 | | - |
83 | 73 | // Setting up spans |
84 | | - memorySpanExporter = InMemorySpanExporter.create(); |
85 | | - ToDiskExporter<SpanData> toDiskSpanExporter = |
86 | | - buildToDiskExporter(SignalSerializer.ofSpans(), memorySpanExporter::export); |
87 | | - SpanToDiskExporter spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); |
| 74 | + spanStorage = FileSpanStorage.create(new File(rootDir, "spans"), storageConfig); |
| 75 | + spanToDiskExporter = |
| 76 | + SpanToDiskExporter.builder(spanStorage).setExporterCallback(callback).build(); |
88 | 77 | tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); |
89 | 78 |
|
90 | 79 | // Setting up metrics |
91 | | - memoryMetricExporter = InMemoryMetricExporter.create(); |
92 | | - ToDiskExporter<MetricData> toDiskMetricExporter = |
93 | | - buildToDiskExporter(SignalSerializer.ofMetrics(), memoryMetricExporter::export); |
94 | | - MetricToDiskExporter metricToDiskExporter = |
95 | | - new MetricToDiskExporter(toDiskMetricExporter, memoryMetricExporter); |
| 80 | + metricStorage = FileMetricStorage.create(new File(rootDir, "metrics"), storageConfig); |
| 81 | + metricToDiskExporter = |
| 82 | + MetricToDiskExporter.builder(metricStorage).setExporterCallback(callback).build(); |
96 | 83 | meterProvider = createMeterProvider(metricToDiskExporter); |
97 | 84 | meter = meterProvider.get("MetricInstrumentationScope"); |
98 | 85 |
|
99 | 86 | // Setting up logs |
100 | | - memoryLogRecordExporter = InMemoryLogRecordExporter.create(); |
101 | | - ToDiskExporter<LogRecordData> toDiskLogExporter = |
102 | | - buildToDiskExporter(SignalSerializer.ofLogs(), memoryLogRecordExporter::export); |
103 | | - LogRecordToDiskExporter logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); |
| 87 | + logStorage = FileLogRecordStorage.create(new File(rootDir, "logs"), storageConfig); |
| 88 | + logToDiskExporter = |
| 89 | + LogRecordToDiskExporter.builder(logStorage).setExporterCallback(callback).build(); |
104 | 90 | logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); |
105 | 91 | } |
106 | 92 |
|
107 | 93 | @AfterEach |
108 | 94 | void tearDown() throws IOException { |
| 95 | + // Closing span exporter |
| 96 | + spanToDiskExporter.shutdown(); |
| 97 | + verify(callback).onShutdown(SignalType.SPAN); |
| 98 | + verifyNoMoreInteractions(callback); |
| 99 | + |
| 100 | + // Closing log exporter |
| 101 | + clearInvocations(callback); |
| 102 | + logToDiskExporter.shutdown(); |
| 103 | + verify(callback).onShutdown(SignalType.LOG); |
| 104 | + verifyNoMoreInteractions(callback); |
| 105 | + |
| 106 | + // Closing metric exporter |
| 107 | + clearInvocations(callback); |
| 108 | + metricToDiskExporter.shutdown(); |
| 109 | + verify(callback).onShutdown(SignalType.METRIC); |
| 110 | + verifyNoMoreInteractions(callback); |
| 111 | + |
| 112 | + // Closing storages |
109 | 113 | spanStorage.close(); |
110 | | - } |
111 | | - |
112 | | - @NotNull |
113 | | - private <T> ToDiskExporter<T> buildToDiskExporter( |
114 | | - SignalSerializer<T> serializer, Function<Collection<T>, CompletableResultCode> exporter) { |
115 | | - return ToDiskExporter.<T>builder(spanStorage) |
116 | | - .setSerializer(serializer) |
117 | | - .setExportFunction(exporter) |
118 | | - .build(); |
119 | | - } |
120 | | - |
121 | | - @NotNull |
122 | | - private static <T> FromDiskExporterImpl<T> buildFromDiskExporter( |
123 | | - FromDiskExporterBuilder<T> builder, |
124 | | - Function<Collection<T>, CompletableResultCode> exportFunction, |
125 | | - SignalDeserializer<T> deserializer) |
126 | | - throws IOException { |
127 | | - return builder.setExportFunction(exportFunction).setDeserializer(deserializer).build(); |
| 114 | + logStorage.close(); |
| 115 | + metricStorage.close(); |
128 | 116 | } |
129 | 117 |
|
130 | 118 | @Test |
131 | | - void verifySpansIntegration() throws IOException { |
| 119 | + void verifyIntegration() throws InterruptedException { |
| 120 | + // Creating span |
132 | 121 | Span span = tracer.spanBuilder("Span name").startSpan(); |
133 | 122 | span.end(); |
134 | | - FromDiskExporterImpl<SpanData> fromDiskExporter = |
135 | | - buildFromDiskExporter( |
136 | | - FromDiskExporterImpl.builder(spanStorage), |
137 | | - memorySpanExporter::export, |
138 | | - SignalDeserializer.ofSpans()); |
139 | | - assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); |
140 | | - } |
141 | | - |
142 | | - @Test |
143 | | - void verifyMetricsIntegration() throws IOException { |
144 | | - meter.counterBuilder("Counter").build().add(2); |
| 123 | + verify(callback).onExportSuccess(SignalType.SPAN); |
| 124 | + verifyNoMoreInteractions(callback); |
| 125 | + |
| 126 | + // Creating log |
| 127 | + clearInvocations(callback); |
| 128 | + logger.logRecordBuilder().setBody("Log body").emit(); |
| 129 | + verify(callback).onExportSuccess(SignalType.LOG); |
| 130 | + verifyNoMoreInteractions(callback); |
| 131 | + |
| 132 | + // Creating metric |
| 133 | + clearInvocations(callback); |
| 134 | + meter.counterBuilder("counter").build().add(1); |
145 | 135 | meterProvider.forceFlush(); |
146 | | - |
147 | | - FromDiskExporterImpl<MetricData> fromDiskExporter = |
148 | | - buildFromDiskExporter( |
149 | | - FromDiskExporterImpl.builder(spanStorage), |
150 | | - memoryMetricExporter::export, |
151 | | - SignalDeserializer.ofMetrics()); |
152 | | - assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); |
153 | | - } |
154 | | - |
155 | | - @Test |
156 | | - void verifyLogRecordsIntegration() throws IOException { |
157 | | - logger.logRecordBuilder().setBody("I'm a log!").emit(); |
158 | | - |
159 | | - FromDiskExporterImpl<LogRecordData> fromDiskExporter = |
160 | | - buildFromDiskExporter( |
161 | | - FromDiskExporterImpl.builder(spanStorage), |
162 | | - memoryLogRecordExporter::export, |
163 | | - SignalDeserializer.ofLogs()); |
164 | | - assertExporter( |
165 | | - fromDiskExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size()); |
166 | | - } |
167 | | - |
168 | | - private <T> void assertExporter(FromDiskExporterImpl<T> exporter, Supplier<Integer> finishedItems) |
169 | | - throws IOException { |
170 | | - // Verify no data has been received in the original exporter until this point. |
171 | | - assertEquals(0, finishedItems.get()); |
172 | | - |
173 | | - // Go to the future when we can read the stored items. |
174 | | - fastForwardTimeByMillis(storageConfig.getMinFileAgeForReadMillis()); |
175 | | - |
176 | | - // Read and send stored data. |
177 | | - assertTrue(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); |
178 | | - |
179 | | - // Now the data must have been delegated to the original exporter. |
180 | | - assertEquals(1, finishedItems.get()); |
181 | | - |
182 | | - // Bonus: Try to read again, no more data should be available. |
183 | | - assertFalse(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); |
184 | | - assertEquals(1, finishedItems.get()); |
185 | | - } |
186 | | - |
187 | | - @SuppressWarnings("DirectInvocationOnMock") |
188 | | - private void fastForwardTimeByMillis(long milliseconds) { |
189 | | - when(clock.now()).thenReturn(NOW_NANOS + MILLISECONDS.toNanos(milliseconds)); |
| 136 | + verify(callback).onExportSuccess(SignalType.METRIC); |
| 137 | + verifyNoMoreInteractions(callback); |
| 138 | + |
| 139 | + // Waiting for read time |
| 140 | + sleep(DELAY_BEFORE_READING_MILLIS); |
| 141 | + |
| 142 | + // Read |
| 143 | + List<SpanData> storedSpans = new ArrayList<>(); |
| 144 | + List<LogRecordData> storedLogs = new ArrayList<>(); |
| 145 | + List<MetricData> storedMetrics = new ArrayList<>(); |
| 146 | + spanStorage.forEach(storedSpans::addAll); |
| 147 | + logStorage.forEach(storedLogs::addAll); |
| 148 | + metricStorage.forEach(storedMetrics::addAll); |
| 149 | + |
| 150 | + assertEquals(1, storedSpans.size()); |
| 151 | + assertEquals(1, storedLogs.size()); |
| 152 | + assertEquals(1, storedMetrics.size()); |
190 | 153 | } |
191 | 154 |
|
192 | 155 | private static SdkTracerProvider createTracerProvider(SpanExporter exporter) { |
|
0 commit comments