Skip to content

Commit ca6be73

Browse files
Datadog exporter: Fully synchronize on flush (#70)
Explain in the comments how the synchronisation works on the flush involved methods
1 parent 9d6e046 commit ca6be73

File tree

9 files changed

+62
-32
lines changed

9 files changed

+62
-32
lines changed

README.md

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11

2-
[![Scope](https://app.scope.dev/api/badge/aac7d72a-28e3-4c66-ac04-ad816166cd41/default)](https://app.scope.dev/external/v1/inspect/54b1fab6-1a7f-4d03-9fb4-26fafd169131/aac7d72a-28e3-4c66-ac04-ad816166cd41/default)
3-
42
# opentelemetry-swift
53

64
A swift [OpenTelemetry](https://opentelemetry.io/) client
@@ -35,17 +33,17 @@ or
3533

3634
## Current status
3735

38-
Currently Tracing, metrics and Correlation Context and API's and SDK are implemented, also OpenTracing shims, for compatibility with existing Opentracing code.
36+
Currently Tracing, Metrics and Correlation Context API's and SDK are implemented, also OpenTracing shims, for compatibility with existing Opentracing code.
3937

40-
Implemented a simple stdout, Jaeger, Zipkin and OpenTelemetry collector for traces;
38+
Implemented traces exporters: simple stdout, Jaeger, Zipkin, Datadog and OpenTelemetry collector
4139

42-
Implemented a Prometheus exporter for metrics.
40+
Implemented metrics exporters: Prometheus
4341

4442
## Examples
4543

4644
The package includes some example projects with basic functionality:
4745

4846
- `Logging Tracer` - Simple api implementation of a Tracer that logs every api call
49-
- `Simple Exporter` - Shows the Jaeger an Stdout exporters in action using a MultiSpanExporter
47+
- `Simple Exporter` - Shows the Jaeger an Stdout exporters in action using a MultiSpanExporter. Can be easily modified for other exporters
5048
- `Prometheus Sample` - Shows the Prometheus exporter reporting metrics to a Prometheus instance
5149

Sources/Exporters/DatadogExporter/DatadogExporter.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ public class DatadogExporter: SpanExporter {
4040
}
4141

4242
public func flush() -> SpanExporterResultCode {
43-
if let writer = spansExporter.tracesStorage.writer as? FileWriter {
44-
writer.queue.sync {}
45-
}
43+
spansExporter.tracesStorage.writer.queue.sync {}
44+
logsExporter.logsStorage.writer.queue.sync {}
45+
4646
_ = logsExporter.logsUpload.uploader.flush()
47-
return spansExporter.tracesUpload.uploader.flush()
47+
_ = spansExporter.tracesUpload.uploader.flush()
48+
return .success
4849
}
4950

5051
public func shutdown() {

Sources/Exporters/DatadogExporter/Files/File.swift

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ internal protocol WritableFile {
2424
func size() throws -> UInt64
2525

2626
/// Synchronously appends given data at the end of this file.
27-
func append(data: Data) throws
27+
func append(data: Data, synchronized: Bool ) throws
2828
}
2929

3030
/// Provides convenient interface for reading contents and metadata of the file.
@@ -51,7 +51,7 @@ internal struct File: WritableFile, ReadableFile {
5151
}
5252

5353
/// Appends given data at the end of this file.
54-
func append(data: Data) throws {
54+
func append(data: Data, synchronized: Bool = false) throws {
5555
let fileHandle = try FileHandle(forWritingTo: url)
5656

5757
// NOTE: RUMM-669
@@ -72,7 +72,12 @@ internal struct File: WritableFile, ReadableFile {
7272
This is fixed in iOS 14/Xcode 12
7373
*/
7474
if #available(OSX 10.15, iOS 13.4, watchOS 6.0, tvOS 13.0, *) {
75-
defer { try? fileHandle.close() }
75+
defer {
76+
try? fileHandle.close()
77+
if synchronized {
78+
try? fileHandle.synchronize()
79+
}
80+
}
7681
try fileHandle.seekToEnd()
7782
try fileHandle.write(contentsOf: data)
7883
} else {

Sources/Exporters/DatadogExporter/Persistence/FileReader.swift

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,23 @@ internal final class FileReader {
6262
return nil
6363
}
6464

65+
/// This method gets remaining files at once, and process each file after with the block passed.
66+
/// Being on a queue assures that no other previous batches are uploaded while these are being handled
67+
internal func onRemainingBatches(process: (Batch)->()) -> Bool {
68+
queue.sync {
69+
do {
70+
try orchestrator.getAllFiles(excludingFilesNamed: Set(filesRead.map { $0.name }))?.forEach {
71+
let fileData = try $0.read()
72+
let batchData = dataFormat.prefixData + fileData + dataFormat.suffixData
73+
process(Batch(data: batchData, file: $0))
74+
}
75+
} catch {
76+
return false
77+
}
78+
return true
79+
}
80+
}
81+
6582
// MARK: - Accepting batches
6683

6784
func markBatchAsRead(_ batch: Batch) {
@@ -70,7 +87,7 @@ internal final class FileReader {
7087
}
7188
}
7289

73-
private func synchronizedMarkBatchAsRead(_ batch: Batch) {
90+
func synchronizedMarkBatchAsRead(_ batch: Batch) {
7491
orchestrator.delete(readableFile: batch.file)
7592
filesRead.append(batch.file)
7693
}

Sources/Exporters/DatadogExporter/Persistence/FileWriter.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,20 @@ internal final class FileWriter {
4545

4646
func writeSync<T: Encodable>(value: T) {
4747
queue.sync { [weak self] in
48-
self?.synchronizedWrite(value: value)
48+
self?.synchronizedWrite(value: value, syncOnEnd: true)
4949
}
5050
}
5151

52-
private func synchronizedWrite<T: Encodable>(value: T) {
52+
private func synchronizedWrite<T: Encodable>(value: T, syncOnEnd: Bool = false) {
5353
do {
5454
let data = try jsonEncoder.encode(value)
5555
let file = try orchestrator.getWritableFile(writeSize: UInt64(data.count))
5656

5757
if try file.size() == 0 {
58-
try file.append(data: data)
58+
try file.append(data: data, synchronized: syncOnEnd)
5959
} else {
6060
let atomicData = dataFormat.separatorData + data
61-
try file.append(data: atomicData)
61+
try file.append(data: atomicData, synchronized: syncOnEnd)
6262
}
6363
} catch {
6464
print("🔥 Failed to write file: \(error)")

Sources/Exporters/DatadogExporter/Persistence/FilesOrchestrator.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ internal class FilesOrchestrator {
113113
}
114114
}
115115

116+
func getAllFiles(excludingFilesNamed excludedFileNames: Set<String> = []) -> [ReadableFile]? {
117+
do {
118+
return try directory.files()
119+
.filter({ excludedFileNames.contains($0.name) == false })
120+
} catch {
121+
print("🔥 Failed to obtain readable files: \(error)")
122+
return nil
123+
}
124+
}
125+
116126
func delete(readableFile: ReadableFile) {
117127
do {
118128
try readableFile.delete()

Sources/Exporters/DatadogExporter/Upload/DataUploadWorker.swift

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,18 @@ internal class DataUploadWorker: DataUploadWorkerType {
8686
}
8787
}
8888

89+
/// This method gets remaining files at once, and uploads them
90+
/// It assures that periodic uploader cannot read or upload the files while the flush is being processed
8991
internal func flush() -> SpanExporterResultCode {
90-
var resultCode: SpanExporterResultCode = .success
91-
queue.sync {
92-
let nextBatch = self.fileReader.readNextBatch()
93-
if let batch = nextBatch {
94-
let uploadStatus = self.dataUploader.upload(data: batch.data)
92+
let success = queue.sync {
93+
self.fileReader.onRemainingBatches {
94+
let uploadStatus = self.dataUploader.upload(data: $0.data)
9595
let shouldBeAccepted = self.acceptableUploadStatuses.contains(uploadStatus)
9696
if shouldBeAccepted {
97-
self.fileReader.markBatchAsRead(batch)
98-
} else {
99-
resultCode = .failure
97+
self.fileReader.synchronizedMarkBatchAsRead($0)
10098
}
10199
}
102100
}
103-
return resultCode
101+
return success ? .success : .failure
104102
}
105103
}

Sources/OpenTelemetrySdk/Trace/Export/SimpleSpanProcessor.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public struct SimpleSpanProcessor: SpanProcessor {
4141
}
4242

4343
public func forceFlush() {
44+
spanExporter.flush()
4445
}
4546

4647
/// Returns a new SimpleSpansProcessor that converts spans to proto and forwards them to

Tests/ExportersTests/DatadogExporter/Persistence/FilesOrchestratorTests.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class FilesOrchestratorTests: XCTestCase {
8383
)
8484

8585
let file1 = try orchestrator.getWritableFile(writeSize: performance.maxObjectSize)
86-
try chunkedData.forEach { chunk in try file1.append(data: chunk) }
86+
try chunkedData.forEach { chunk in try file1.append(data: chunk, synchronized: false) }
8787
let file2 = try orchestrator.getWritableFile(writeSize: 1)
8888

8989
XCTAssertNotEqual(file1.name, file2.name)
@@ -144,15 +144,15 @@ class FilesOrchestratorTests: XCTestCase {
144144

145145
// write 1MB to first file (1MB of directory size in total)
146146
let file1 = try orchestrator.getWritableFile(writeSize: oneMB)
147-
try file1.append(data: .mock(ofSize: oneMB))
147+
try file1.append(data: .mock(ofSize: oneMB), synchronized: false)
148148

149149
// write 1MB to second file (2MB of directory size in total)
150150
let file2 = try orchestrator.getWritableFile(writeSize: oneMB)
151-
try file2.append(data: .mock(ofSize: oneMB))
151+
try file2.append(data: .mock(ofSize: oneMB), synchronized: true)
152152

153153
// write 1MB to third file (3MB of directory size in total)
154154
let file3 = try orchestrator.getWritableFile(writeSize: oneMB + 1) // +1 byte to exceed the limit
155-
try file3.append(data: .mock(ofSize: oneMB + 1))
155+
try file3.append(data: .mock(ofSize: oneMB + 1), synchronized: false)
156156

157157
XCTAssertEqual(try temporaryDirectory.files().count, 3)
158158

@@ -161,7 +161,7 @@ class FilesOrchestratorTests: XCTestCase {
161161
let file4 = try orchestrator.getWritableFile(writeSize: oneMB)
162162
XCTAssertEqual(try temporaryDirectory.files().count, 3)
163163
XCTAssertNil(try? temporaryDirectory.file(named: file1.name))
164-
try file4.append(data: .mock(ofSize: oneMB + 1))
164+
try file4.append(data: .mock(ofSize: oneMB + 1), synchronized: true)
165165

166166
_ = try orchestrator.getWritableFile(writeSize: oneMB)
167167
XCTAssertEqual(try temporaryDirectory.files().count, 3)

0 commit comments

Comments
 (0)