1+ import Testing
2+ @testable import AWSLambdaRuntime
3+
4+ struct PoolTests {
5+
6+ @Test
7+ func testBasicPushAndIteration( ) async throws {
8+ let pool = LambdaHTTPServer . Pool < String > ( )
9+
10+ // Push values
11+ await pool. push ( " first " )
12+ await pool. push ( " second " )
13+
14+ // Iterate and verify order
15+ var values = [ String] ( )
16+ for try await value in pool {
17+ values. append ( value)
18+ if values. count == 2 { break }
19+ }
20+
21+ #expect( values == [ " first " , " second " ] )
22+ }
23+
24+ @Test
25+ func testCancellation( ) async throws {
26+ let pool = LambdaHTTPServer . Pool < String > ( )
27+
28+ // Create a task that will be cancelled
29+ let task = Task {
30+ for try await _ in pool {
31+ Issue . record ( " Should not receive any values after cancellation " )
32+ }
33+ }
34+
35+ // Cancel the task immediately
36+ task. cancel ( )
37+
38+ // This should complete without receiving any values
39+ try await task. value
40+ }
41+
42+ @Test
43+ func testConcurrentPushAndIteration( ) async throws {
44+ let pool = LambdaHTTPServer . Pool < Int > ( )
45+ let iterations = 1000
46+ var receivedValues = Set < Int > ( )
47+
48+ // Start consumer task first
49+ let consumer = Task {
50+ var count = 0
51+ for try await value in pool {
52+ receivedValues. insert ( value)
53+ count += 1
54+ if count >= iterations { break }
55+ }
56+ }
57+
58+ // Create multiple producer tasks
59+ try await withThrowingTaskGroup ( of: Void . self) { group in
60+ for i in 0 ..< iterations {
61+ group. addTask {
62+ await pool. push ( i)
63+ }
64+ }
65+ try await group. waitForAll ( )
66+ }
67+
68+ // Wait for consumer to complete
69+ try await consumer. value
70+
71+ // Verify all values were received exactly once
72+ #expect( receivedValues. count == iterations)
73+ #expect( Set ( 0 ..< iterations) == receivedValues)
74+ }
75+
76+ @Test
77+ func testPushToWaitingConsumer( ) async throws {
78+ let pool = LambdaHTTPServer . Pool < String > ( )
79+ let expectedValue = " test value "
80+
81+ // Start a consumer that will wait for a value
82+ let consumer = Task {
83+ for try await value in pool {
84+ #expect( value == expectedValue)
85+ break
86+ }
87+ }
88+
89+ // Give consumer time to start waiting
90+ try await Task . sleep ( nanoseconds: 100_000_000 ) // 0.1 seconds
91+
92+ // Push a value
93+ await pool. push ( expectedValue)
94+
95+ // Wait for consumer to complete
96+ try await consumer. value
97+ }
98+
99+ @Test
100+ func testStressTest( ) async throws {
101+ let pool = LambdaHTTPServer . Pool < Int > ( )
102+ let producerCount = 10
103+ let messagesPerProducer = 1000
104+ var receivedValues = [ Int] ( )
105+
106+ // Start consumer
107+ let consumer = Task {
108+ var count = 0
109+ for try await value in pool {
110+ receivedValues. append ( value)
111+ count += 1
112+ if count >= producerCount * messagesPerProducer { break }
113+ }
114+ }
115+
116+ // Create multiple producers
117+ try await withThrowingTaskGroup ( of: Void . self) { group in
118+ for p in 0 ..< producerCount {
119+ group. addTask {
120+ for i in 0 ..< messagesPerProducer {
121+ await pool. push ( p * messagesPerProducer + i)
122+ }
123+ }
124+ }
125+ try await group. waitForAll ( )
126+ }
127+
128+ // Wait for consumer to complete
129+ try await consumer. value
130+
131+ // Verify we received all values
132+ #expect( receivedValues. count == producerCount * messagesPerProducer)
133+ #expect( Set ( receivedValues) . count == producerCount * messagesPerProducer)
134+ }
135+ }
0 commit comments