@@ -30,8 +30,8 @@ public final class JSONRPCConnection: Connection {
30
30
/// The message handler that handles requests and notifications sent through this connection.
31
31
///
32
32
/// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are
33
- /// - `init`: Can trivially only be called once
34
- /// - `start`: Is required to be call in the same sequential code region as the initializer, so
33
+ /// - `init`: Reference to `JSONRPCConnection` trivially can't have escaped to other isolation domains yet.
34
+ /// - `start`: Is required to be call in the same serial code region as the initializer, so
35
35
/// `JSONRPCConnection` can't have escaped to other isolation domains yet.
36
36
/// - `deinit`: Can also only trivially be called once.
37
37
nonisolated ( unsafe) private var receiveHandler : MessageHandler ?
@@ -56,35 +56,54 @@ public final class JSONRPCConnection: Connection {
56
56
/// Current state of the connection, used to ensure correct usage.
57
57
///
58
58
/// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are
59
- /// - `init`: Can trivially only be called once
60
- /// - `start`: Is required to be called in the same sequential region as the initializer, so
59
+ /// - `init`: Reference to `JSONRPCConnection` trivially can't have escaped to other isolation domains yet.
60
+ /// - `start`: Is required to be called in the same serial region as the initializer, so
61
61
/// `JSONRPCConnection` can't have escaped to other isolation domains yet.
62
62
/// - `_close`: Synchronized on `queue`.
63
63
/// - `readyToSend`: Synchronized on `queue`.
64
64
/// - `deinit`: Can also only trivially be called once.
65
- nonisolated ( unsafe) private var state : State
65
+ private nonisolated ( unsafe) var state: State
66
66
67
67
/// Buffer of received bytes that haven't been parsed.
68
+ ///
69
+ /// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are
70
+ /// - The `receiveIO` handler: This is synchronized on `queue`.
71
+ /// - `requestBufferIsEmpty`: Also synchronized on `queue`.
72
+ private nonisolated ( unsafe) var requestBuffer: [ UInt8 ] = [ ]
73
+
68
74
@_spi ( Testing)
69
- public var _requestBuffer : [ UInt8 ] = [ ]
75
+ public var requestBufferIsEmpty : Bool {
76
+ queue. sync {
77
+ requestBuffer. isEmpty
78
+ }
79
+ }
70
80
71
- private var _nextRequestID : Int = 0
81
+ /// An integer that hasn't been used for a request ID yet.
82
+ ///
83
+ /// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are
84
+ /// - `nextRequestID()`: This is synchronized on `queue`.
85
+ private nonisolated ( unsafe) var nextRequestIDStorage: Int = 0
72
86
73
- struct OutstandingRequest {
87
+ struct OutstandingRequest : Sendable {
74
88
var responseType : ResponseType . Type
75
- var replyHandler : ( LSPResult < Any > ) -> Void
89
+ var replyHandler : @ Sendable ( LSPResult < Any > ) -> Void
76
90
}
77
91
78
- /// The set of currently outstanding outgoing requests along with information about how to decode and handle their responses.
79
- private var outstandingRequests : [ RequestID : OutstandingRequest ] = [ : ]
92
+ /// The set of currently outstanding outgoing requests along with information about how to decode and handle their
93
+ /// responses.
94
+ ///
95
+ /// All accesses to `outstandingRequests` must be on `queue` to avoid race conditions.
96
+ private nonisolated ( unsafe) var outstandingRequests: [ RequestID : OutstandingRequest ] = [ : ]
80
97
81
98
/// A handler that will be called asynchronously when the connection is being
82
99
/// closed.
83
- private var closeHandler : ( ( ) async -> Void ) ? = nil
100
+ ///
101
+ /// There are no race conditions to `closeHandler` because it is only set from `start`, which is required to be called
102
+ /// in the same serial code region domain as the initializer, so it's serial and the `JSONRPCConnection` can't
103
+ /// have escaped to other isolation domains yet.
104
+ private nonisolated ( unsafe) var closeHandler: ( @Sendable ( ) async -> Void ) ? = nil
84
105
85
- /// - Important: `start` must be called within the same sequential code region that created the `JSONRPCConnection`.
86
- /// This means that no `await` calls must exist between the creation of the connection and the call to `start` and
87
- /// the `JSONRPCConnection` must not be accessible to any other threads yet.
106
+ /// - Important: `start` must be called before sending any data over the `JSONRPCConnection`.
88
107
public init (
89
108
name: String ,
90
109
protocol messageRegistry: MessageRegistry ,
@@ -95,7 +114,8 @@ public final class JSONRPCConnection: Connection {
95
114
self . name = name
96
115
self . receiveHandler = nil
97
116
#if os(Linux) || os(Android)
98
- // We receive a `SIGPIPE` if we write to a pipe that points to a crashed process. This in particular happens if the target of a `JSONRPCConnection` has crashed and we try to send it a message.
117
+ // We receive a `SIGPIPE` if we write to a pipe that points to a crashed process. This in particular happens if the
118
+ // target of a `JSONRPCConnection` has crashed and we try to send it a message.
99
119
// On Darwin, `DispatchIO` ignores `SIGPIPE` for the pipes handled by it, but that features is not available on Linux.
100
120
// Instead, globally ignore `SIGPIPE` on Linux to prevent us from crashing if the `JSONRPCConnection`'s target crashes.
101
121
globallyDisableSigpipe ( )
@@ -168,49 +188,49 @@ public final class JSONRPCConnection: Connection {
168
188
///
169
189
/// - parameter receiveHandler: The message handler to invoke for requests received on the `inFD`.
170
190
///
171
- /// - Important: `start` must be called within the same sequential code region that created the `JSONRPCConnection`.
172
- /// This means that no `await` calls must exist between the creation of the connection and the call to `start` and
173
- /// the `JSONRPCConnection` must not be accessible to any other threads yet.
174
- public func start( receiveHandler: MessageHandler , closeHandler: @escaping ( ) async -> Void = { } ) {
175
- precondition ( state == . created)
176
- state = . running
177
- self . receiveHandler = receiveHandler
178
- self . closeHandler = closeHandler
179
-
180
- receiveIO. read ( offset: 0 , length: Int . max, queue: queue) { done, data, errorCode in
181
- guard errorCode == 0 else {
182
- #if !os(Windows)
183
- if errorCode != POSIXError . ECANCELED. rawValue {
184
- logger. error ( " IO error reading \( errorCode) " )
191
+ /// - Important: `start` must be called before sending any data over the `JSONRPCConnection`.
192
+ public func start( receiveHandler: MessageHandler , closeHandler: @escaping @Sendable ( ) async -> Void = { } ) {
193
+ queue. sync {
194
+ precondition ( state == . created)
195
+ state = . running
196
+ self . receiveHandler = receiveHandler
197
+ self . closeHandler = closeHandler
198
+
199
+ receiveIO. read ( offset: 0 , length: Int . max, queue: queue) { done, data, errorCode in
200
+ guard errorCode == 0 else {
201
+ #if !os(Windows)
202
+ if errorCode != POSIXError . ECANCELED. rawValue {
203
+ logger. error ( " IO error reading \( errorCode) " )
204
+ }
205
+ #endif
206
+ if done { self . closeAssumingOnQueue ( ) }
207
+ return
185
208
}
186
- #endif
187
- if done { self . closeOnQueue ( ) }
188
- return
189
- }
190
-
191
- if done {
192
- self . closeOnQueue ( )
193
- return
194
- }
195
209
196
- guard let data = data, !data. isEmpty else {
197
- return
198
- }
210
+ if done {
211
+ self . closeAssumingOnQueue ( )
212
+ return
213
+ }
199
214
200
- // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`.
201
- if self . _requestBuffer. isEmpty {
202
- data. withUnsafeBytes { ( pointer: UnsafePointer < UInt8 > ) in
203
- let rest = self . parseAndHandleMessages ( from: UnsafeBufferPointer ( start: pointer, count: data. count) )
204
- self . _requestBuffer. append ( contentsOf: rest)
215
+ guard let data = data, !data. isEmpty else {
216
+ return
205
217
}
206
- } else {
207
- self . _requestBuffer. append ( contentsOf: data)
208
- var unused = 0
209
- self . _requestBuffer. withUnsafeBufferPointer { buffer in
210
- let rest = self . parseAndHandleMessages ( from: buffer)
211
- unused = rest. count
218
+
219
+ // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`.
220
+ if self . requestBuffer. isEmpty {
221
+ data. withUnsafeBytes { ( pointer: UnsafePointer < UInt8 > ) in
222
+ let rest = self . parseAndHandleMessages ( from: UnsafeBufferPointer ( start: pointer, count: data. count) )
223
+ self . requestBuffer. append ( contentsOf: rest)
224
+ }
225
+ } else {
226
+ self . requestBuffer. append ( contentsOf: data)
227
+ var unused = 0
228
+ self . requestBuffer. withUnsafeBufferPointer { buffer in
229
+ let rest = self . parseAndHandleMessages ( from: buffer)
230
+ unused = rest. count
231
+ }
232
+ self . requestBuffer. removeFirst ( self . requestBuffer. count - unused)
212
233
}
213
- self . _requestBuffer. removeFirst ( self . _requestBuffer. count - unused)
214
234
}
215
235
}
216
236
}
@@ -231,7 +251,10 @@ public final class JSONRPCConnection: Connection {
231
251
}
232
252
233
253
/// Parse and handle all messages in `bytes`, returning a slice containing any remaining incomplete data.
254
+ ///
255
+ /// - Important: Must be called on `queue`
234
256
func parseAndHandleMessages( from bytes: UnsafeBufferPointer < UInt8 > ) -> UnsafeBufferPointer < UInt8 > . SubSequence {
257
+ dispatchPrecondition ( condition: . onQueue( queue) )
235
258
let decoder = JSONDecoder ( )
236
259
237
260
// Set message registry to use for model decoding.
@@ -299,7 +322,10 @@ public final class JSONRPCConnection: Connection {
299
322
}
300
323
301
324
/// Handle a single message by dispatching it to `receiveHandler` or an appropriate reply handler.
325
+ ///
326
+ /// - Important: Must be called on `queue`
302
327
func handle( _ message: JSONRPCMessage ) {
328
+ dispatchPrecondition ( condition: . onQueue( queue) )
303
329
switch message {
304
330
case . notification( let notification) :
305
331
notification. _handle ( self . receiveHandler!)
@@ -341,11 +367,11 @@ public final class JSONRPCConnection: Connection {
341
367
sendIO. write ( offset: 0 , data: dispatchData, queue: sendQueue) { [ weak self] done, _, errorCode in
342
368
if errorCode != 0 {
343
369
logger. error ( " IO error sending message \( errorCode) " )
344
- if done {
370
+ if done, let self {
345
371
// An unrecoverable error occurs on the channel’s file descriptor.
346
372
// Close the connection.
347
- self ? . queue. async {
348
- self ? . closeOnQueue ( )
373
+ self . queue. async {
374
+ self . closeAssumingOnQueue ( )
349
375
}
350
376
}
351
377
}
@@ -398,13 +424,13 @@ public final class JSONRPCConnection: Connection {
398
424
/// The user-provided close handler will be called *asynchronously* when all outstanding I/O
399
425
/// operations have completed. No new I/O will be accepted after `close` returns.
400
426
public func close( ) {
401
- queue. sync { closeOnQueue ( ) }
427
+ queue. sync { closeAssumingOnQueue ( ) }
402
428
}
403
429
404
430
/// Close the connection, assuming that the code is already executing on `queue`.
405
431
///
406
432
/// - Important: Must be called on `queue`.
407
- func closeOnQueue ( ) {
433
+ private func closeAssumingOnQueue ( ) {
408
434
dispatchPrecondition ( condition: . onQueue( queue) )
409
435
sendQueue. sync {
410
436
guard state == . running else { return }
@@ -419,9 +445,12 @@ public final class JSONRPCConnection: Connection {
419
445
}
420
446
421
447
/// Request id for the next outgoing request.
422
- func nextRequestID( ) -> RequestID {
423
- _nextRequestID += 1
424
- return . number( _nextRequestID)
448
+ ///
449
+ /// - Important: Must be called on `queue`
450
+ private func nextRequestID( ) -> RequestID {
451
+ dispatchPrecondition ( condition: . onQueue( queue) )
452
+ nextRequestIDStorage += 1
453
+ return . number( nextRequestIDStorage)
425
454
}
426
455
427
456
// MARK: Connection interface
@@ -444,7 +473,7 @@ public final class JSONRPCConnection: Connection {
444
473
/// When the receiving end replies to the request, execute `reply` with the response.
445
474
public func send< Request: RequestType > (
446
475
_ request: Request ,
447
- reply: @escaping ( LSPResult < Request . Response > ) -> Void
476
+ reply: @escaping @ Sendable ( LSPResult < Request . Response > ) -> Void
448
477
) -> RequestID {
449
478
let id : RequestID = self . queue. sync {
450
479
let id = nextRequestID ( )
0 commit comments