-
Notifications
You must be signed in to change notification settings - Fork 123
[MBL-19677][S/P/T] Structured Concurrency foundations #3852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
f0c61ea
a179ad8
1595afa
666c444
4abcbb4
6d1c18a
2126f83
7050c72
c2da741
035d971
18a0a74
b2a42b2
5dbb3ae
d481da6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| // | ||
| // This file is part of Canvas. | ||
| // Copyright (C) 2026-present Instructure, Inc. | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU Affero General Public License as | ||
| // published by the Free Software Foundation, either version 3 of the | ||
| // License, or (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU Affero General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU Affero General Public License | ||
| // along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
| // | ||
|
|
||
| import Foundation | ||
| @preconcurrency import CoreData | ||
|
|
||
| public final class AsyncFetchedResults<ResultType: NSFetchRequestResult> { | ||
| private let request: NSFetchRequest<ResultType> | ||
| private let context: NSManagedObjectContext | ||
|
|
||
| public init( | ||
| request: NSFetchRequest<ResultType>, | ||
| context: NSManagedObjectContext | ||
| ) { | ||
| self.request = request | ||
| self.context = context | ||
| } | ||
|
|
||
| public func fetch() async throws -> [ResultType] { | ||
| try await context.fetch(request) | ||
| } | ||
|
|
||
| public func stream() -> AsyncThrowingStream<[ResultType], Error> { | ||
| AsyncThrowingStream { continuation in | ||
| let observer = FetchedResultsObserver( | ||
| request: request, | ||
| context: context, | ||
| continuation: continuation | ||
| ) | ||
|
|
||
| continuation.onTermination = { _ in | ||
| observer.cancel() | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private final class FetchedResultsObserver<ResultType: NSFetchRequestResult>: NSObject, NSFetchedResultsControllerDelegate { | ||
| private var controller: NSFetchedResultsController<ResultType>? | ||
| private let continuation: AsyncThrowingStream<[ResultType], Error>.Continuation | ||
| private let context: NSManagedObjectContext | ||
|
|
||
| init( | ||
| request: NSFetchRequest<ResultType>, | ||
| context: NSManagedObjectContext, | ||
| continuation: AsyncThrowingStream<[ResultType], Error>.Continuation | ||
| ) { | ||
| self.continuation = continuation | ||
| self.context = context | ||
| super.init() | ||
|
|
||
| context.perform { [weak self] in | ||
| guard let self else { return } | ||
|
|
||
| self.controller = NSFetchedResultsController( | ||
| fetchRequest: request, | ||
| managedObjectContext: context, | ||
| sectionNameKeyPath: nil, | ||
| cacheName: nil | ||
| ) | ||
| self.controller?.delegate = self | ||
|
|
||
| do { | ||
| try self.controller?.performFetch() | ||
| self.sendElement() | ||
| } catch { | ||
| continuation.finish(throwing: error) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private func sendElement() { | ||
| context.perform { [weak self] in | ||
| guard let self else { return } | ||
| let entities = self.controller?.fetchedObjects ?? [] | ||
| self.continuation.yield(entities) | ||
| } | ||
| } | ||
|
|
||
| func controllerDidChangeContent(_ controller: NSFetchedResultsController<NSFetchRequestResult>) { | ||
| sendElement() | ||
| } | ||
|
|
||
| func cancel() { | ||
| context.perform { [weak self] in | ||
| self?.controller?.delegate = nil | ||
| self?.controller = nil | ||
| self?.continuation.finish() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension NSManagedObjectContext { | ||
| public func fetch<R: NSFetchRequestResult>(_ request: NSFetchRequest<R>) async throws -> [R] { | ||
| try await perform { | ||
| try self.fetch(request) | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,190 @@ | ||
| // | ||
| // This file is part of Canvas. | ||
| // Copyright (C) 2026-present Instructure, Inc. | ||
| // | ||
| // This program is free software: you can redistribute it and/or modify | ||
| // it under the terms of the GNU Affero General Public License as | ||
| // published by the Free Software Foundation, either version 3 of the | ||
| // License, or (at your option) any later version. | ||
| // | ||
| // This program is distributed in the hope that it will be useful, | ||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| // GNU Affero General Public License for more details. | ||
| // | ||
| // You should have received a copy of the GNU Affero General Public License | ||
| // along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
| // | ||
|
|
||
| import Foundation | ||
| import CoreData | ||
|
|
||
| public struct AsyncStore<U: UseCase> { | ||
| internal let useCase: U | ||
| private let offlineModeInteractor: OfflineModeInteractor? | ||
| private let context: NSManagedObjectContext | ||
| private let environment: AppEnvironment | ||
| private let request: NSFetchRequest<U.Model> | ||
|
|
||
| public init( | ||
| offlineModeInteractor: OfflineModeInteractor? = OfflineModeAssembly.make(), | ||
| context: NSManagedObjectContext = AppEnvironment.shared.database.viewContext, | ||
| useCase: U, | ||
| environment: AppEnvironment = .shared | ||
| ) { | ||
| self.offlineModeInteractor = offlineModeInteractor | ||
| self.useCase = useCase.modified(for: environment) | ||
| self.context = context | ||
| self.environment = environment | ||
|
|
||
| request = NSFetchRequest<U.Model>(entityName: String(describing: U.Model.self)) | ||
| let scope = useCase.scope | ||
| request.predicate = scope.predicate | ||
| request.sortDescriptors = scope.order | ||
| } | ||
|
|
||
| /// Produces one entity for the given UseCase. | ||
| /// When the device is connected to the internet and there's no valid cache, it makes a request to the API and saves the response to the database. If there's valid cache, it returns it. | ||
| /// By default it downloads all pages, and validates cache unless specificied differently. | ||
| /// When the device is offline, it will read data from Core Data. | ||
| /// - Parameters: | ||
| /// - ignoreCache: Indicates if the request should check the available cache first. | ||
| /// If it's set to **false**, it will validate the cache's expiration and return it if it's still valid. If the cache has expired it will make a request to the API. | ||
| /// If it's set to **true**, it will make a request to the API. | ||
| /// Defaults to **false**. | ||
| /// - loadAllPages: Tells the request if it should load all the pages or just the first one. Defaults to **true**. | ||
| /// - assertOnlyOneEntityFound: Indicates if the request should assert that only one entity is found. Defaults to **true**. | ||
| /// - Returns: The first fetched entity. | ||
| /// - Throws: `AsyncStoreError.noEntityFound` if no entity is found. | ||
| /// - Throws: `AsyncStoreError.moreThanOneEntityFound` if more than one entity is found and `assertOnlyOneEntityFound` is set to true or emitted. | ||
| public func getFirstEntity(ignoreCache: Bool = false, loadAllPages: Bool = true, assertOnlyOneEntityFound: Bool = true) async throws -> U.Model { | ||
| let entities = try await getEntities(ignoreCache: ignoreCache, loadAllPages: loadAllPages) | ||
|
|
||
| if assertOnlyOneEntityFound, entities.count > 1 { throw AsyncStoreError.moreThanOneEntityFound(entities.count) } | ||
| guard let entity = entities.first else { throw AsyncStoreError.noEntityFound } | ||
|
|
||
| return entity | ||
| } | ||
|
|
||
| /// Produces a list of entities for the given UseCase. | ||
| /// When the device is connected to the internet and there's no valid cache, it makes a request to the API and saves the response to the database. If there's valid cache, it returns it. | ||
| /// By default it downloads all pages, and validates cache unless specificied differently. | ||
| /// When the device is offline, it will read data from Core Data. | ||
| /// - Parameters: | ||
| /// - ignoreCache: Indicates if the request should check the available cache first. | ||
| /// If it's set to **false**, it will validate the cache's expiration and return it if it's still valid. If the cache has expired it will make a request to the API. | ||
| /// If it's set to **true**, it will make a request to the API. | ||
| /// Defaults to **false**. | ||
| /// - loadAllPages: Tells the request if it should load all the pages or just the first one. Defaults to **true**. | ||
| /// - Returns: A list of entities. | ||
| public func getEntities(ignoreCache: Bool = false, loadAllPages: Bool = true) async throws -> [U.Model] { | ||
rh12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if offlineModeInteractor?.isOfflineModeEnabled() == true { | ||
| return try await fetchEntitiesFromDatabase() | ||
| } else { | ||
| let hasExpired = await useCase.hasCacheExpired(environment: environment) | ||
|
|
||
| if ignoreCache || hasExpired { | ||
| return try await fetchEntitiesFromAPI(loadAllPages: loadAllPages) | ||
| } else { | ||
| return try await fetchEntitiesFromDatabase() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Produces an async sequence of entities for the given UseCase keeping track of database changes. | ||
| /// When the device is connected to the internet and there's no valid cache, it makes a request to the API and saves the response to the database. If there's valid cache, it returns it. | ||
| /// By default it downloads all pages, and validates cache unless specificied differently. | ||
| /// When the device is offline, it will read data from Core Data. | ||
|
|
||
| /// - Warning: This stream **does not terminate**. Ensure proper cancellation of its consuming task. | ||
| /// - Parameters: | ||
| /// - ignoreCache: Indicates if the request should check the available cache first. | ||
| /// If it's set to **false**, it will validate the cache's expiration and return it if it's still valid. If the cache has expired it will make a request to the API. | ||
| /// If it's set to **true**, it will make a request to the API. | ||
| /// Defaults to **false**. | ||
| /// - loadAllPages: Tells the request if it should load all the pages or just the first one. Defaults to **true**. | ||
| /// - Returns: An async sequence of list of entities. | ||
| public func updates(ignoreCache: Bool = false, loadAllPages: Bool = true) async throws -> AsyncThrowingStream<[U.Model], Error> { | ||
| if offlineModeInteractor?.isOfflineModeEnabled() == true { | ||
| return streamEntitiesFromDatabase() | ||
| } else { | ||
| let hasExpired = await useCase.hasCacheExpired(environment: environment) | ||
|
|
||
| if ignoreCache || hasExpired { | ||
| try await updateEntitiesFromAPI(loadAllPages: loadAllPages) | ||
| } | ||
|
|
||
| return streamEntitiesFromDatabase() | ||
| } | ||
| } | ||
|
|
||
| public func getEntitiesFromDatabase() async throws -> [U.Model] { | ||
| try await fetchEntitiesFromDatabase() | ||
| } | ||
|
|
||
| /// - Warning: This stream **does not terminate**. Ensure proper cancellation of its consuming task. | ||
| public func updatesFromDatabase() -> AsyncThrowingStream<[U.Model], Error> { | ||
| streamEntitiesFromDatabase() | ||
| } | ||
|
|
||
| /// Refreshes the entities by requesting the latest data from the API. | ||
rh12 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| public func forceRefresh(loadAllPages: Bool = true) async { | ||
| _ = try? await getEntities(ignoreCache: true, loadAllPages: loadAllPages) | ||
| } | ||
|
|
||
| private func fetchEntitiesFromAPI(getNextUseCase: GetNextUseCase<U>? = nil, loadAllPages: Bool) async throws -> [U.Model] { | ||
| let urlResponse = if let getNextUseCase { | ||
| try await getNextUseCase.fetch(environment: environment) | ||
| } else { | ||
| try await useCase.fetch(environment: environment) | ||
| } | ||
petkybenedek marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| let nextResponse = urlResponse.flatMap { useCase.getNext(from: $0) } | ||
| try await fetchAllPagesIfNeeded(loadAllPages: loadAllPages, nextResponse: nextResponse) | ||
|
|
||
| return try await fetchEntitiesFromDatabase() | ||
| } | ||
|
|
||
| private func updateEntitiesFromAPI(getNextUseCase: GetNextUseCase<U>? = nil, loadAllPages: Bool) async throws { | ||
| let urlResponse = if let getNextUseCase { | ||
| try await getNextUseCase.fetch(environment: environment) | ||
| } else { | ||
| try await useCase.fetch(environment: environment) | ||
| } | ||
petkybenedek marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| let nextResponse = urlResponse.flatMap { useCase.getNext(from: $0) } | ||
| try await fetchAllPagesIfNeeded(loadAllPages: loadAllPages, nextResponse: nextResponse) | ||
| } | ||
|
|
||
| private func fetchAllPagesIfNeeded(loadAllPages: Bool, nextResponse: GetNextRequest<U.Response>?) async throws { | ||
| guard loadAllPages else { return } | ||
| let nextPageUseCase = getNextPage(nextResponse: nextResponse) | ||
|
|
||
| if let nextPageUseCase { | ||
| _ = try await fetchEntitiesFromAPI(getNextUseCase: nextPageUseCase, loadAllPages: true) | ||
| } | ||
| } | ||
|
|
||
| private func getNextPage(nextResponse: GetNextRequest<U.Response>?) -> GetNextUseCase<U>? { | ||
| if let nextResponse { | ||
| GetNextUseCase(parent: useCase, request: nextResponse) | ||
| } else { | ||
| nil | ||
| } | ||
| } | ||
|
|
||
| private func fetchEntitiesFromDatabase() async throws -> [U.Model] { | ||
| try await AsyncFetchedResults(request: request, context: context) | ||
| .fetch() | ||
| } | ||
|
|
||
| private func streamEntitiesFromDatabase() -> AsyncThrowingStream<[U.Model], Error> { | ||
| AsyncFetchedResults(request: request, context: context) | ||
| .stream() | ||
| } | ||
| } | ||
|
|
||
| public enum AsyncStoreError: Error, Equatable { | ||
| case noEntityFound | ||
| case moreThanOneEntityFound(Int) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -134,6 +134,13 @@ public extension UseCase { | |
| } | ||
| } | ||
|
|
||
| /// Cache expiration check used by the `AsyncStore`. | ||
| func hasCacheExpired(environment: AppEnvironment = .shared) async -> Bool { | ||
| await environment.database.performWriteTask { context in | ||
| self.hasExpired(in: context) | ||
| } | ||
| } | ||
|
|
||
| /// Reactive `fetch()`, used by the `ReactiveStore` and directly from other places. | ||
| /// Returns the URLResponse after writing to the database. | ||
| func fetchWithFuture(environment: AppEnvironment = .shared) -> Future<URLResponse?, Error> { | ||
|
|
@@ -149,6 +156,12 @@ public extension UseCase { | |
| } | ||
| } | ||
|
|
||
| /// Async `fetch()`, used by the `AsyncStore` and directly from other places. | ||
| /// Returns the URLResponse after writing to the database. | ||
| func fetch(environment: AppEnvironment = .shared) async throws -> URLResponse? { | ||
| try await fetchWithAPIResponse(environment: environment).1 | ||
| } | ||
|
|
||
| /// Reactive `fetch()` that returns both the API response and URLResponse. | ||
| /// Use this when you need access to the API response data directly. | ||
| /// The response is optional - it will be nil if the API returned no response body. | ||
|
|
@@ -160,6 +173,20 @@ public extension UseCase { | |
| } | ||
| } | ||
|
|
||
| /// Async `fetch()` that returns both the API response and URLResponse. | ||
| /// Use this when you need access to the API response data directly. | ||
| /// The response is optional - it will be nil if the API returned no response body. | ||
| /// Handles task cancellation as it is not possible to propagate it down further. | ||
| func fetchWithAPIResponse(environment: AppEnvironment = .shared) async throws -> (Response?, URLResponse?) { | ||
| try Task.checkCancellation() | ||
|
|
||
| return try await withCheckedThrowingContinuation { continuation in | ||
| self.executeFetch(environment: environment) { result in | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps for later occasion:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we could also propagate down task cancellation to |
||
| continuation.resume(with: result) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Private helper method that executes the fetch and write logic. | ||
| private func executeFetch( | ||
| environment: AppEnvironment, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, here we are treating the empty data (no entities) as failure. Wouldn't that confuse code at call sites? As that would often be treated as a separate case in terms of UI ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other option would be to return nil, which is equivalent of just calling
firstongetEntities. This function is specifically created for cases when we expect an entity.You can catch the specific error if you want to handle that case specifically, or just use
first.But I'm open to discussion, but if we want to return an optional, we can get rid of this function and just use
first.