Skip to content

Commit af80448

Browse files
author
Ignacio Bonafonte
authored
Merge pull request #263 from nachoBonafonte/fix-network-instrumentation-interlock
Improve network instrumentation synchronisation Improve network instrumentation synchronisation protecting only the code that cannot be accessed concurrently Modify SimpleSpanProcessor so it exports the spans asynchronously, and synchronises only on shutdown or flush
2 parents 775e7a0 + 6ca2e1f commit af80448

File tree

5 files changed

+160
-137
lines changed

5 files changed

+160
-137
lines changed

Sources/Instrumentation/URLSession/URLSessionInstrumentation.swift

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,11 @@ public class URLSessionInstrumentation {
242242

243243
if let request = argument as? URLRequest, objc_getAssociatedObject(argument, &idKey) == nil {
244244
let instrumentedRequest = URLSessionLogger.processAndLogRequest(request, sessionTaskId: sessionTaskId, instrumentation: self, shouldInjectHeaders: true)
245-
task = castedIMP(session, selector, instrumentedRequest ?? request, completionBlock)
245+
if let instrumentedRequest = instrumentedRequest {
246+
task = castedIMP(session, selector, instrumentedRequest, completionBlock)
247+
} else {
248+
task = castedIMP(session, selector, request, completion)
249+
}
246250
} else {
247251
task = castedIMP(session, selector, argument, completionBlock)
248252
if objc_getAssociatedObject(argument, &idKey) == nil,
@@ -479,57 +483,53 @@ public class URLSessionInstrumentation {
479483
private func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
480484
guard configuration.shouldRecordPayload?(session) ?? false else { return }
481485
let dataCopy = data
482-
queue.async {
483-
let taskId = self.idKeyForTask(dataTask)
484-
if (self.requestMap[taskId]?.request) != nil {
485-
self.createRequestState(for: taskId)
486-
if self.requestMap[taskId]?.dataProcessed == nil {
487-
self.requestMap[taskId]?.dataProcessed = Data()
486+
let taskId = self.idKeyForTask(dataTask)
487+
queue.sync {
488+
if (requestMap[taskId]?.request) != nil {
489+
createRequestState(for: taskId)
490+
if requestMap[taskId]?.dataProcessed == nil {
491+
requestMap[taskId]?.dataProcessed = Data()
488492
}
489-
self.requestMap[taskId]?.dataProcessed?.append(dataCopy)
493+
requestMap[taskId]?.dataProcessed?.append(dataCopy)
490494
}
491495
}
492496
}
493497

494498
private func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) {
495499
guard configuration.shouldRecordPayload?(session) ?? false else { return }
496-
queue.async {
497-
let taskId = self.idKeyForTask(dataTask)
498-
if (self.requestMap[taskId]?.request) != nil {
499-
self.createRequestState(for: taskId)
500+
let taskId = self.idKeyForTask(dataTask)
501+
queue.sync {
502+
if (requestMap[taskId]?.request) != nil {
503+
createRequestState(for: taskId)
500504
if response.expectedContentLength < 0 {
501-
self.requestMap[taskId]?.dataProcessed = Data()
505+
requestMap[taskId]?.dataProcessed = Data()
502506
} else {
503-
self.requestMap[taskId]?.dataProcessed = Data(capacity: Int(response.expectedContentLength))
507+
requestMap[taskId]?.dataProcessed = Data(capacity: Int(response.expectedContentLength))
504508
}
505509
}
506510
}
507511
}
508512

509513
private func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
514+
let taskId = self.idKeyForTask(task)
515+
var requestState: NetworkRequestState?
510516
queue.sync {
511-
let taskId = self.idKeyForTask(task)
512-
513-
let requestState = self.requestMap[taskId]
514-
515-
if let error = error {
516-
let status = (task.response as? HTTPURLResponse)?.statusCode ?? 0
517-
URLSessionLogger.logError(error, dataOrFile: requestState?.dataProcessed, statusCode: status, instrumentation: self, sessionTaskId: taskId)
518-
} else if let response = task.response {
519-
URLSessionLogger.logResponse(response, dataOrFile: requestState?.dataProcessed, instrumentation: self, sessionTaskId: taskId)
520-
}
521-
517+
requestState = requestMap[taskId]
522518
if requestState != nil {
523-
self.requestMap[taskId] = nil
519+
requestMap[taskId] = nil
524520
}
525521
}
522+
if let error = error {
523+
let status = (task.response as? HTTPURLResponse)?.statusCode ?? 0
524+
URLSessionLogger.logError(error, dataOrFile: requestState?.dataProcessed, statusCode: status, instrumentation: self, sessionTaskId: taskId)
525+
} else if let response = task.response {
526+
URLSessionLogger.logResponse(response, dataOrFile: requestState?.dataProcessed, instrumentation: self, sessionTaskId: taskId)
527+
}
526528
}
527529

528530
private func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didBecome downloadTask: URLSessionDownloadTask) {
529-
queue.async {
530-
let id = self.idKeyForTask(dataTask)
531-
self.setIdKey(value: id, for: downloadTask)
532-
}
531+
let id = self.idKeyForTask(dataTask)
532+
self.setIdKey(value: id, for: downloadTask)
533533
}
534534

535535
private func urlSession(_ session: URLSession, task: URLSessionTask, didFinishCollecting metrics: URLSessionTaskMetrics) {
@@ -540,13 +540,11 @@ public class URLSessionInstrumentation {
540540
}
541541

542542
private func urlSessionTaskWillResume(_ session: URLSessionTask) {
543-
queue.sync {
544-
let taskId = self.idKeyForTask(session)
545-
if let request = session.currentRequest {
546-
var state = requestMap[taskId]
547-
if state == nil {
548-
state = NetworkRequestState()
549-
requestMap[taskId] = state
543+
let taskId = self.idKeyForTask(session)
544+
if let request = session.currentRequest {
545+
queue.sync {
546+
if requestMap[taskId] == nil {
547+
requestMap[taskId] = NetworkRequestState()
550548
}
551549
requestMap[taskId]?.setRequest(request)
552550
}

Sources/OpenTelemetrySdk/Trace/SpanProcessors/SimpleSpanProcessor.swift

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,36 @@ import OpenTelemetryApi
99
/// An implementation of the SpanProcessor that converts the ReadableSpan SpanData
1010
/// and passes it to the configured exporter.
1111
public struct SimpleSpanProcessor: SpanProcessor {
12-
private var spanExporter: SpanExporter
12+
private let spanExporter: SpanExporter
1313
private var sampled: Bool = true
14+
private let processorQueue = DispatchQueue(label: "io.opentelemetry.simplespanprocessor")
1415

1516
public let isStartRequired = false
1617
public let isEndRequired = true
17-
18-
public func onStart(parentContext: SpanContext?, span: ReadableSpan) {
19-
}
18+
19+
public func onStart(parentContext: SpanContext?, span: ReadableSpan) {}
2020

2121
public mutating func onEnd(span: ReadableSpan) {
22-
if sampled && !span.context.traceFlags.sampled {
22+
if sampled, !span.context.traceFlags.sampled {
2323
return
2424
}
2525
let span = span.toSpanData()
26-
spanExporter.export(spans: [span])
26+
let spanExporterAux = self.spanExporter
27+
processorQueue.async {
28+
spanExporterAux.export(spans: [span])
29+
}
2730
}
2831

2932
public func shutdown() {
30-
spanExporter.shutdown()
33+
processorQueue.sync {
34+
spanExporter.shutdown()
35+
}
3136
}
3237

3338
public func forceFlush() {
34-
_ = spanExporter.flush()
39+
processorQueue.sync {
40+
_ = spanExporter.flush()
41+
}
3542
}
3643

3744
/// Returns a new SimpleSpansProcessor that converts spans to proto and forwards them to

Tests/ExportersTests/InMemory/InMemoryExporterTests.swift

Lines changed: 92 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -6,93 +6,100 @@
66
import Foundation
77

88
@testable import InMemoryExporter
9-
@testable import OpenTelemetrySdk
109
@testable import OpenTelemetryApi
10+
@testable import OpenTelemetrySdk
1111
import XCTest
1212

1313
final class InMemoryExporterTests: XCTestCase {
14-
private var tracerSdkFactory = TracerProviderSdk()
15-
private var tracer: Tracer!
16-
private var exporter: InMemoryExporter!
17-
18-
override func setUp() {
19-
exporter = InMemoryExporter()
20-
tracerSdkFactory.addSpanProcessor(SimpleSpanProcessor(spanExporter: exporter))
21-
tracer = tracerSdkFactory.get(instrumentationName: "InMemoryExporterTests")
22-
}
23-
24-
func testGetFinishedSpanItems() {
25-
tracer.spanBuilder(spanName: "one").startSpan().end()
26-
tracer.spanBuilder(spanName: "two").startSpan().end()
27-
tracer.spanBuilder(spanName: "three").startSpan().end()
28-
29-
let spans = exporter.getFinishedSpanItems()
30-
XCTAssertEqual(spans.count, 3)
31-
XCTAssertEqual(spans[0].name, "one")
32-
XCTAssertEqual(spans[1].name, "two")
33-
XCTAssertEqual(spans[2].name, "three")
34-
}
35-
36-
func testResetClearsFinishedSpans() {
37-
tracer.spanBuilder(spanName: "one").startSpan().end()
38-
tracer.spanBuilder(spanName: "two").startSpan().end()
39-
tracer.spanBuilder(spanName: "three").startSpan().end()
40-
41-
var spans = exporter.getFinishedSpanItems()
42-
XCTAssertEqual(spans.count, 3)
43-
44-
exporter.reset()
45-
spans = exporter.getFinishedSpanItems()
46-
XCTAssertEqual(spans.count, 0)
47-
}
48-
49-
func testResetDoesNotRestartAfterShutdown() {
50-
tracer.spanBuilder(spanName: "one").startSpan().end()
51-
52-
exporter.shutdown()
53-
exporter.reset()
54-
55-
XCTAssertEqual(exporter.export(spans: []), .failure)
56-
}
57-
58-
func testShutdownClearsFinishedSpans() {
59-
tracer.spanBuilder(spanName: "one").startSpan().end()
60-
tracer.spanBuilder(spanName: "two").startSpan().end()
61-
tracer.spanBuilder(spanName: "three").startSpan().end()
62-
63-
exporter.shutdown()
64-
65-
let spans = exporter.getFinishedSpanItems()
66-
XCTAssertEqual(spans.count, 0)
67-
}
68-
69-
func testShutdownStopsFurtherExports() {
70-
tracer.spanBuilder(spanName: "one").startSpan().end()
71-
tracer.spanBuilder(spanName: "two").startSpan().end()
72-
tracer.spanBuilder(spanName: "three").startSpan().end()
73-
74-
exporter.shutdown()
75-
tracer.spanBuilder(spanName: "four").startSpan().end()
76-
77-
let spans = exporter.getFinishedSpanItems()
78-
XCTAssertEqual(spans.count, 0)
79-
}
80-
81-
func testExportReturnsSuccessWhenStarted() {
82-
XCTAssertEqual(exporter.export(spans: []), .success)
83-
}
84-
85-
func testExportReturnsFailureWhenStopped() {
86-
exporter.shutdown()
87-
XCTAssertEqual(exporter.export(spans: []), .failure)
88-
}
89-
90-
func testFlushReturnsSuccessWhenRunning() {
91-
XCTAssertEqual(exporter.flush(), .success)
92-
}
93-
94-
func testFlushReturnsFailiureWhenStopped() {
95-
exporter.shutdown()
96-
XCTAssertEqual(exporter.flush(), .failure)
97-
}
14+
private var tracerSdkFactory = TracerProviderSdk()
15+
private var spanProcessor: SpanProcessor!
16+
private var tracer: Tracer!
17+
private var exporter: InMemoryExporter!
18+
19+
override func setUp() {
20+
exporter = InMemoryExporter()
21+
spanProcessor = SimpleSpanProcessor(spanExporter: exporter)
22+
tracerSdkFactory.addSpanProcessor(spanProcessor)
23+
tracer = tracerSdkFactory.get(instrumentationName: "InMemoryExporterTests")
24+
}
25+
26+
func testGetFinishedSpanItems() {
27+
tracer.spanBuilder(spanName: "one").startSpan().end()
28+
tracer.spanBuilder(spanName: "two").startSpan().end()
29+
tracer.spanBuilder(spanName: "three").startSpan().end()
30+
spanProcessor.forceFlush()
31+
32+
let spans = exporter.getFinishedSpanItems()
33+
XCTAssertEqual(spans.count, 3)
34+
XCTAssertEqual(spans[0].name, "one")
35+
XCTAssertEqual(spans[1].name, "two")
36+
XCTAssertEqual(spans[2].name, "three")
37+
}
38+
39+
func testResetClearsFinishedSpans() {
40+
tracer.spanBuilder(spanName: "one").startSpan().end()
41+
tracer.spanBuilder(spanName: "two").startSpan().end()
42+
tracer.spanBuilder(spanName: "three").startSpan().end()
43+
spanProcessor.forceFlush()
44+
45+
var spans = exporter.getFinishedSpanItems()
46+
XCTAssertEqual(spans.count, 3)
47+
48+
exporter.reset()
49+
spans = exporter.getFinishedSpanItems()
50+
XCTAssertEqual(spans.count, 0)
51+
}
52+
53+
func testResetDoesNotRestartAfterShutdown() {
54+
tracer.spanBuilder(spanName: "one").startSpan().end()
55+
spanProcessor.forceFlush()
56+
57+
exporter.shutdown()
58+
exporter.reset()
59+
60+
XCTAssertEqual(exporter.export(spans: []), .failure)
61+
}
62+
63+
func testShutdownClearsFinishedSpans() {
64+
tracer.spanBuilder(spanName: "one").startSpan().end()
65+
tracer.spanBuilder(spanName: "two").startSpan().end()
66+
tracer.spanBuilder(spanName: "three").startSpan().end()
67+
spanProcessor.forceFlush()
68+
69+
exporter.shutdown()
70+
71+
let spans = exporter.getFinishedSpanItems()
72+
XCTAssertEqual(spans.count, 0)
73+
}
74+
75+
func testShutdownStopsFurtherExports() {
76+
tracer.spanBuilder(spanName: "one").startSpan().end()
77+
tracer.spanBuilder(spanName: "two").startSpan().end()
78+
tracer.spanBuilder(spanName: "three").startSpan().end()
79+
spanProcessor.forceFlush()
80+
81+
exporter.shutdown()
82+
tracer.spanBuilder(spanName: "four").startSpan().end()
83+
84+
let spans = exporter.getFinishedSpanItems()
85+
XCTAssertEqual(spans.count, 0)
86+
}
87+
88+
func testExportReturnsSuccessWhenStarted() {
89+
XCTAssertEqual(exporter.export(spans: []), .success)
90+
}
91+
92+
func testExportReturnsFailureWhenStopped() {
93+
exporter.shutdown()
94+
XCTAssertEqual(exporter.export(spans: []), .failure)
95+
}
96+
97+
func testFlushReturnsSuccessWhenRunning() {
98+
XCTAssertEqual(exporter.flush(), .success)
99+
}
100+
101+
func testFlushReturnsFailiureWhenStopped() {
102+
exporter.shutdown()
103+
XCTAssertEqual(exporter.flush(), .failure)
104+
}
98105
}

0 commit comments

Comments
 (0)