Skip to content

Commit 7ccd7ac

Browse files
authored
SWIFT-719 Add a forEach method to async cursors and change streams (#408)
1 parent e99e5f5 commit 7ccd7ac

File tree

11 files changed

+281
-85
lines changed

11 files changed

+281
-85
lines changed

Sources/MongoSwift/ChangeStream.swift

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,11 @@ public class ChangeStream<T: Codable>: CursorProtocol {
108108
}
109109
}
110110

111-
deinit {
112-
assert(!self.isAlive, "change stream wasn't closed")
113-
}
114-
115111
/// Indicates whether this change stream has the potential to return more data.
116-
public var isAlive: Bool {
117-
return self.wrappedCursor.isAlive
112+
public func isAlive() -> EventLoopFuture<Bool> {
113+
return self.client.operationExecutor.execute {
114+
self.wrappedCursor.isAlive
115+
}
118116
}
119117

120118
/// The `ResumeToken` associated with the most recent event seen by the change stream.
@@ -128,10 +126,13 @@ public class ChangeStream<T: Codable>: CursorProtocol {
128126
* (specified on the `ChangeStreamOptions` passed to the method that created this change stream) before trying
129127
* again.
130128
*
131-
* A thread from the pool will be occupied until the returned future is completed, so performance degradation
132-
* is possible if the number of polling change streams is too close to the total number of threads in the thread
133-
* pool. To configure the total number of threads in the pool, set the `ClientOptions.threadPoolSize` option
134-
* during client creation.
129+
* A thread from the driver's internal thread pool will be occupied until the returned future is completed, so
130+
* performance degradation is possible if the number of polling change streams is too close to the total number of
131+
* threads in the thread pool. To configure the total number of threads in the pool, set the
132+
* `ClientOptions.threadPoolSize` option during client creation.
133+
*
134+
* Note: You *must not* call any change stream methods besides `kill` and `isAlive` while the future returned from
135+
* this method is unresolved. Doing so will result in undefined behavior.
135136
*
136137
* - Returns:
137138
* An `EventLoopFuture<T?>` evaluating to the next `T` in this change stream, `nil` if the change stream is
@@ -158,6 +159,9 @@ public class ChangeStream<T: Codable>: CursorProtocol {
158159
*
159160
* This method may be called repeatedly while `isAlive` is true to retrieve new data.
160161
*
162+
* Note: You *must not* call any change stream methods besides `kill` and `isAlive` while the future returned from
163+
* this method is unresolved. Doing so will result in undefined behavior.
164+
*
161165
* - Returns:
162166
* An `EventLoopFuture<T?>` containing the next `T` in this change stream, an error if one occurred, or `nil` if
163167
* there was no data.
@@ -180,6 +184,9 @@ public class ChangeStream<T: Codable>: CursorProtocol {
180184
* Since `toArray` will only fetch the currently available results, it may return more data if it is called again
181185
* while the change stream is still alive.
182186
*
187+
* Note: You *must not* call any change stream methods besides `kill` and `isAlive` while the future returned from
188+
* this method is unresolved. Doing so will result in undefined behavior.
189+
*
183190
* - Returns:
184191
* An `EventLoopFuture<[T]>` evaluating to the results currently available in this change stream, or an error.
185192
*
@@ -195,6 +202,35 @@ public class ChangeStream<T: Codable>: CursorProtocol {
195202
}
196203
}
197204

205+
/**
206+
* Calls the provided closure with each event in the change stream as it arrives.
207+
*
208+
* A thread from the driver's internal thread pool will be occupied until the returned future is completed, so
209+
* performance degradation is possible if the number of polling change streams is too close to the total number of
210+
* threads in the thread pool. To configure the total number of threads in the pool, set the
211+
* `ClientOptions.threadPoolSize` option during client creation.
212+
*
213+
* Note: You *must not* call any change stream methods besides `kill` and `isAlive` while the future returned from
214+
* this method is unresolved. Doing so will result in undefined behavior.
215+
*
216+
* - Returns:
217+
* An `EventLoopFuture<Void>` which will complete once the change stream is closed or once an error is
218+
* encountered.
219+
*
220+
* If the future evaluates to an error, that error is likely one of the following:
221+
* - `CommandError` if an error occurs while fetching more results from the server.
222+
* - `LogicError` if this function is called after the change stream has died.
223+
* - `LogicError` if this function is called and the session associated with this change stream is inactive.
224+
* - `DecodingError` if an error occurs decoding the server's responses.
225+
*/
226+
public func forEach(_ body: @escaping (T) throws -> Void) -> EventLoopFuture<Void> {
227+
return self.client.operationExecutor.execute {
228+
while let next = try self.processEvent(self.wrappedCursor.next()) {
229+
try body(next)
230+
}
231+
}
232+
}
233+
198234
/**
199235
* Kill this change stream.
200236
*

Sources/MongoSwift/CursorCommon.swift

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ internal protocol CursorProtocol {
1111
/**
1212
* Indicates whether this cursor has the potential to return more data.
1313
*
14-
* This property is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
14+
* This method is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
1515
* even after returning `nil`.
1616
*
1717
* If this cursor is non-tailable, it will always be dead as soon as either `tryNext` returns `nil` or an error.
1818
*
1919
* This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`.
2020
*/
21-
var isAlive: Bool { get }
21+
func isAlive() -> EventLoopFuture<Bool>
2222

2323
/**
2424
* Get the next `T` from the cursor.
@@ -242,7 +242,13 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
242242
}
243243
}
244244

245-
/// Whether the cursor is alive.
245+
/// Asserts that the cursor was closed.
246+
deinit {
247+
assert(!self._isAlive, "cursor or change stream wasn't closed before it went out of scope")
248+
}
249+
250+
/// Whether the cursor is alive. This property can block while waiting for the lock and should only be accessed
251+
/// from within the executor.
246252
internal var isAlive: Bool {
247253
return self.lock.withLock {
248254
self._isAlive
@@ -301,7 +307,7 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
301307
}
302308
}
303309

304-
/// Retreive all the currently available documents in the result set.
310+
/// Retrieve all the currently available documents in the result set.
305311
/// This will not exhaust the cursor.
306312
/// This method is blocking and should only be run via the executor.
307313
internal func toArray() throws -> [Document] {

Sources/MongoSwift/MongoCursor.swift

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public class MongoCursor<T: Codable>: CursorProtocol {
4141
/// Decoder from the client, database, or collection that created this cursor.
4242
internal let decoder: BSONDecoder
4343

44+
/// The ID used by the server to track the cursor over time. If all of the cursor's results were returnable in a
45+
/// single batch, or if the cursor contained no results, this value will be nil.
46+
public let id: Int64?
47+
4448
/**
4549
* Initializes a new `MongoCursor` instance. Not meant to be instantiated directly by a user. When `forceIO` is
4650
* true, this initializer will force a connection to the server if one is not already established.
@@ -66,11 +70,14 @@ public class MongoCursor<T: Codable>: CursorProtocol {
6670
session: session,
6771
type: cursorType ?? .nonTailable
6872
)
69-
}
7073

71-
/// Asserts that the cursor was closed.
72-
deinit {
73-
assert(!self.isAlive, "cursor wasn't closed before it went out of scope")
74+
self.id = self.wrappedCursor.withUnsafeMongocPointer { ptr in
75+
guard let ptr = ptr else {
76+
return nil
77+
}
78+
let id = mongoc_cursor_get_id(ptr)
79+
return id == 0 ? nil : id
80+
}
7481
}
7582

7683
/// Decodes a result to the generic type or `nil` if no result were returned.
@@ -89,25 +96,16 @@ public class MongoCursor<T: Codable>: CursorProtocol {
8996
/**
9097
* Indicates whether this cursor has the potential to return more data.
9198
*
92-
* This property is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
99+
* This method is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
93100
* even after returning `nil`.
94101
*
95102
* If this cursor is non-tailable, it will always be dead as soon as either `tryNext` returns `nil` or an error.
96103
*
97104
* This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`.
98105
*/
99-
public var isAlive: Bool {
100-
return self.wrappedCursor.isAlive
101-
}
102-
103-
/// Returns the ID used by the server to track the cursor. `nil` once all results have been fetched from the server.
104-
public var id: Int64? {
105-
return self.wrappedCursor.withUnsafeMongocPointer { ptr in
106-
guard let ptr = ptr else {
107-
return nil
108-
}
109-
let id = mongoc_cursor_get_id(ptr)
110-
return id == 0 ? nil : id
106+
public func isAlive() -> EventLoopFuture<Bool> {
107+
return self.client.operationExecutor.execute {
108+
self.wrappedCursor.isAlive
111109
}
112110
}
113111

@@ -120,6 +118,9 @@ public class MongoCursor<T: Codable>: CursorProtocol {
120118
* before evaluating to `nil`. This option can be configured via options passed to the method that created this
121119
* cursor (e.g. the `maxAwaitTimeMS` option on the `FindOptions` passed to `find`).
122120
*
121+
* Note: You *must not* call any cursor methods besides `kill` and `isAlive` while the future returned from this
122+
* method is unresolved. Doing so will result in undefined behavior.
123+
*
123124
* - Returns:
124125
* An `EventLoopFuture<T?>` containing the next `T` in this cursor, an error if one occurred, or `nil` if
125126
* there was no data.
@@ -143,6 +144,14 @@ public class MongoCursor<T: Codable>: CursorProtocol {
143144
* If this cursor is tailable, this method will continue polling until a non-empty batch is returned from the server
144145
* or the cursor is closed.
145146
*
147+
* A thread from the driver's internal thread pool will be occupied until the returned future is completed, so
148+
* performance degradation is possible if the number of polling cursors is too close to the total number of threads
149+
* in the thread pool. To configure the total number of threads in the pool, set the `ClientOptions.threadPoolSize`
150+
* option during client creation.
151+
*
152+
* Note: You *must not* call any cursor methods besides `kill` and `isAlive` while the future returned from this
153+
* method is unresolved. Doing so will result in undefined behavior.
154+
*
146155
* - Returns:
147156
* An `EventLoopFuture<T?>` evaluating to the next `T` in this cursor, or `nil` if the cursor is exhausted. If
148157
* the underlying cursor is tailable, the future will not resolve until data is returned (potentially after
@@ -168,6 +177,9 @@ public class MongoCursor<T: Codable>: CursorProtocol {
168177
* If this cursor is tailable, `toArray` will only fetch the currently available results, and it
169178
* may return more data if it is called again while the cursor is still alive.
170179
*
180+
* Note: You *must not* call any cursor methods besides `kill` and `isAlive` while the future returned from this
181+
* method is unresolved. Doing so will result in undefined behavior.
182+
*
171183
* - Returns:
172184
* An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error.
173185
*
@@ -183,6 +195,39 @@ public class MongoCursor<T: Codable>: CursorProtocol {
183195
}
184196
}
185197

198+
/**
199+
* Calls the provided closure with each element in the cursor.
200+
*
201+
* If the cursor is not tailable, this method will exhaust it, calling the closure with every document.
202+
*
203+
* If the cursor is tailable, the method will call the closure with each new document as it arrives.
204+
*
205+
* A thread from the driver's internal thread pool will be occupied until the returned future is completed, so
206+
* performance degradation is possible if the number of polling cursors is too close to the total number of threads
207+
* in the thread pool. To configure the total number of threads in the pool, set the `ClientOptions.threadPoolSize`
208+
* option during client creation.
209+
*
210+
* Note: You *must not* call any cursor methods besides `kill` and `isAlive` while the future returned from this
211+
* method is unresolved. Doing so will result in undefined behavior.
212+
*
213+
* - Returns:
214+
* An `EventLoopFuture<Void>` which will succeed when the end of the cursor is reached, or in the case of a
215+
* tailable cursor, when the cursor is killed via `kill`.
216+
*
217+
* If the future evaluates to an error, that error is likely one of the following:
218+
* - `CommandError` if an error occurs while fetching more results from the server.
219+
* - `LogicError` if this function is called after the cursor has died.
220+
* - `LogicError` if this function is called and the session associated with this cursor is inactive.
221+
* - `DecodingError` if an error occurs decoding the server's responses.
222+
*/
223+
public func forEach(_ body: @escaping (T) throws -> Void) -> EventLoopFuture<Void> {
224+
return self.client.operationExecutor.execute {
225+
while let next = try self.decode(result: self.wrappedCursor.next()) {
226+
try body(next)
227+
}
228+
}
229+
}
230+
186231
/**
187232
* Kill this cursor.
188233
*

Sources/MongoSwiftSync/ChangeStream.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ public class ChangeStream<T: Codable>: CursorProtocol {
2929
* This change stream will be dead if `next` returns `nil` or an error. It will also be dead if `tryNext` returns
3030
* an error, but will still be alive if `tryNext` returns `nil`.
3131
*/
32-
public var isAlive: Bool {
33-
return self.asyncChangeStream.isAlive
32+
public func isAlive() -> Bool {
33+
do {
34+
return try self.asyncChangeStream.isAlive().wait()
35+
} catch {
36+
return false
37+
}
3438
}
3539

3640
/**

Sources/MongoSwiftSync/CursorCommon.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ internal protocol CursorProtocol: LazySequenceProtocol, IteratorProtocol {
66
/**
77
* Indicates whether this cursor has the potential to return more data.
88
*
9-
* This property is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
9+
* This method is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
1010
* even after returning `nil`.
1111
*
1212
* If this cursor is non-tailable, it will always be dead as soon as either `tryNext` returns `nil` or an error.
1313
*
1414
* This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`.
1515
*/
16-
var isAlive: Bool { get }
16+
func isAlive() throws -> Bool
1717

1818
/**
1919
* Get the next `T` from the cursor.

Sources/MongoSwiftSync/MongoCursor.swift

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@ public class MongoCursor<T: Codable>: CursorProtocol {
1616
/**
1717
* Indicates whether this cursor has the potential to return more data.
1818
*
19-
* This property is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
19+
* This method is mainly useful if this cursor is tailable, since in that case `tryNext` may return more results
2020
* even after returning `nil`.
2121
*
2222
* If this cursor is non-tailable, it will always be dead as soon as either `tryNext` returns `nil` or an error.
2323
*
2424
* This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`.
2525
*/
26-
public var isAlive: Bool {
27-
return self.asyncCursor.isAlive
26+
public func isAlive() -> Bool {
27+
do {
28+
return try self.asyncCursor.isAlive().wait()
29+
} catch {
30+
return false
31+
}
2832
}
2933

3034
/// Returns the ID used by the server to track the cursor. `nil` once all results have been fetched from the server.

Tests/LinuxMain.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ extension AsyncMongoCursorTests {
1414
("testTailableAsyncCursor", testTailableAsyncCursor),
1515
("testAsyncNext", testAsyncNext),
1616
("testCursorToArray", testCursorToArray),
17+
("testForEach", testForEach),
18+
("testCursorId", testCursorId),
1719
]
1820
}
1921

@@ -51,6 +53,7 @@ extension ChangeStreamTests {
5153
("testChangeStreamError", testChangeStreamError),
5254
("testChangeStreamEmpty", testChangeStreamEmpty),
5355
("testChangeStreamToArray", testChangeStreamToArray),
56+
("testChangeStreamForEach", testChangeStreamForEach),
5457
]
5558
}
5659

0 commit comments

Comments
 (0)