Skip to content

Commit 3d8ccfa

Browse files
authored
Merge pull request #50 from rryam/fix/throttled-concurrent-file-operations
Add throttled concurrency for file operations
2 parents 29f7bd0 + 609c625 commit 3d8ccfa

File tree

4 files changed

+287
-35
lines changed

4 files changed

+287
-35
lines changed

Sources/VecturaKit/Core/VecturaKit.swift

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,17 +141,8 @@ public actor VecturaKit {
141141
documentIds.append(docId)
142142
}
143143

144-
// Save documents to storage (storage handles caching internally)
145-
let storage = self.storageProvider
146-
try await withThrowingTaskGroup(of: Void.self) { group in
147-
for doc in documentsToSave {
148-
group.addTask {
149-
try await storage.saveDocument(doc)
150-
}
151-
}
152-
153-
try await group.waitForAll()
154-
}
144+
// Save documents to storage (storage provider handles batch concurrency)
145+
try await storageProvider.saveDocuments(documentsToSave)
155146

156147
// Notify search engine to index documents
157148
for doc in documentsToSave {

Sources/VecturaKit/Storage/FileStorageProvider.swift

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,63 @@ extension FileStorageProvider: VecturaStorage {
9999
cache[document.id] = document
100100
}
101101
}
102+
103+
/// Saves a batch of documents using throttled concurrent file writes.
///
/// File I/O runs outside actor isolation so the writes proceed in parallel,
/// bounded by `maxConcurrentFileOperations`. The in-memory cache is only
/// refreshed once every write has succeeded.
///
/// - Parameter documents: The documents to persist.
/// - Throws: The first file-write error encountered; remaining writes are
///   cancelled, so a partially written batch may already be on disk while
///   the cache is left untouched.
public func saveDocuments(_ documents: [VecturaDocument]) async throws {
    let targetDirectory = storageDirectory

    // Nonisolated static writes allow true parallelism across the batch.
    try await documents.concurrentForEach(maxConcurrency: Self.maxConcurrentFileOperations) {
        try Self.writeDocumentToFile($0, in: targetDirectory)
    }

    // Mirror the batch into the cache only after every write succeeded.
    guard cacheEnabled else { return }
    for saved in documents {
        cache[saved.id] = saved
    }
}
122+
123+
/// Serializes `document` as pretty-printed JSON and writes it to
/// `<directory>/<id>.json` without actor isolation, enabling concurrent
/// file I/O when called from multiple tasks.
///
/// The write is atomic; on iOS-family platforms it additionally requests
/// complete file protection. Permissions are then tightened to `0o600`
/// (owner read/write only).
///
/// - Parameters:
///   - document: The document to encode and persist.
///   - directory: The storage directory that receives the file.
/// - Throws: Any encoding, file-write, or attribute-setting error.
nonisolated private static func writeDocumentToFile(
    _ document: VecturaDocument,
    in directory: URL
) throws {
    let fileURL = directory.appendingPathComponent("\(document.id).json")

    let encoder = JSONEncoder()
    encoder.outputFormatting = .prettyPrinted
    let payload = try encoder.encode(document)

    #if os(iOS) || os(tvOS) || os(watchOS) || os(visionOS)
    let writeOptions: Data.WritingOptions = [.atomic, .completeFileProtection]
    #else
    let writeOptions: Data.WritingOptions = [.atomic]
    #endif
    try payload.write(to: fileURL, options: writeOptions)

    // Restrict the saved file to owner read/write only.
    try FileManager.default.setAttributes(
        [.posixPermissions: 0o600],
        ofItemAtPath: fileURL.path(percentEncoded: false)
    )
}
102146
}
103147

104148
// MARK: - CachableVecturaStorage
105149

