Skip to content

Commit ba8f3da

Browse files
authored
SWIFT-718 Add a toArray method to async cursor and change stream (#406)
1 parent 0bef140 commit ba8f3da

File tree

7 files changed

+222
-104
lines changed

7 files changed

+222
-104
lines changed

Sources/MongoSwift/ChangeStream.swift

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import NIO
66
private struct MongocChangeStream: MongocCursorWrapper {
77
internal let pointer: OpaquePointer
88

9+
internal static var isLazy: Bool { return false }
10+
911
fileprivate init(stealing ptr: OpaquePointer) {
1012
self.pointer = ptr
1113
}
@@ -63,11 +65,16 @@ public class ChangeStream<T: Codable>: CursorProtocol {
6365
/// The cursor this change stream is wrapping.
6466
private let wrappedCursor: Cursor<MongocChangeStream>
6567

66-
/// Process an event before returning it to the user.
68+
/// Process an event before returning it to the user, or does nothing and returns nil if the provided event is nil.
6769
private func processEvent(_ event: Document?) throws -> T? {
6870
guard let event = event else {
6971
return nil
7072
}
73+
return try self.processEvent(event)
74+
}
75+
76+
/// Process an event before returning it to the user.
77+
private func processEvent(_ event: Document) throws -> T {
7178
// Update the resumeToken with the `_id` field from the document.
7279
guard let resumeToken = event["_id"]?.documentValue else {
7380
throw InternalError(message: "_id field is missing from the change stream document.")
@@ -167,6 +174,27 @@ public class ChangeStream<T: Codable>: CursorProtocol {
167174
}
168175
}
169176

177+
/**
178+
* Consolidate the currently available results of the change stream into an array of type `T`.
179+
*
180+
* Since `toArray` will only fetch the currently available results, it may return more data if it is called again
181+
* while the change stream is still alive.
182+
*
183+
* - Returns:
184+
* An `EventLoopFuture<[T]>` evaluating to the results currently available in this change stream, or an error.
185+
*
186+
* If the future evaluates to an error, that error is likely one of the following:
187+
* - `CommandError` if an error occurs while fetching more results from the server.
188+
* - `LogicError` if this function is called after the change stream has died.
189+
* - `LogicError` if this function is called and the session associated with this change stream is inactive.
190+
* - `DecodingError` if an error occurs decoding the server's responses.
191+
*/
192+
public func toArray() -> EventLoopFuture<[T]> {
193+
return self.client.operationExecutor.execute {
194+
try self.wrappedCursor.toArray().map(self.processEvent)
195+
}
196+
}
197+
170198
/**
171199
* Kill this change stream.
172200
*

Sources/MongoSwift/CursorCommon.swift

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ internal protocol CursorProtocol {
4040
*/
4141
func tryNext() -> EventLoopFuture<T?>
4242

43+
/// Retrieves all the documents currently available in this cursor. If the cursor is not tailable, exhausts it. If
44+
/// the cursor is tailable or is a change stream, this method may return more data if it is called again while the
45+
/// cursor is still alive.
46+
func toArray() -> EventLoopFuture<[T]>
47+
4348
/**
4449
* Kills this cursor.
4550
*
@@ -67,6 +72,9 @@ internal protocol MongocCursorWrapper {
6772
/// The underlying libmongoc pointer.
6873
var pointer: OpaquePointer { get }
6974

75+
/// Indicates whether this type lazily sends its corresponding initial command to the server.
76+
static var isLazy: Bool { get }
77+
7078
/// Method wrapping the appropriate libmongoc "error" function (e.g. `mongoc_cursor_error_document`).
7179
func errorDocument(bsonError: inout bson_error_t, replyPtr: UnsafeMutablePointer<BSONPointer?>) -> Bool
7280

@@ -96,10 +104,28 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
96104
/// The state of this cursor.
97105
private var state: State
98106

107+
/// Used to store a cached next value to return, if one exists.
108+
private enum CachedDocument {
109+
/// Indicates that the associated value is the next value to return. This value may be nil.
110+
case cached(Document?)
111+
/// Indicates that there is no value cached.
112+
case none
113+
114+
/// Get the contents of the cache and clear it.
115+
fileprivate mutating func clear() -> CachedDocument {
116+
let copy = self
117+
self = .none
118+
return copy
119+
}
120+
}
121+
122+
/// Tracks the caching status of this cursor.
123+
private var cached: CachedDocument
124+
99125
/// The type of this cursor. Useful for indicating whether or not it is tailable.
100126
private let type: CursorType
101127

102-
/// Lock used to synchronize usage of the internal state.
128+
/// Lock used to synchronize usage of the internal state: specifically the `state` and `cached` properties.
103129
/// This lock should only be acquired in the bodies of non-private methods.
104130
private let lock: Lock
105131

@@ -202,16 +228,34 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
202228
self.type = type
203229
self.lock = Lock()
204230
self.isClosing = NIOAtomic.makeAtomic(value: false)
231+
self.cached = .none
205232

206233
// If there was an error constructing the cursor, throw it.
207234
if let error = self.getMongocError() {
208235
self.close()
209236
throw error
210237
}
238+
239+
// if this type lazily sends its initial command, retrieve and cache the first document so that we start I/O.
240+
if CursorKind.isLazy {
241+
self.cached = try .cached(self.tryNext())
242+
}
211243
}
212244

213245
/// Whether the cursor is alive.
214246
internal var isAlive: Bool {
247+
return self.lock.withLock {
248+
self._isAlive
249+
}
250+
}
251+
252+
/// Checks whether the cursor is alive. Meant for private use only.
253+
/// This property should only be read while the lock is held.
254+
private var _isAlive: Bool {
255+
if case .cached = self.cached {
256+
return true
257+
}
258+
215259
switch self.state {
216260
case .open:
217261
return true
@@ -224,11 +268,20 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
224268
/// This method is blocking and should only be run via the executor.
225269
internal func next() throws -> Document? {
226270
return try self.lock.withLock {
227-
guard self.isAlive else {
271+
guard self._isAlive else {
228272
throw ClosedCursorError
229273
}
274+
275+
if case let .cached(result) = self.cached.clear() {
276+
// If there are no more results forthcoming after clearing the cache, or the cache had a non-nil
277+
// result in it, return that.
278+
if !self._isAlive || result != nil {
279+
return result
280+
}
281+
}
282+
230283
// Keep trying until either the cursor is killed or a notification has been sent by close
231-
while self.isAlive && !self.isClosing.load() {
284+
while self._isAlive && !self.isClosing.load() {
232285
if let doc = try self.getNextDocument() {
233286
return doc
234287
}
@@ -241,16 +294,31 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
241294
/// This method is blocking and should only be run via the executor.
242295
internal func tryNext() throws -> Document? {
243296
return try self.lock.withLock {
244-
try self.getNextDocument()
297+
if case let .cached(result) = self.cached.clear() {
298+
return result
299+
}
300+
return try self.getNextDocument()
245301
}
246302
}
247303

248304
/// Retreive all the currently available documents in the result set.
249305
/// This will not exhaust the cursor.
250306
/// This method is blocking and should only be run via the executor.
251-
internal func all() throws -> [Document] {
307+
internal func toArray() throws -> [Document] {
252308
return try self.lock.withLock {
309+
guard self._isAlive else {
310+
throw ClosedCursorError
311+
}
312+
253313
var results: [Document] = []
314+
if case let .cached(result) = self.cached.clear(), let unwrappedResult = result {
315+
results.append(unwrappedResult)
316+
}
317+
// the only value left was the cached one
318+
guard self._isAlive else {
319+
return results
320+
}
321+
254322
while let result = try self.getNextDocument() {
255323
results.append(result)
256324
}
@@ -264,6 +332,7 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
264332
internal func kill() {
265333
self.isClosing.store(true)
266334
self.lock.withLock {
335+
self.cached = .none
267336
self.close()
268337
}
269338
self.isClosing.store(false)

Sources/MongoSwift/MongoCollection+Indexes.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ extension MongoCollection {
408408
*/
409409
public func listIndexNames(session _: ClientSession? = nil) -> EventLoopFuture<[String]> {
410410
return self.listIndexes().flatMap { cursor in
411-
cursor.all()
411+
cursor.toArray()
412412
}.flatMapThrowing { models in
413413
try models.map { model in
414414
guard let name = model.options?.name else {

Sources/MongoSwift/MongoCursor.swift

Lines changed: 29 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import NIOConcurrencyHelpers
77
internal struct MongocCursor: MongocCursorWrapper {
88
internal let pointer: OpaquePointer
99

10+
internal static var isLazy: Bool { return true }
11+
1012
internal init(referencing pointer: OpaquePointer) {
1113
self.pointer = pointer
1214
}
@@ -39,24 +41,6 @@ public class MongoCursor<T: Codable>: CursorProtocol {
3941
/// Decoder from the client, database, or collection that created this cursor.
4042
internal let decoder: BSONDecoder
4143

42-
/// Used to store a cached next value to return, if one exists.
43-
private enum CachedDocument {
44-
/// Indicates that the associated value is the next value to return. This value may be nil.
45-
case cached(T?)
46-
/// Indicates that there is no value cached.
47-
case none
48-
49-
/// Get the contents of the cache and clear it.
50-
fileprivate mutating func clear() -> CachedDocument {
51-
let copy = self
52-
self = .none
53-
return copy
54-
}
55-
}
56-
57-
/// Tracks the caching status of this cursor.
58-
private var cached: CachedDocument
59-
6044
/**
6145
* Initializes a new `MongoCursor` instance. Not meant to be instantiated directly by a user. When `forceIO` is
6246
* true, this initializer will force a connection to the server if one is not already established.
@@ -75,25 +59,13 @@ public class MongoCursor<T: Codable>: CursorProtocol {
7559
) throws {
7660
self.client = client
7761
self.decoder = decoder
78-
self.cached = .none
7962

8063
self.wrappedCursor = try Cursor(
8164
mongocCursor: MongocCursor(referencing: cursorPtr),
8265
connection: connection,
8366
session: session,
8467
type: cursorType ?? .nonTailable
8568
)
86-
87-
let next = try self.decode(result: self.wrappedCursor.tryNext())
88-
self.cached = .cached(next)
89-
}
90-
91-
/// Close this cursor
92-
///
93-
/// This method should only be called while the lock is held.
94-
internal func blockingKill() {
95-
self.cached = .none
96-
self.wrappedCursor.kill()
9769
}
9870

9971
/// Asserts that the cursor was closed.
@@ -111,12 +83,7 @@ public class MongoCursor<T: Codable>: CursorProtocol {
11183

11284
/// Decodes the given document to the generic type.
11385
private func decode(doc: Document) throws -> T {
114-
do {
115-
return try self.decoder.decode(T.self, from: doc)
116-
} catch {
117-
self.blockingKill()
118-
throw error
119-
}
86+
return try self.decoder.decode(T.self, from: doc)
12087
}
12188

12289
/**
@@ -130,9 +97,6 @@ public class MongoCursor<T: Codable>: CursorProtocol {
13097
* This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`.
13198
*/
13299
public var isAlive: Bool {
133-
if case .cached = self.cached {
134-
return true
135-
}
136100
return self.wrappedCursor.isAlive
137101
}
138102

@@ -147,37 +111,6 @@ public class MongoCursor<T: Codable>: CursorProtocol {
147111
}
148112
}
149113

150-
/**
151-
* Consolidate the currently available results of the cursor into an array of type `T`.
152-
*
153-
* If this cursor is not tailable, this method will exhaust it.
154-
*
155-
* If this cursor is tailable, `all` will only fetch the currently available results, and it
156-
* may return more data if it is called again while the cursor is still alive.
157-
*
158-
* - Returns:
159-
* An `EventLoopFuture<[T]>` evaluating to the results currently available to this cursor or an error.
160-
*
161-
* If the future evaluates to an error, that error is likely one of the following:
162-
* - `CommandError` if an error occurs while fetching more results from the server.
163-
* - `LogicError` if this function is called after the cursor has died.
164-
* - `LogicError` if this function is called and the session associated with this cursor is inactive.
165-
* - `DecodingError` if an error occurs decoding the server's responses.
166-
*/
167-
internal func all() -> EventLoopFuture<[T]> {
168-
return self.client.operationExecutor.execute {
169-
var results: [T] = []
170-
if case let .cached(result) = self.cached.clear(), let unwrappedResult = result {
171-
results.append(unwrappedResult)
172-
}
173-
// If the cursor still could have more results after clearing the cache, collect them too.
174-
if self.isAlive {
175-
results += try self.wrappedCursor.all().map { try self.decode(doc: $0) }
176-
}
177-
return results
178-
}
179-
}
180-
181114
/**
182115
* Attempt to get the next `T` from the cursor, returning `nil` if there are no results.
183116
*
@@ -200,10 +133,7 @@ public class MongoCursor<T: Codable>: CursorProtocol {
200133
*/
201134
public func tryNext() -> EventLoopFuture<T?> {
202135
return self.client.operationExecutor.execute {
203-
if case let .cached(result) = self.cached.clear() {
204-
return result
205-
}
206-
return try self.decode(result: self.wrappedCursor.tryNext())
136+
try self.decode(result: self.wrappedCursor.tryNext())
207137
}
208138
}
209139

@@ -226,15 +156,30 @@ public class MongoCursor<T: Codable>: CursorProtocol {
226156
*/
227157
public func next() -> EventLoopFuture<T?> {
228158
return self.client.operationExecutor.execute {
229-
if case let .cached(result) = self.cached.clear() {
230-
// If there are no more results forthcoming after clearing the cache, or the cache had a non-nil
231-
// result in it, return that.
232-
if !self.isAlive || result != nil {
233-
return result
234-
}
235-
}
236-
// Otherwise iterate until a result is received.
237-
return try self.decode(result: self.wrappedCursor.next())
159+
try self.decode(result: self.wrappedCursor.next())
160+
}
161+
}
162+
163+
/**
164+
* Consolidate the currently available results of the cursor into an array of type `T`.
165+
*
166+
* If this cursor is not tailable, this method will exhaust it.
167+
*
168+
* If this cursor is tailable, `toArray` will only fetch the currently available results, and it
169+
* may return more data if it is called again while the cursor is still alive.
170+
*
171+
* - Returns:
172+
* An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error.
173+
*
174+
* If the future evaluates to an error, that error is likely one of the following:
175+
* - `CommandError` if an error occurs while fetching more results from the server.
176+
* - `LogicError` if this function is called after the cursor has died.
177+
* - `LogicError` if this function is called and the session associated with this cursor is inactive.
178+
* - `DecodingError` if an error occurs decoding the server's responses.
179+
*/
180+
public func toArray() -> EventLoopFuture<[T]> {
181+
return self.client.operationExecutor.execute {
182+
try self.wrappedCursor.toArray().map { try self.decode(doc: $0) }
238183
}
239184
}
240185

@@ -249,7 +194,7 @@ public class MongoCursor<T: Codable>: CursorProtocol {
249194
*/
250195
public func kill() -> EventLoopFuture<Void> {
251196
return self.client.operationExecutor.execute {
252-
self.blockingKill()
197+
self.wrappedCursor.kill()
253198
}
254199
}
255200
}

0 commit comments

Comments
 (0)