Skip to content

Commit f7061de

Browse files
authored
Added OTLP-HTTP Exporter for the Stable metrics (#476)
1 parent dea8ace commit f7061de

File tree

10 files changed

+411
-46
lines changed

10 files changed

+411
-46
lines changed

Sources/Exporters/OpenTelemetryProtocolCommon/metric/MetricsAdapter.swift

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ public enum MetricsAdapter {
103103
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_NumberDataPoint()
104104
injectPointData(protoNumberPoint: &protoDataPoint, pointData: gaugeData)
105105
protoDataPoint.value = .asInt(Int64(gaugeData.value))
106-
protoMetric.sum.aggregationTemporality = .cumulative
106+
protoMetric.sum.aggregationTemporality = stableMetric.data.aggregationTemporality.convertToProtoEnum()
107107
protoMetric.sum.dataPoints.append(protoDataPoint)
108+
protoMetric.sum.isMonotonic = stableMetric.isMonotonic
108109
case .DoubleGauge:
109110
guard let gaugeData = $0 as? DoublePointData else {
110111
break
@@ -120,8 +121,9 @@ public enum MetricsAdapter {
120121
var protoDataPoint = Opentelemetry_Proto_Metrics_V1_NumberDataPoint()
121122
injectPointData(protoNumberPoint: &protoDataPoint, pointData: gaugeData)
122123
protoDataPoint.value = .asDouble(gaugeData.value)
123-
protoMetric.sum.aggregationTemporality = .cumulative
124+
protoMetric.sum.aggregationTemporality = stableMetric.data.aggregationTemporality.convertToProtoEnum()
124125
protoMetric.sum.dataPoints.append(protoDataPoint)
126+
protoMetric.sum.isMonotonic = stableMetric.isMonotonic
125127
case .Summary:
126128
guard let summaryData = $0 as? SummaryPointData else {
127129
break
@@ -147,7 +149,7 @@ public enum MetricsAdapter {
147149
protoDataPoint.count = UInt64(histogramData.count)
148150
protoDataPoint.explicitBounds = histogramData.boundaries.map { Double($0) }
149151
protoDataPoint.bucketCounts = histogramData.counts.map { UInt64($0) }
150-
protoMetric.histogram.aggregationTemporality = .cumulative
152+
protoMetric.histogram.aggregationTemporality = stableMetric.data.aggregationTemporality.convertToProtoEnum()
151153
protoMetric.histogram.dataPoints.append(protoDataPoint)
152154
case .ExponentialHistogram:
153155
// TODO: implement
@@ -394,3 +396,14 @@ public enum MetricsAdapter {
394396
return protoMetric
395397
}
396398
}
399+
400+
extension AggregationTemporality {
401+
func convertToProtoEnum() -> Opentelemetry_Proto_Metrics_V1_AggregationTemporality {
402+
switch self {
403+
case .cumulative:
404+
return .cumulative
405+
case .delta:
406+
return .delta
407+
}
408+
}
409+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//
2+
// Copyright The OpenTelemetry Authors
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
import Foundation
7+
import SwiftProtobuf
8+
import OpenTelemetryProtocolExporterCommon
9+
10+
public class StableOtlpHTTPExporterBase {
11+
let endpoint: URL
12+
let httpClient: HTTPClient
13+
let envVarHeaders: [(String, String)]?
14+
let config: OtlpConfiguration
15+
16+
// MARK: - Init
17+
18+
public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
19+
self.envVarHeaders = envVarHeaders
20+
self.endpoint = endpoint
21+
self.config = config
22+
if let providedSession = useSession {
23+
self.httpClient = HTTPClient(session: providedSession)
24+
} else {
25+
self.httpClient = HTTPClient()
26+
}
27+
}
28+
29+
public func createRequest(body: Message, endpoint: URL) -> URLRequest {
30+
var request = URLRequest(url: endpoint)
31+
32+
for header in config.headers ?? [] {
33+
request.addValue(header.1, forHTTPHeaderField: header.0)
34+
}
35+
36+
do {
37+
request.httpMethod = "POST"
38+
request.httpBody = try body.serializedData()
39+
request.setValue(Headers.getUserAgentHeader(), forHTTPHeaderField: Constants.HTTP.userAgent)
40+
request.setValue("application/x-protobuf", forHTTPHeaderField: "Content-Type")
41+
} catch {
42+
print("Error serializing body: \(error)")
43+
}
44+
45+
return request
46+
}
47+
48+
public func shutdown(explicitTimeout: TimeInterval? = nil) { }
49+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
//
2+
// Copyright The OpenTelemetry Authors
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
import Foundation
7+
import OpenTelemetrySdk
8+
import OpenTelemetryProtocolExporterCommon
9+
10+
public func defaultStableOtlpHTTPMetricsEndpoint() -> URL {
11+
URL(string: "http://localhost:4318/v1/metrics")!
12+
}
13+
14+
public class StableOtlpHTTPMetricExporter: StableOtlpHTTPExporterBase, StableMetricExporter {
15+
var aggregationTemporalitySelector: AggregationTemporalitySelector
16+
var defaultAggregationSelector: DefaultAggregationSelector
17+
18+
var pendingMetrics: [StableMetricData] = []
19+
20+
// MARK: - Init
21+
22+
public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(), defaultAggregationSelector: DefaultAggregationSelector = AggregationSelector.instance, useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
23+
24+
self.aggregationTemporalitySelector = aggregationTemporalitySelector
25+
self.defaultAggregationSelector = defaultAggregationSelector
26+
27+
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
28+
}
29+
30+
31+
// MARK: - StableMetricsExporter
32+
33+
public func export(metrics : [StableMetricData]) -> ExportResult {
34+
pendingMetrics.append(contentsOf: metrics)
35+
let sendingMetrics = pendingMetrics
36+
pendingMetrics = []
37+
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
38+
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: sendingMetrics)
39+
}
40+
41+
let request = createRequest(body: body, endpoint: endpoint)
42+
httpClient.send(request: request) { [weak self] result in
43+
switch result {
44+
case .success(_):
45+
break
46+
case .failure(let error):
47+
self?.pendingMetrics.append(contentsOf: sendingMetrics)
48+
print(error)
49+
}
50+
}
51+
52+
return .success
53+
}
54+
55+
public func flush() -> ExportResult {
56+
var exporterResult: ExportResult = .success
57+
58+
if !pendingMetrics.isEmpty {
59+
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
60+
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: pendingMetrics)
61+
}
62+
let semaphore = DispatchSemaphore(value: 0)
63+
let request = createRequest(body: body, endpoint: endpoint)
64+
httpClient.send(request: request) { result in
65+
switch result {
66+
case .success(_):
67+
break
68+
case .failure(let error):
69+
print(error)
70+
exporterResult = .failure
71+
}
72+
semaphore.signal()
73+
}
74+
semaphore.wait()
75+
}
76+
77+
return exporterResult
78+
}
79+
80+
public func shutdown() -> ExportResult {
81+
return .success
82+
}
83+
84+
// MARK: - AggregationTemporalitySelectorProtocol
85+
86+
public func getAggregationTemporality(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.AggregationTemporality {
87+
return aggregationTemporalitySelector.getAggregationTemporality(for: instrument)
88+
}
89+
90+
// MARK: - DefaultAggregationSelector
91+
92+
public func getDefaultAggregation(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.Aggregation {
93+
return defaultAggregationSelector.getDefaultAggregation(for: instrument)
94+
}
95+
}

Sources/OpenTelemetrySdk/Metrics/Stable/Aggregation/AggregationTemporality.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class AggregationTemporalitySelector : AggregationTemporalitySelectorProt
1414
return aggregationTemporalitySelector(instrument)
1515
}
1616

17-
init(aggregationTemporalitySelector: @escaping (InstrumentType) -> AggregationTemporality) {
17+
public init(aggregationTemporalitySelector: @escaping (InstrumentType) -> AggregationTemporality) {
1818
self.aggregationTemporalitySelector = aggregationTemporalitySelector
1919
}
2020

@@ -31,6 +31,12 @@ public enum AggregationTemporality {
3131
}
3232

3333
}
34+
35+
public static func alwaysDelta() -> AggregationTemporalitySelector {
36+
return AggregationTemporalitySelector() { (type) in
37+
.delta
38+
}
39+
}
3440

3541
public static func deltaPreferred() -> AggregationTemporalitySelector {
3642
return AggregationTemporalitySelector() { type in

Sources/OpenTelemetrySdk/Metrics/Stable/Aggregation/DoubleSumAggregator.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class DoubleSumAggregator: SumAggregator, StableAggregator {
2222
}
2323

2424
public func toMetricData(resource: Resource, scope: InstrumentationScopeInfo, descriptor: MetricDescriptor, points: [PointData], temporality: AggregationTemporality) -> StableMetricData {
25-
StableMetricData.createDoubleSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, data: StableSumData(aggregationTemporality: temporality, points: points as! [DoublePointData]))
25+
StableMetricData.createDoubleSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, isMonotonic: self.isMonotonic, data: StableSumData(aggregationTemporality: temporality, points: points as! [DoublePointData]))
2626
}
2727

2828
init(instrumentDescriptor: InstrumentDescriptor, reservoirSupplier: @escaping () -> ExemplarReservoir) {

Sources/OpenTelemetrySdk/Metrics/Stable/Aggregation/LongSumAggregator.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class LongSumAggregator: SumAggregator, StableAggregator {
2727
}
2828

2929
public func toMetricData(resource: Resource, scope: InstrumentationScopeInfo, descriptor: MetricDescriptor, points: [PointData], temporality: AggregationTemporality) -> StableMetricData {
30-
StableMetricData.createLongSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, data: StableSumData(aggregationTemporality: temporality, points: points as! [LongPointData]))
30+
StableMetricData.createLongSum(resource: resource, instrumentationScopeInfo: scope, name: descriptor.instrument.name, description: descriptor.instrument.description, unit: descriptor.instrument.unit, isMonotonic: self.isMonotonic, data: StableSumData(aggregationTemporality: temporality, points: points as! [LongPointData]))
3131
}
3232

3333
private class Handle: AggregatorHandle {

Sources/OpenTelemetrySdk/Metrics/Stable/Data/StableMetricData.swift

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,33 @@ public struct StableMetricData: Equatable {
2323
public private(set) var description: String
2424
public private(set) var unit: String
2525
public private(set) var type: MetricDataType
26+
public private(set) var isMonotonic: Bool
2627
public private(set) var data: Data
2728

28-
public static let empty = StableMetricData(resource: Resource.empty, instrumentationScopeInfo: InstrumentationScopeInfo(), name: "", description: "", unit: "", type: .Summary, data: StableMetricData.Data(points: [PointData]()))
29+
public static let empty = StableMetricData(resource: Resource.empty, instrumentationScopeInfo: InstrumentationScopeInfo(), name: "", description: "", unit: "", type: .Summary, isMonotonic: false, data: StableMetricData.Data(aggregationTemporality: .cumulative, points: [PointData]()))
2930

3031
public class Data: Equatable {
3132
public private(set) var points: [PointData]
33+
public private(set) var aggregationTemporality: AggregationTemporality
3234

33-
internal init(points: [PointData]) {
35+
internal init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
36+
self.aggregationTemporality = aggregationTemporality
3437
self.points = points
3538
}
3639

3740
public static func == (lhs: StableMetricData.Data, rhs: StableMetricData.Data) -> Bool {
38-
return lhs.points == rhs.points
41+
return lhs.points == rhs.points && lhs.aggregationTemporality == rhs.aggregationTemporality
3942
}
4043
}
4144

42-
internal init(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, type: MetricDataType, data: StableMetricData.Data) {
45+
internal init(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, type: MetricDataType, isMonotonic: Bool, data: StableMetricData.Data) {
4346
self.resource = resource
4447
self.instrumentationScopeInfo = instrumentationScopeInfo
4548
self.name = name
4649
self.description = description
4750
self.unit = unit
4851
self.type = type
52+
self.isMonotonic = isMonotonic
4953
self.data = data
5054
}
5155

@@ -56,33 +60,34 @@ public struct StableMetricData: Equatable {
5660
lhs.description == rhs.description &&
5761
lhs.unit == rhs.unit &&
5862
lhs.type == rhs.type &&
59-
lhs.data.points == rhs.data.points
63+
lhs.isMonotonic == rhs.isMonotonic &&
64+
lhs.data == rhs.data
6065
}
6166
}
6267

6368
extension StableMetricData {
6469
static func createExponentialHistogram(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableExponentialHistogramData) -> StableMetricData {
65-
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .ExponentialHistogram, data: data)
70+
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .ExponentialHistogram, isMonotonic: false, data: data)
6671
}
6772

6873
static func createDoubleGauge(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableGaugeData) -> StableMetricData {
69-
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleGauge, data: data)
74+
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleGauge, isMonotonic: false, data: data)
7075
}
7176

7277
static func createLongGauge(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableGaugeData) -> StableMetricData {
73-
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongGauge, data: data)
78+
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongGauge, isMonotonic: false, data: data)
7479
}
7580

