Skip to content

Commit 2adcd72

Browse files
committed
Make thread safe
1 parent a5adf26 commit 2adcd72

File tree

2 files changed

+99
-11
lines changed

2 files changed

+99
-11
lines changed

stdlib/public/Distributed/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ add_swift_target_library(swift_Distributed ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
2626
SWIFT_MODULE_DEPENDS_OPENBSD Glibc
2727
SWIFT_MODULE_DEPENDS_CYGWIN Glibc
2828
SWIFT_MODULE_DEPENDS_HAIKU Glibc
29-
SWIFT_MODULE_DEPENDS_WINDOWS CRT
29+
SWIFT_MODULE_DEPENDS_WINDOWS CRT WinSDK
3030

3131
LINK_LIBRARIES ${swift_distributed_link_libraries}
3232

stdlib/public/Distributed/LocalTestingDistributedActorSystem.swift

Lines changed: 98 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212

1313
import Swift
1414

15+
#if canImport(Glibc)
16+
import Glibc
17+
#elseif os(Windows)
18+
import WinSDK
19+
#endif
20+
1521
public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
1622
public let address: String
1723

@@ -30,7 +36,6 @@ public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
3036
}
3137
}
3238

33-
// TODO(distributed): not thread safe...
3439
@available(SwiftStdlib 5.7, *)
3540
public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable {
3641
public typealias ActorID = LocalTestingActorAddress
@@ -39,15 +44,17 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
3944
public typealias SerializationRequirement = Codable
4045

4146
private var activeActors: [ActorID: DistributedActor] = [:]
47+
private let activeActorsLock = _Lock()
4248

4349
private var idProvider: ActorIDProvider = ActorIDProvider()
4450
private var assignedIDs: Set<ActorID> = []
51+
private let assignedIDsLock = _Lock()
4552

4653
public init() {}
4754

4855
public func resolve<Act>(id: ActorID, as actorType: Act.Type)
4956
throws -> Act? where Act: DistributedActor {
50-
guard let anyActor = self.activeActors[id] else {
57+
guard let anyActor = self.activeActorsLock.withLock({ self.activeActors[id] }) else {
5158
throw LocalTestingDistributedActorSystemError(message: "Unable to locate id '\(id)' locally")
5259
}
5360
guard let actor = anyActor as? Act else {
@@ -59,24 +66,30 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
5966
public func assignID<Act>(_ actorType: Act.Type) -> ActorID
6067
where Act: DistributedActor {
6168
let id = self.idProvider.next()
62-
self.assignedIDs.insert(id)
69+
self.assignedIDsLock.withLock {
70+
self.assignedIDs.insert(id)
71+
}
6372
return id
6473
}
6574

6675
public func actorReady<Act>(_ actor: Act)
6776
where Act: DistributedActor,
6877
Act.ID == ActorID {
69-
guard self.assignedIDs.contains(actor.id) else {
78+
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(actor.id) }) else {
7079
fatalError("Attempted to mark an unknown actor '\(actor.id)' ready")
7180
}
72-
self.activeActors[actor.id] = actor
81+
self.activeActorsLock.withLock {
82+
self.activeActors[actor.id] = actor
83+
}
7384
}
7485

7586
public func resignID(_ id: ActorID) {
76-
guard self.assignedIDs.contains(id) else {
87+
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(id) }) else {
7788
fatalError("Attempted to resign unknown id '\(id)'")
7889
}
79-
self.activeActors.removeValue(forKey: id)
90+
self.activeActorsLock.withLock {
91+
self.activeActors.removeValue(forKey: id)
92+
}
8093
}
8194

8295
public func makeInvocationEncoder() -> InvocationEncoder {
@@ -109,15 +122,18 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
109122
fatalError("Attempted to make remote call on actor \(actor) in a local-only actor system")
110123
}
111124

112-
// TODO(distributed): not thread safe...
113125
private struct ActorIDProvider {
114126
private var counter: Int = 0
127+
private let counterLock = _Lock()
115128

116129
init() {}
117130

118131
mutating func next() -> LocalTestingActorAddress {
119-
self.counter += 1
120-
return LocalTestingActorAddress(parse: "\(self.counter)")
132+
let id: Int = self.counterLock.withLock {
133+
self.counter += 1
134+
return self.counter
135+
}
136+
return LocalTestingActorAddress(parse: "\(id)")
121137
}
122138
}
123139
}
@@ -176,3 +192,75 @@ public struct LocalTestingDistributedActorSystemError: DistributedActorSystemErr
176192
self.message = message
177193
}
178194
}
195+
196+
// === lock ----------------------------------------------------------------
197+
198+
fileprivate class _Lock {
199+
#if os(Windows)
200+
private let underlying: UnsafeMutablePointer<SRWLOCK>
201+
#elseif os(Cygwin) || os(FreeBSD) || os(OpenBSD)
202+
private let underlying: UnsafeMutablePointer<pthread_mutex_t?>
203+
#elseif os(WASI)
204+
// pthread is currently not available on WASI
205+
#else
206+
private let underlying: UnsafeMutablePointer<pthread_mutex_t>
207+
#endif
208+
209+
deinit {
210+
#if os(Windows)
211+
// Mutexes do not need to be explicitly destroyed
212+
#elseif os(WASI)
213+
// WASI environment has only a single thread
214+
#else
215+
guard pthread_mutex_destroy(self.underlying) == 0 else {
216+
fatalError("pthread_mutex_destroy failed")
217+
}
218+
#endif
219+
220+
#if !os(WASI)
221+
self.underlying.deinitialize(count: 1)
222+
self.underlying.deallocate()
223+
#endif
224+
}
225+
226+
init() {
227+
#if os(Windows)
228+
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
229+
InitializeSRWLock(self.underlying)
230+
#elseif os(WASI)
231+
// WASI environment has only a single thread
232+
#else
233+
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
234+
guard pthread_mutex_init(self.underlying, nil) == 0 else {
235+
fatalError("pthread_mutex_init failed")
236+
}
237+
#endif
238+
}
239+
240+
@discardableResult
241+
func withLock<T>(_ body: () -> T) -> T {
242+
#if os(Windows)
243+
AcquireSRWLockExclusive(self.underlying)
244+
#elseif os(WASI)
245+
// WASI environment has only a single thread
246+
#else
247+
guard pthread_mutex_lock(self.underlying) == 0 else {
248+
fatalError("pthread_mutex_lock failed")
249+
}
250+
#endif
251+
252+
defer {
253+
#if os(Windows)
254+
ReleaseSRWLockExclusive(self.underlying)
255+
#elseif os(WASI)
256+
// WASI environment has only a single thread
257+
#else
258+
guard pthread_mutex_unlock(self.underlying) == 0 else {
259+
fatalError("pthread_mutex_unlock failed")
260+
}
261+
#endif
262+
}
263+
264+
return body()
265+
}
266+
}

0 commit comments

Comments
 (0)