Skip to content

Commit b912f8f

Browse files
initial testing implementation
1 parent 98ee412 commit b912f8f

File tree

8 files changed

+185
-3
lines changed

8 files changed

+185
-3
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Library Instrumentation for Apache Iceberg Version 1.8 and Higher
2+
3+
Provides OpenTelemetry instrumentation for [Apache Iceberg](https://iceberg.apache.org/).
4+
5+
## Quickstart
6+
7+
### Add These Dependencies to Your Project
8+
9+
Replace `OPENTELEMETRY_VERSION` with the [latest release](https://central.sonatype.com/artifact/io.opentelemetry.instrumentation/opentelemetry-iceberg-1.8).
10+
11+
For Maven, add to your `pom.xml` dependencies:
12+
13+
```xml
14+
<dependencies>
15+
<dependency>
16+
<groupId>io.opentelemetry.instrumentation</groupId>
17+
<artifactId>opentelemetry-iceberg-1.8</artifactId>
18+
<version>OPENTELEMETRY_VERSION</version>
19+
</dependency>
20+
</dependencies>
21+
```
22+
23+
For Gradle, add to your dependencies:
24+
25+
```groovy
26+
implementation("io.opentelemetry.instrumentation:opentelemetry-iceberg-1.8:OPENTELEMETRY_VERSION")
27+
```
28+
29+
### Usage
30+
31+
The instrumentation library allows creating instrumented `Scan` (e.g., `TableScan`) instances for collecting
32+
OpenTelemetry-based metrics for scans and commits. For example:
33+
34+
```java
35+
OpenTelemetry openTelemetry = ...
36+
IcebergTelemetry icebergTelemetry = IcebergTelemetry.create(openTelemetry);
37+
TableScan tableScan = icebergTelemetry.wrapScan(table.newScan());
38+
39+
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
40+
// ...
41+
}
42+
```
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
plugins {
22
id("otel.library-instrumentation")
3+
id("otel.nullaway-conventions")
34
}
45

56
dependencies {
67
library("org.apache.iceberg:iceberg-core:1.8.1")
8+
testImplementation(project(":instrumentation:iceberg-1.8:testing"))
79
}

instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
package io.opentelemetry.instrumentation.iceberg.v1_8;
77

8+
import org.apache.iceberg.Scan;
9+
import org.apache.iceberg.ScanTask;
10+
import org.apache.iceberg.ScanTaskGroup;
11+
812
import io.opentelemetry.api.OpenTelemetry;
9-
import org.apache.iceberg.TableScan;
1013

1114
public class IcebergTelemetry {
1215
private final OpenTelemetry openTelemetry;
@@ -19,7 +22,7 @@ public static IcebergTelemetry create(OpenTelemetry openTelemetry) {
1922
this.openTelemetry = openTelemetry;
2023
}
2124

22-
public TableScan wrapTableScan(TableScan tableScan) {
23-
return tableScan.metricsReporter(new IcebergMetricsReporter(openTelemetry));
25+
public <ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> ThisT wrapScan(Scan<ThisT, T, G> scan) {
26+
return scan.metricsReporter(new IcebergMetricsReporter(openTelemetry));
2427
}
2528
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.opentelemetry.instrumentation.iceberg.v1_8;
2+
3+
import org.apache.iceberg.TableScan;
4+
import org.junit.jupiter.api.extension.RegisterExtension;
5+
6+
import io.opentelemetry.api.OpenTelemetry;
7+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
8+
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
9+
10+
public class IcebergLibraryTest extends AbstractIcebergTest {
11+
@RegisterExtension
12+
final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
13+
14+
@Override
15+
protected InstrumentationExtension testing() {
16+
return this.testing;
17+
}
18+
19+
@Override
20+
protected TableScan configure(TableScan tableScan) {
21+
OpenTelemetry openTelemetry = testing.getOpenTelemetry();
22+
IcebergTelemetry icebergTelemetry = IcebergTelemetry.create(openTelemetry);
23+
return icebergTelemetry.wrapScan(tableScan);
24+
}
25+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
description: This standalone instrumentation enables metrics for Apache Iceberg scans and commits.
2+
library_link: https://iceberg.apache.org/
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
plugins {
2+
id("otel.java-conventions")
3+
}
4+
5+
dependencies {
6+
implementation("org.apache.iceberg:iceberg-core:1.8.1")
7+
implementation("org.apache.iceberg:iceberg-core:1.8.1") {
8+
artifact {
9+
classifier = "tests"
10+
}
11+
}
12+
implementation("org.apache.commons:commons-compress:1.26.2")
13+
api(project(":testing-common"))
14+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.opentelemetry.instrumentation.iceberg.v1_8;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5+
import static org.junit.jupiter.api.Assertions.assertNull;
6+
7+
import java.io.File;
8+
import java.io.IOException;
9+
import java.util.Arrays;
10+
11+
import org.apache.iceberg.DataFile;
12+
import org.apache.iceberg.DataFiles;
13+
import org.apache.iceberg.FileScanTask;
14+
import org.apache.iceberg.PartitionSpec;
15+
import org.apache.iceberg.Schema;
16+
import org.apache.iceberg.TableScan;
17+
import org.apache.iceberg.TestTables;
18+
import org.apache.iceberg.TestTables.TestTable;
19+
import org.apache.iceberg.io.CloseableIterable;
20+
import org.apache.iceberg.metrics.MetricsReport;
21+
import org.apache.iceberg.metrics.MetricsReporter;
22+
import org.apache.iceberg.types.Types.IntegerType;
23+
import org.apache.iceberg.types.Types.NestedField;
24+
import org.apache.iceberg.types.Types.StringType;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.io.TempDir;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
32+
33+
abstract class AbstractIcebergTest {
34+
protected abstract InstrumentationExtension testing();
35+
36+
protected abstract TableScan configure(TableScan tableScan);
37+
38+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTest.class);
39+
protected static final int FORMAT_VERSION = 2;
40+
protected static final Schema SCHEMA = new Schema(NestedField.required(3, "id", IntegerType.get()), NestedField.required(4, "data", StringType.get()));
41+
protected static final int BUCKETS_NUMBER = 16;
42+
protected static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
43+
protected static final DataFile FILE_1 = DataFiles.builder(SPEC).withPath("/path/to/data-a.parquet").withFileSizeInBytes(10L).withPartitionPath("data_bucket=0").withRecordCount(1L).build();
44+
protected static final DataFile FILE_2 = DataFiles.builder(SPEC).withPath("/path/to/data-b.parquet").withFileSizeInBytes(10L).withPartitionPath("data_bucket=1").withRecordCount(1L).withSplitOffsets(Arrays.asList(1L)).build();
45+
46+
@TempDir
47+
protected File tableDir = null;
48+
protected TestTable table;
49+
50+
@BeforeEach
51+
void init() {
52+
this.table = TestTables.create(this.tableDir, "test", SCHEMA, SPEC, FORMAT_VERSION);
53+
this.table.newFastAppend().appendFile(FILE_1).appendFile(FILE_2).commit();
54+
}
55+
56+
@Test
57+
void testCreateTelemetry() throws IOException {
58+
SimpleReporter reporter = new SimpleReporter();
59+
60+
TableScan scan = table.newScan()
61+
.select("id", "data");
62+
assertNotNull(scan);
63+
assertNull(reporter.report);
64+
65+
Schema projection = scan.schema();
66+
assertNotNull(projection);
67+
68+
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
69+
assertNotNull(tasks);
70+
int counter = 0;
71+
72+
for (FileScanTask fileTask : tasks) {
73+
LOGGER.info(fileTask.file().location());
74+
counter++;
75+
}
76+
77+
assertEquals(2, counter);
78+
assertNotNull(reporter.report);
79+
}
80+
}
81+
82+
static class SimpleReporter implements MetricsReporter {
83+
MetricsReport report;
84+
85+
@Override
86+
public void report(MetricsReport report) {
87+
LOGGER.error("I am invoked!");
88+
this.report = report;
89+
}
90+
91+
}
92+
93+
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ include(":instrumentation:hikaricp-3.0:testing")
304304
include(":instrumentation:http-url-connection:javaagent")
305305
include(":instrumentation:hystrix-1.4:javaagent")
306306
include(":instrumentation:iceberg-1.8:library")
307+
include(":instrumentation:iceberg-1.8:testing")
307308
include(":instrumentation:influxdb-2.4:javaagent")
308309
include(":instrumentation:internal:internal-application-logger:bootstrap")
309310
include(":instrumentation:internal:internal-application-logger:javaagent")

0 commit comments

Comments
 (0)