76-
static func createDoubleSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableSumData) -> StableMetricData {
77-
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleSum, data: data)
81+
static func createDoubleSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, isMonotonic: Bool, data: StableSumData) -> StableMetricData {
82+
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .DoubleSum, isMonotonic: isMonotonic, data: data)
7883
}
7984

80-
static func createLongSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableSumData) -> StableMetricData {
81-
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongSum, data: data)
85+
static func createLongSum(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, isMonotonic: Bool, data: StableSumData) -> StableMetricData {
86+
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .LongSum, isMonotonic: isMonotonic, data: data)
8287
}
8388

8489
static func createHistogram(resource: Resource, instrumentationScopeInfo: InstrumentationScopeInfo, name: String, description: String, unit: String, data: StableHistogramData) -> StableMetricData {
85-
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .Histogram, data: data)
90+
StableMetricData(resource: resource, instrumentationScopeInfo: instrumentationScopeInfo, name: name, description: description, unit: unit, type: .Histogram, isMonotonic: false, data: data)
8691
}
8792

8893
func isEmpty() -> Bool {
@@ -99,41 +104,31 @@ extension StableMetricData {
99104
}
100105

101106
public class StableHistogramData: StableMetricData.Data {
102-
public private(set) var aggregationTemporality: AggregationTemporality
103107
init(aggregationTemporality: AggregationTemporality, points: [HistogramPointData]) {
104-
self.aggregationTemporality = aggregationTemporality
105-
super.init(points: points)
108+
super.init(aggregationTemporality: aggregationTemporality, points: points)
106109
}
107110
}
108111

109112
public class StableExponentialHistogramData: StableMetricData.Data {
110-
public private(set) var aggregationTemporality: AggregationTemporality
111-
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
112-
self.aggregationTemporality = aggregationTemporality
113-
super.init(points: points)
113+
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
114+
super.init(aggregationTemporality: aggregationTemporality, points: points)
114115
}
115116
}
116117

117118
public class StableGaugeData: StableMetricData.Data {
118-
public private(set) var aggregationTemporality: AggregationTemporality
119-
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
120-
self.aggregationTemporality = aggregationTemporality
121-
super.init(points: points)
119+
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
120+
super.init(aggregationTemporality: aggregationTemporality, points: points)
122121
}
123122
}
124123

125124
public class StableSumData: StableMetricData.Data {
126-
public private(set) var aggregationTemporality: AggregationTemporality
127-
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
128-
self.aggregationTemporality = aggregationTemporality
129-
super.init(points: points)
125+
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
126+
super.init(aggregationTemporality: aggregationTemporality, points: points)
130127
}
131128
}
132129

133130
public class StableSummaryData: StableMetricData.Data {
134-
public private(set) var aggregationTemporality: AggregationTemporality
135-
init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
136-
self.aggregationTemporality = aggregationTemporality
137-
super.init(points: points)
131+
override init(aggregationTemporality: AggregationTemporality, points: [PointData]) {
132+
super.init(aggregationTemporality: aggregationTemporality, points: points)
138133
}
139134
}

0 commit comments

Comments
 (0)