Skip to content

Commit 149e449

Browse files
committed
implemented OtelpMetricExporter
1 parent 6c1c95a commit 149e449

File tree

6 files changed

+280
-12
lines changed

6 files changed

+280
-12
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Xcode
22
#
33
# gitignore contributors: remember to update Global/Xcode.gitignore, Objective-C.gitignore & Swift.gitignore
4+
## Jetbrain
5+
.idea/
46

57
## User settings
68
xcuserdata/

Sources/Exporters/OpenTelemetryProtocol/metric/MetricsAdapter.swift

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,133 @@ import OpenTelemetrySdk
1818

1919

2020
struct MetricsAdapter {
21-
static func toProtoResourceMetrics(metricDataList: [MetricData]) -> [Opentelemetry_Proto_Metrics_V1_ResourceMetrics] {
22-
return [Opentelemetry_Proto_Metrics_V1_ResourceMetrics()]
21+
static func toProtoResourceMetrics(metricDataList: [Metric]) -> [Opentelemetry_Proto_Metrics_V1_ResourceMetrics] {
22+
let resourceAndLibraryMap = groupByResouceAndLibrary(metricDataList: metricDataList)
23+
var resourceMetrics = [Opentelemetry_Proto_Metrics_V1_ResourceMetrics]()
24+
25+
resourceAndLibraryMap.forEach { resMap in
26+
var instrumentationLibraryMetrics = [Opentelemetry_Proto_Metrics_V1_InstrumentationLibraryMetrics]()
27+
resMap.value.forEach { instLibrary in
28+
var protoInst =
29+
Opentelemetry_Proto_Metrics_V1_InstrumentationLibraryMetrics()
30+
protoInst.instrumentationLibrary =
31+
CommonAdapter.toProtoInstrumentationLibrary(instrumentationLibraryInfo: instLibrary.key)
32+
instLibrary.value.forEach {
33+
protoInst.metrics.append($0)
34+
}
35+
instrumentationLibraryMetrics.append(protoInst)
36+
}
37+
var resourceMetric = Opentelemetry_Proto_Metrics_V1_ResourceMetrics()
38+
resourceMetric.resource = ResourceAdapter.toProtoResource(resource: resMap.key)
39+
resourceMetric.instrumentationLibraryMetrics.append(contentsOf: instrumentationLibraryMetrics)
40+
resourceMetrics.append(resourceMetric)
41+
42+
}
43+
44+
45+
46+
return resourceMetrics
2347
}
2448

25-
private static func groupByLibrary(metricDataList: [MetricData]) -> [InstrumentationLibraryInfo : Opentelemetry_Proto_Metrics_V1_ResourceMetrics] {
49+
private static func groupByResouceAndLibrary(metricDataList: [Metric]) -> [Resource :[InstrumentationLibraryInfo : [Opentelemetry_Proto_Metrics_V1_Metric]]] {
50+
var results = [Resource : [InstrumentationLibraryInfo : [Opentelemetry_Proto_Metrics_V1_Metric]]]()
51+
52+
metricDataList.forEach {
53+
results[$0.resource, default:[InstrumentationLibraryInfo : [Opentelemetry_Proto_Metrics_V1_Metric]]()][$0.instrumentationLibraryInfo,default:[Opentelemetry_Proto_Metrics_V1_Metric]()]
54+
.append(toProtoMetric(metric: $0))
55+
}
56+
2657
return results
2758
}
2859

60+
static func toProtoMetric(metric: Metric) -> Opentelemetry_Proto_Metrics_V1_Metric {
61+
62+
var protoMetric = Opentelemetry_Proto_Metrics_V1_Metric()
63+
protoMetric.name = metric.name
64+
protoMetric.description_p = metric.description
65+
66+
67+
metric.data.forEach {
68+
switch metric.aggregationType {
69+
case .doubleSum:
70+
guard let sumData = $0 as? SumData<Double> else {
71+
break
72+
}
73+
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_DoubleDataPoint()
74+
protoDataPoint.value = sumData.sum
75+
sumData.labels.forEach {
76+
var kvp = Opentelemetry_Proto_Common_V1_StringKeyValue()
77+
kvp.key = $0.key
78+
kvp.value = $0.value
79+
protoDataPoint.labels.append(kvp)
80+
}
81+
82+
protoMetric.doubleSum.dataPoints.append(protoDataPoint)
83+
break
84+
case .doubleSummary:
85+
86+
guard let summaryData = $0 as? SummaryData<Double> else {
87+
break
88+
}
89+
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_DoubleHistogramDataPoint()
90+
protoDataPoint.sum = summaryData.sum
91+
protoDataPoint.count = UInt64(summaryData.count)
92+
protoDataPoint.explicitBounds = [summaryData.min, summaryData.max]
93+
94+
protoDataPoint.startTimeUnixNano = summaryData.startTimestamp.timeIntervalSince1970.toNanoseconds
95+
protoDataPoint.timeUnixNano = summaryData.timestamp.timeIntervalSince1970.toNanoseconds
96+
97+
summaryData.labels.forEach {
98+
var kvp = Opentelemetry_Proto_Common_V1_StringKeyValue()
99+
kvp.key = $0.key
100+
kvp.value = $0.value
101+
protoDataPoint.labels.append(kvp)
102+
}
103+
104+
protoMetric.doubleHistogram.dataPoints.append(protoDataPoint)
105+
106+
break
107+
case .intSum:
108+
guard let sumData = $0 as? SumData<Int> else {
109+
break;
110+
}
111+
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_IntDataPoint()
112+
protoDataPoint.value = Int64(sumData.sum)
113+
sumData.labels.forEach {
114+
var kvp = Opentelemetry_Proto_Common_V1_StringKeyValue()
115+
kvp.key = $0.key
116+
kvp.value = $0.value
117+
protoDataPoint.labels.append(kvp)
118+
}
119+
120+
protoMetric.intSum.dataPoints.append(protoDataPoint)
121+
122+
break
123+
case .intSummary:
124+
guard let summaryData = $0 as? SummaryData<Int> else {
125+
break
126+
}
127+
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_IntHistogramDataPoint()
128+
protoDataPoint.sum = Int64(summaryData.sum)
129+
protoDataPoint.count = UInt64(summaryData.count)
130+
// protoDataPoint.explicitBounds = [summaryData.min, summaryData.max]
131+
132+
protoDataPoint.startTimeUnixNano = summaryData.startTimestamp.timeIntervalSince1970.toNanoseconds
133+
protoDataPoint.timeUnixNano = summaryData.timestamp.timeIntervalSince1970.toNanoseconds
134+
135+
summaryData.labels.forEach {
136+
var kvp = Opentelemetry_Proto_Common_V1_StringKeyValue()
137+
kvp.key = $0.key
138+
kvp.value = $0.value
139+
protoDataPoint.labels.append(kvp)
140+
}
141+
142+
protoMetric.intHistogram.dataPoints.append(protoDataPoint)
143+
144+
break
145+
}
146+
}
147+
return protoMetric
148+
}
149+
29150
}

Sources/Exporters/OpenTelemetryProtocol/metric/OtelpMetricExporter.swift

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,41 @@ public class OtelpMetricExporter : MetricExporter {
2424
let metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceClient
2525
let deadlineMS : Int
2626

27-
public init(channel: GRPCChannel, timeoutMS: Int) {
27+
public init(channel: GRPCChannel, timeoutMS: Int = 0) {
2828
self.channel = channel
2929
self.deadlineMS = timeoutMS
3030
self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceClient(channel: self.channel)
3131
}
3232

3333

3434
public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
35-
return .success
35+
let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest
36+
.with {
37+
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: metrics)
38+
}
39+
40+
if deadlineMS > 0 {
41+
metricClient.defaultCallOptions.timeout = try! GRPCTimeout.milliseconds(deadlineMS)
42+
}
43+
44+
let export = metricClient.export(exportRequest)
45+
46+
47+
do {
48+
_ = try export.response.wait()
49+
return .success
50+
} catch {
51+
return .failureRetryable
52+
}
3653
}
3754

55+
public func flush() -> SpanExporterResultCode {
56+
return .success
57+
}
3858

59+
public func shutdown() {
60+
_ = channel.close()
61+
}
3962

4063

4164
}

Sources/OpenTelemetrySdk/Metrics/Export/Metric.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ import Foundation
1818
public struct Metric {
1919
public private(set) var namespace: String
2020
public private(set) var resource: Resource
21+
public private(set) var instrumentationLibraryInfo : InstrumentationLibraryInfo
2122
public private(set) var name: String
2223
public private(set) var description: String
2324
public private(set) var aggregationType: AggregationType
2425
public internal(set) var data = [MetricData]()
2526

26-
init(namespace: String, name: String, desc: String, type: AggregationType, resource: Resource) {
27+
init(namespace: String, name: String, desc: String, type: AggregationType, resource: Resource, instrumentationLibraryInfo: InstrumentationLibraryInfo) {
2728
self.namespace = namespace
29+
self.instrumentationLibraryInfo = instrumentationLibraryInfo
2830
self.name = name
2931
description = desc
3032
aggregationType = type

Sources/OpenTelemetrySdk/Metrics/MeterSdk.swift

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class MeterSdk: Meter {
2020
fileprivate let collectLock = Lock()
2121
let meterName: String
2222
var metricProcessor: MetricProcessor
23+
var instrumentationLibraryInfo : InstrumentationLibraryInfo
2324
var resource: Resource
2425

2526
var intCounters = [String: CounterMetricSdk<Int>]()
@@ -33,6 +34,7 @@ class MeterSdk: Meter {
3334
self.meterName = instrumentationLibraryInfo.name
3435
self.resource = meterSharedState.resource
3536
self.metricProcessor = meterSharedState.metricProcessor
37+
self.instrumentationLibraryInfo = instrumentationLibraryInfo
3638
}
3739

3840
func getLabelSet(labels: [String: String]) -> LabelSet {
@@ -47,7 +49,7 @@ class MeterSdk: Meter {
4749
let metricName = counter.key
4850
let counterInstrument = counter.value
4951

50-
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.intSum, resource: resource)
52+
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.intSum, resource: resource, instrumentationLibraryInfo: instrumentationLibraryInfo)
5153

5254
counterInstrument.boundInstruments.forEach { boundInstrument in
5355
let labelSet = boundInstrument.key
@@ -82,7 +84,7 @@ class MeterSdk: Meter {
8284
let metricName = counter.key
8385
let counterInstrument = counter.value
8486

85-
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.doubleSum, resource: resource)
87+
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.doubleSum, resource: resource, instrumentationLibraryInfo: instrumentationLibraryInfo)
8688

8789
counterInstrument.boundInstruments.forEach { boundInstrument in
8890
let labelSet = boundInstrument.key
@@ -117,7 +119,7 @@ class MeterSdk: Meter {
117119
intMeasures.forEach { measure in
118120
let metricName = measure.key
119121
let measureInstrument = measure.value
120-
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.intSummary, resource:resource)
122+
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.intSummary, resource: resource, instrumentationLibraryInfo: instrumentationLibraryInfo)
121123
measureInstrument.boundInstruments.forEach { boundInstrument in
122124
let labelSet = boundInstrument.key
123125
let aggregator = boundInstrument.value.getAggregator()
@@ -132,7 +134,7 @@ class MeterSdk: Meter {
132134
doubleMeasures.forEach { measure in
133135
let metricName = measure.key
134136
let measureInstrument = measure.value
135-
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.doubleSummary, resource: resource)
137+
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.doubleSummary, resource: resource, instrumentationLibraryInfo: instrumentationLibraryInfo)
136138
measureInstrument.boundInstruments.forEach { boundInstrument in
137139
let labelSet = boundInstrument.key
138140
let aggregator = boundInstrument.value.getAggregator()
@@ -147,7 +149,7 @@ class MeterSdk: Meter {
147149
intObservers.forEach { observer in
148150
let metricName = observer.key
149151
let observerInstrument = observer.value
150-
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.intSum, resource: resource)
152+
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.intSum, resource: resource, instrumentationLibraryInfo: instrumentationLibraryInfo)
151153
observerInstrument.invokeCallback()
152154

153155
observerInstrument.observerHandles.forEach { handle in
@@ -164,7 +166,7 @@ class MeterSdk: Meter {
164166
doubleObservers.forEach { observer in
165167
let metricName = observer.key
166168
let observerInstrument = observer.value
167-
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.doubleSum, resource: resource)
169+
var metric = Metric(namespace: meterName, name: metricName, desc: meterName + metricName, type: AggregationType.doubleSum, resource: resource, instrumentationLibraryInfo: instrumentationLibraryInfo)
168170
observerInstrument.invokeCallback()
169171

170172
observerInstrument.observerHandles.forEach { handle in
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
2+
import Foundation
3+
import NIO
4+
import GRPC
5+
import OpenTelemetryApi
6+
@testable import OpenTelemetrySdk
7+
@testable import OpenTelemetryProtocolExporter
8+
import XCTest
9+
10+
class OtlpMetricExproterTests : XCTestCase {
11+
12+
var fakeCollector: FakeMetricCollector!
13+
var server: EventLoopFuture<Server>!
14+
var channel: ClientConnection!
15+
16+
let channelGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
17+
let serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
18+
19+
override func setUp() {
20+
fakeCollector = FakeMetricCollector()
21+
server = startServer()
22+
channel = startChannel()
23+
}
24+
25+
override func tearDown() {
26+
try! serverGroup.syncShutdownGracefully()
27+
try! channelGroup.syncShutdownGracefully()
28+
}
29+
30+
func testExport() {
31+
let metric = generateSumMetric()
32+
let exporter = OtelpMetricExporter(channel: channel)
33+
let result = exporter.export(metrics: [metric]) { () -> Bool in
34+
false
35+
}
36+
XCTAssertEqual(result, MetricExporterResultCode.success)
37+
XCTAssertEqual(fakeCollector.receivedMetrics, MetricsAdapter.toProtoResourceMetrics(metricDataList: [metric]))
38+
exporter.shutdown()
39+
}
40+
41+
func testExportMultipleMetrics() {
42+
var metrics = [Metric]()
43+
for _ in 0 ..< 10 {
44+
metrics.append(generateSumMetric())
45+
}
46+
let exporter = OtelpMetricExporter(channel: channel)
47+
let result = exporter.export(metrics: metrics) { () -> Bool in
48+
false
49+
}
50+
XCTAssertEqual(result, MetricExporterResultCode.success)
51+
XCTAssertEqual(fakeCollector.receivedMetrics, MetricsAdapter.toProtoResourceMetrics(metricDataList: metrics))
52+
exporter.shutdown()
53+
}
54+
55+
func testExportAfterShutdown() {
56+
let metric = generateSumMetric()
57+
let exporter = OtelpMetricExporter(channel: channel)
58+
exporter.shutdown()
59+
let result = exporter.export(metrics: [metric]) { () -> Bool in
60+
false
61+
}
62+
XCTAssertEqual(result, MetricExporterResultCode.failureRetryable)
63+
}
64+
65+
66+
func testExportCancelled() {
67+
fakeCollector.returnedStatus = GRPCStatus(code: .cancelled, message: nil)
68+
let exporter = OtelpMetricExporter(channel: channel)
69+
let metric = generateSumMetric()
70+
let result = exporter.export(metrics: [metric]) { () -> Bool in
71+
false
72+
}
73+
XCTAssertEqual(result, MetricExporterResultCode.failureRetryable)
74+
}
75+
76+
func startServer() -> EventLoopFuture<Server> {
77+
// Start the server and print its address once it has started.
78+
let server = Server.insecure(group: serverGroup)
79+
.withServiceProviders([fakeCollector])
80+
.bind(host: "localhost", port: 55680)
81+
82+
server.map {
83+
$0.channel.localAddress
84+
}.whenSuccess { address in
85+
print("server started on port \(address!.port!)")
86+
}
87+
return server
88+
}
89+
90+
func startChannel() -> ClientConnection {
91+
let channel = ClientConnection.insecure(group: channelGroup)
92+
.connect(host: "localhost", port: 55680)
93+
return channel
94+
}
95+
96+
97+
func generateSumMetric() -> Metric {
98+
let library = InstrumentationLibraryInfo(name: "lib", version: "semver:0.0.0")
99+
var metric = Metric(namespace: "namespace", name: "metric", desc: "description", type: .doubleSum, resource: Resource(), instrumentationLibraryInfo: library)
100+
let data = SumData(startTimestamp: Date(), timestamp: Date(), labels: ["hello":"world"], sum: 1)
101+
metric.data.append(data)
102+
return metric
103+
}
104+
}
105+
106+
class FakeMetricCollector : Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceProvider
107+
{
108+
var receivedMetrics = [Opentelemetry_Proto_Metrics_V1_ResourceMetrics]()
109+
var returnedStatus = GRPCStatus.ok
110+
func export(request: Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest, context: StatusOnlyCallContext) ->
111+
EventLoopFuture<Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceResponse> {
112+
receivedMetrics.append(contentsOf: request.resourceMetrics)
113+
if returnedStatus != GRPCStatus.ok {
114+
return context.eventLoop.makeFailedFuture(returnedStatus)
115+
}
116+
return context.eventLoop.makeSucceededFuture(Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceResponse())
117+
}
118+
}

0 commit comments

Comments
 (0)