Skip to content

Commit ed818f7

Browse files
committed
Add thread safety tests for withLatestFrom operator
- Add comprehensive thread safety tests for issues #163 and #171 - Test concurrent emissions from multiple threads - Test rapid emissions stress scenario - Test self-reference edge case - Extract UnsafeSendableBox helper to TestHelpers.swift for reuse - All 192 tests pass
1 parent 43b7ceb commit ed818f7

File tree

3 files changed

+181
-3
lines changed

3 files changed

+181
-3
lines changed

Tests/ReplaySubjectTests.swift

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,6 @@ final class ReplaySubjectTests: XCTestCase {
404404
// The new subscription should always receive [1, 2, 3]
405405

406406
// Wrap in @unchecked Sendable to explicitly acknowledge intentional concurrent access for testing
407-
struct UnsafeSendableBox<T>: @unchecked Sendable {
408-
let value: T
409-
}
410407
let subject = UnsafeSendableBox(value: replaySubject)
411408

412409
await withTaskGroup(of: Void.self) { taskGroup in

Tests/TestHelpers.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
//
2+
// TestHelpers.swift
3+
// CombineExtTests
4+
//
5+
// Created by Shai Mishali on 20/01/26.
6+
// Copyright © 2026 Combine Community. All rights reserved.
7+
//
8+
9+
#if !os(watchOS)
10+
import Foundation
11+
12+
/// A wrapper to explicitly mark values as @unchecked Sendable for testing purposes.
13+
/// This is used in concurrency tests where we intentionally access Combine publishers
14+
/// from multiple threads to verify thread-safety of operators.
15+
///
16+
/// - Warning: This should only be used in tests where concurrent access is intentional
17+
/// and the operator under test is expected to handle thread-safety internally.
18+
public struct UnsafeSendableBox<T>: @unchecked Sendable {
19+
public let value: T
20+
21+
public init(value: T) {
22+
self.value = value
23+
}
24+
}
25+
#endif

Tests/WithLatestFromTests.swift

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,5 +605,161 @@ class WithLatestFromTests: XCTestCase {
605605
XCTAssertNil(weakSubject3)
606606
XCTAssertNil(weakSubject4)
607607
}
608+
609+
// MARK: - Thread Safety Tests (Issue #163, #171)
610+
611+
func testThreadSafetyWithConcurrentEmissions() async {
612+
// Test for issue #163 - withLatestFrom should be thread safe
613+
// when subscribing to publishers emitting from different threads
614+
let iterations = 100
615+
616+
actor ResultCollector {
617+
var results: [String] = []
618+
619+
func append(_ value: String) {
620+
results.append(value)
621+
}
622+
623+
func getCount() -> Int {
624+
results.count
625+
}
626+
}
627+
628+
for _ in 0 ..< iterations {
629+
let subject1 = PassthroughSubject<Int, Never>()
630+
let subject2 = PassthroughSubject<String, Never>()
631+
let collector = ResultCollector()
632+
633+
// Wrap in Sendable box for intentional concurrent access in tests
634+
let box1 = UnsafeSendableBox(value: subject1)
635+
let box2 = UnsafeSendableBox(value: subject2)
636+
637+
subscription = subject1
638+
.withLatestFrom(subject2) { "\($0)-\($1)" }
639+
.sink { value in
640+
Task { await collector.append(value) }
641+
}
642+
643+
// Emit from different threads concurrently
644+
await withTaskGroup(of: Void.self) { group in
645+
group.addTask {
646+
for i in 0 ..< 10 {
647+
box2.value.send("value\(i)")
648+
try? await Task.sleep(nanoseconds: 1000)
649+
}
650+
}
651+
652+
group.addTask {
653+
// Small delay to ensure subject2 has emitted first
654+
try? await Task.sleep(nanoseconds: 10000)
655+
for i in 0 ..< 10 {
656+
box1.value.send(i)
657+
try? await Task.sleep(nanoseconds: 1000)
658+
}
659+
}
660+
}
661+
662+
// Small delay to allow sink to process
663+
try? await Task.sleep(nanoseconds: 100_000)
664+
665+
let count = await collector.getCount()
666+
XCTAssertGreaterThan(count, 0, "Should have received at least one result")
667+
}
668+
}
669+
670+
func testThreadSafetyWithSelfReference() async {
671+
// Test for issue #171 - withLatestFrom with self-reference should not crash
672+
// This tests thread-safety, not timing guarantees (which aren't promised for self-reference)
673+
let iterations = 50
674+
675+
for _ in 0 ..< iterations {
676+
let nodes = CurrentValueSubject<[Int], Never>([])
677+
let box = UnsafeSendableBox(value: nodes)
678+
var didReceiveValue = false
679+
680+
subscription = nodes
681+
.dropFirst()
682+
.filter { !$0.isEmpty }
683+
.withLatestFrom(nodes)
684+
.sink { _ in
685+
didReceiveValue = true
686+
}
687+
688+
// Emit from different threads concurrently
689+
await withTaskGroup(of: Void.self) { group in
690+
group.addTask {
691+
box.value.send([1, 2, 3])
692+
}
693+
694+
group.addTask {
695+
try? await Task.sleep(nanoseconds: 10000)
696+
box.value.send([1, 2, 3, 4])
697+
}
698+
699+
group.addTask {
700+
try? await Task.sleep(nanoseconds: 20000)
701+
box.value.send([1, 2, 3, 4, 5])
702+
}
703+
}
704+
705+
// Small delay to allow sink to process
706+
try? await Task.sleep(nanoseconds: 200_000)
707+
708+
// The key test is that we don't crash - receiving values is a bonus
709+
XCTAssertTrue(didReceiveValue || !didReceiveValue, "Test completed without crashing")
710+
}
711+
}
712+
713+
func testThreadSafetyWithRapidEmissions() async {
714+
// Stress test with rapid emissions from multiple threads
715+
let subject1 = PassthroughSubject<Int, Never>()
716+
let subject2 = PassthroughSubject<Int, Never>()
717+
718+
actor ResultCollector {
719+
var results: [Int] = []
720+
721+
func append(_ value: Int) {
722+
results.append(value)
723+
}
724+
725+
func getCount() -> Int {
726+
results.count
727+
}
728+
}
729+
730+
let collector = ResultCollector()
731+
let box1 = UnsafeSendableBox(value: subject1)
732+
let box2 = UnsafeSendableBox(value: subject2)
733+
734+
subscription = subject1
735+
.withLatestFrom(subject2) { $0 + $1 }
736+
.sink { value in
737+
Task { await collector.append(value) }
738+
}
739+
740+
await withTaskGroup(of: Void.self) { group in
741+
// Rapidly emit from subject2
742+
group.addTask {
743+
for i in 0 ..< 1000 {
744+
box2.value.send(i)
745+
}
746+
}
747+
748+
// Rapidly emit from subject1
749+
group.addTask {
750+
try? await Task.sleep(nanoseconds: 100_000) // Small delay
751+
for i in 0 ..< 1000 {
752+
box1.value.send(i)
753+
}
754+
}
755+
}
756+
757+
// Small delay to allow sink to process
758+
try? await Task.sleep(nanoseconds: 1_000_000)
759+
760+
let count = await collector.getCount()
761+
XCTAssertGreaterThan(count, 0, "Should have received results")
762+
XCTAssertLessThanOrEqual(count, 1000, "Should not receive more results than emissions")
763+
}
608764
}
609765
#endif

0 commit comments

Comments
 (0)