106150
extension FileStorageProvider: CachableVecturaStorage {
151+
152+
/// Maximum number of concurrent file operations to prevent resource exhaustion.
153+
private static let maxConcurrentFileOperations = 50
154+
107155
/// Loads all documents from disk by reading JSON files (bypasses cache).
156+
///
157+
/// Uses throttled concurrency to prevent file descriptor exhaustion
158+
/// when loading large numbers of documents.
108159
public func loadDocumentsFromStorage() async throws -> [VecturaDocument] {
109160
let fileURLs = try FileManager.default.contentsOfDirectory(
110161
at: storageDirectory,
@@ -113,31 +164,17 @@ extension FileStorageProvider: CachableVecturaStorage {
113164

114165
let jsonFileURLs = fileURLs.filter { $0.pathExtension.lowercased() == "json" }
115166

116-
return await withTaskGroup(of: VecturaDocument?.self, returning: [VecturaDocument].self) { group in
117-
for fileURL in jsonFileURLs {
118-
group.addTask {
119-
do {
120-
let data = try Data(contentsOf: fileURL)
121-
let decoder = JSONDecoder()
122-
return try decoder.decode(VecturaDocument.self, from: data)
123-
} catch {
124-
let path = fileURL.path(percentEncoded: false)
125-
Self.logger.warning(
126-
"Failed to load document at \(path): \(error.localizedDescription)"
127-
)
128-
return nil
129-
}
130-
}
131-
}
132-
133-
var documents: [VecturaDocument] = []
134-
documents.reserveCapacity(jsonFileURLs.count)
135-
for await document in group {
136-
if let document {
137-
documents.append(document)
138-
}
167+
return await jsonFileURLs.concurrentMap(maxConcurrency: Self.maxConcurrentFileOperations) { fileURL in
168+
do {
169+
let data = try Data(contentsOf: fileURL)
170+
return try JSONDecoder().decode(VecturaDocument.self, from: data)
171+
} catch {
172+
let path = fileURL.path(percentEncoded: false)
173+
Self.logger.warning(
174+
"Failed to load document at \(path): \(error.localizedDescription)"
175+
)
176+
return nil
139177
}
140-
return documents
141178
}
142179
}
143180

Sources/VecturaKit/Storage/VecturaStorage.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ public protocol VecturaStorage: Sendable {
3333
///
3434
/// - Returns: The total document count.
3535
func getTotalDocumentCount() async throws -> Int
36+
37+
/// Saves multiple documents in batch.
38+
///
39+
/// Storage providers can override this for optimized batch operations.
40+
/// The default implementation calls saveDocument sequentially.
41+
///
42+
/// - Parameter documents: The documents to save.
43+
func saveDocuments(_ documents: [VecturaDocument]) async throws
3644
}
3745

3846
// MARK: - Default Implementation
@@ -45,4 +53,13 @@ extension VecturaStorage {
4553
public func getTotalDocumentCount() async throws -> Int {
4654
return try await loadDocuments().count
4755
}
56+
57+
/// Sequential fallback used when a provider supplies no batch override.
///
/// Each document is persisted one at a time via `saveDocument(_:)`;
/// concrete storage implementations may override this with concurrent I/O.
///
/// - Parameter documents: The documents to save, in order.
/// - Throws: The first error raised by `saveDocument(_:)`, leaving any
///   remaining documents unsaved.
public func saveDocuments(_ documents: [VecturaDocument]) async throws {
    var remaining = documents[...]
    while let next = remaining.popFirst() {
        try await saveDocument(next)
    }
}
4865
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
import Foundation
2+
3+
// MARK: - Concurrent Collection Processing

extension Sequence where Element: Sendable {

    /// Concurrently maps elements with controlled parallelism using a sliding-window pattern.
    ///
    /// Processes elements concurrently while limiting the number of simultaneous
    /// operations to prevent resource exhaustion (file handles, memory, etc.).
    ///
    /// - Parameters:
    ///   - maxConcurrency: Maximum number of concurrent operations (default: 50).
    ///     Non-positive values are clamped to 1 instead of silently producing `[]`.
    ///   - transform: Async throwing closure that transforms each element. Return `nil` to skip.
    /// - Returns: Array of non-nil transformed results. Results are appended in
    ///   completion order, which may differ from input order — use
    ///   `orderedConcurrentMap` when order matters.
    /// - Throws: The first error thrown by `transform`; remaining child tasks are
    ///   cancelled by the task group.
    ///
    /// ## Example
    ///
    /// ```swift
    /// let urls: [URL] = getFileURLs()
    /// let documents = try await urls.concurrentMap(maxConcurrency: 50) { url in
    ///     try? JSONDecoder().decode(Document.self, from: Data(contentsOf: url))
    /// }
    /// ```
    ///
    /// ## Performance Characteristics
    ///
    /// - Keeps at most `maxConcurrency` tasks running at any time
    /// - Uses structured concurrency with no semaphores or locks
    /// - Memory efficient: processes results as they complete
    ///
    /// - Note: Declared `throws` rather than `rethrows`: the group body throws via
    ///   `ThrowingTaskGroup.next()`, which the `rethrows` checker cannot attribute
    ///   to `transform`, so a `rethrows` spelling does not compile.
    @inlinable
    public func concurrentMap<T: Sendable>(
        maxConcurrency: Int = 50,
        _ transform: @Sendable @escaping (Element) async throws -> T?
    ) async throws -> [T] {
        // Clamp so a non-positive window cannot seed zero tasks and drop all input.
        let window = Swift.max(1, maxConcurrency)

        return try await withThrowingTaskGroup(of: T?.self) { group in
            var results: [T] = []
            var iterator = makeIterator()

            // Seed the initial batch of tasks up to the window size.
            var activeCount = 0
            while activeCount < window, let element = iterator.next() {
                group.addTask { try await transform(element) }
                activeCount += 1
            }

            // Sliding window: as each task completes, start the next element.
            while let result = try await group.next() {
                if let value = result {
                    results.append(value)
                }
                if let element = iterator.next() {
                    group.addTask { try await transform(element) }
                }
            }

            return results
        }
    }

    /// Concurrently maps elements with controlled parallelism (non-throwing version).
    ///
    /// - Parameters:
    ///   - maxConcurrency: Maximum number of concurrent operations (default: 50).
    ///     Non-positive values are clamped to 1.
    ///   - transform: Async closure that transforms each element. Return `nil` to skip.
    /// - Returns: Array of non-nil transformed results, in completion order.
    @inlinable
    public func concurrentMap<T: Sendable>(
        maxConcurrency: Int = 50,
        _ transform: @Sendable @escaping (Element) async -> T?
    ) async -> [T] {
        let window = Swift.max(1, maxConcurrency)

        return await withTaskGroup(of: T?.self) { group in
            var results: [T] = []
            var iterator = makeIterator()

            // Seed initial batch.
            var activeCount = 0
            while activeCount < window, let element = iterator.next() {
                group.addTask { await transform(element) }
                activeCount += 1
            }

            // Sliding window: add a new task as each one completes.
            for await result in group {
                if let value = result {
                    results.append(value)
                }
                if let element = iterator.next() {
                    group.addTask { await transform(element) }
                }
            }

            return results
        }
    }

    /// Concurrently executes a side-effect closure on each element with controlled parallelism.
    ///
    /// Use this when you need to perform async operations for their side effects
    /// (e.g., saving files, network requests) without collecting results.
    ///
    /// - Parameters:
    ///   - maxConcurrency: Maximum number of concurrent operations (default: 50).
    ///     Non-positive values are clamped to 1.
    ///   - body: Async throwing closure to execute for each element.
    /// - Throws: The first error encountered; remaining child tasks are cancelled.
    ///
    /// ## Example
    ///
    /// ```swift
    /// try await documents.concurrentForEach(maxConcurrency: 20) { doc in
    ///     try await storage.saveDocument(doc)
    /// }
    /// ```
    @inlinable
    public func concurrentForEach(
        maxConcurrency: Int = 50,
        _ body: @Sendable @escaping (Element) async throws -> Void
    ) async throws {
        let window = Swift.max(1, maxConcurrency)

        try await withThrowingTaskGroup(of: Void.self) { group in
            var iterator = makeIterator()

            // Seed initial batch.
            var activeCount = 0
            while activeCount < window, let element = iterator.next() {
                group.addTask { try await body(element) }
                activeCount += 1
            }

            // Sliding window.
            while try await group.next() != nil {
                if let element = iterator.next() {
                    group.addTask { try await body(element) }
                }
            }
        }
    }

    /// Concurrently maps elements while preserving the original order.
    ///
    /// Unlike `concurrentMap`, this method guarantees that output order matches
    /// input order, at slightly higher memory overhead for tracking indices.
    ///
    /// - Parameters:
    ///   - maxConcurrency: Maximum number of concurrent operations (default: 50).
    ///   - transform: Async throwing closure that transforms each element.
    /// - Returns: Transformed results in the same order as the input.
    /// - Throws: The first error thrown by `transform`.
    ///
    /// ## Example
    ///
    /// ```swift
    /// let urls = [url1, url2, url3]
    /// let data = try await urls.orderedConcurrentMap(maxConcurrency: 10) { url in
    ///     try await fetchData(from: url)
    /// }
    /// // data[0] corresponds to url1, data[1] to url2, etc.
    /// ```
    ///
    /// - Note: `throws` (not `rethrows`) for consistency with `concurrentMap`,
    ///   which cannot be spelled `rethrows` (see its note).
    @inlinable
    public func orderedConcurrentMap<T: Sendable>(
        maxConcurrency: Int = 50,
        _ transform: @Sendable @escaping (Element) async throws -> T
    ) async throws -> [T] {
        let indexed = Array(self.enumerated())

        let unordered = try await indexed.concurrentMap(maxConcurrency: maxConcurrency) { item -> (Int, T)? in
            (item.offset, try await transform(item.element))
        }

        // Restore input order, then strip the indices.
        return unordered
            .sorted { $0.0 < $1.0 }
            .map(\.1)
    }
}
180+
181+
// MARK: - Collection Chunking

extension Collection {

    /// Splits the collection into consecutive chunks of at most `size` elements.
    ///
    /// - Parameter size: Maximum number of elements per chunk; must be positive.
    /// - Returns: An array of arrays, each holding up to `size` consecutive
    ///   elements. Returns an empty array when `size` is zero or negative.
    ///
    /// ## Example
    ///
    /// ```swift
    /// let items = [1, 2, 3, 4, 5, 6, 7]
    /// let chunks = items.chunked(into: 3)
    /// // [[1, 2, 3], [4, 5, 6], [7]]
    /// ```
    @inlinable
    public func chunked(into size: Int) -> [[Element]] {
        guard size > 0 else { return [] }

        var chunks: [[Element]] = []
        chunks.reserveCapacity((count + size - 1) / size)

        // Walk the collection index-by-chunk, never overrunning endIndex.
        var cursor = startIndex
        while cursor != endIndex {
            let chunkEnd = index(cursor, offsetBy: size, limitedBy: endIndex) ?? endIndex
            chunks.append(Array(self[cursor..<chunkEnd]))
            cursor = chunkEnd
        }
        return chunks
    }
}

0 commit comments

Comments
 (0)