diff --git a/src/YesSql.Core/Services/DefaultQuery.cs b/src/YesSql.Core/Services/DefaultQuery.cs index acdac79c..8db368b9 100644 --- a/src/YesSql.Core/Services/DefaultQuery.cs +++ b/src/YesSql.Core/Services/DefaultQuery.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Linq.Expressions; using System.Reflection; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using YesSql.Data; @@ -1316,26 +1317,23 @@ protected async Task FirstOrDefaultImpl(CancellationToken cancellationToken = } } - Task> IQuery.ListAsync(CancellationToken cancellationToken) + async Task> IQuery.ListAsync(CancellationToken cancellationToken) { - return ListImpl(cancellationToken); - } - -#pragma warning disable CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed - async IAsyncEnumerable IQuery.ToAsyncEnumerable(CancellationToken cancellationToken) -#pragma warning restore CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed - { - // TODO: [IAsyncEnumerable] Once Dapper supports IAsyncEnumerable we can replace this call by a non-buffered one - foreach (var item in await ListImpl(cancellationToken)) + var results = new List(); + await foreach (var item in ListImpl(cancellationToken)) { - yield return item; + results.Add(item); } + return results; } - internal async Task> ListImpl(CancellationToken cancellationToken) + IAsyncEnumerable IQuery.ToAsyncEnumerable(CancellationToken cancellationToken) { - // TODO: [IAsyncEnumerable] Once Dapper supports IAsyncEnumerable we can return it by default, and buffer it in ListAsync instead + return ListImpl(cancellationToken); + } + internal async IAsyncEnumerable ListImpl([EnumeratorCancellation] CancellationToken cancellationToken) + { // Flush any pending changes before doing a query (auto-flush) await _query._session.FlushAsync(cancellationToken); @@ -1369,19 +1367,17 @@ internal async Task> ListImpl(CancellationToken cancellationToken } var sql = sqlBuilder.ToSqlString(); - var key = new WorkerQueryKey(sql, _query._queryState._sqlBuilder.Parameters); + var logger = _query._session._store.Configuration.Logger; - return await _query._session._store.ProduceAsync(key, static (key, state) => + if (logger.IsEnabled(LogLevel.Debug)) { - var logger = state.Query._session._store.Configuration.Logger; - - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug(state.Sql); - } + logger.LogDebug(sql); + } - return state.Connection.QueryAsync(new CommandDefinition(state.Sql, state.Query._queryState._sqlBuilder.Parameters, state.Transaction, flags: CommandFlags.Buffered, cancellationToken: state.CancellationToken)); - }, new { Query = _query, Sql = sql, Connection = connection, Transaction = transaction, CancellationToken = cancellationToken }); + await foreach (var item in connection.QueryUnbufferedAsync(sql, _query._queryState._sqlBuilder.Parameters, transaction).WithCancellation(cancellationToken)) + { + yield return item; + } } else { @@ -1398,37 +1394,32 @@ internal async Task> ListImpl(CancellationToken cancellationToken // TODO: This could potentially be detected automically, for instance by creating a MultiMapIndex, but might require breaking changes var sql = _query._queryState._deduplicate ? GetDeduplicatedQuery() : sqlBuilder.ToSqlString(); + var logger = _query._session._store.Configuration.Logger; - var key = new WorkerQueryKey(sql, sqlBuilder.Parameters); - - var documents = await _query._session._store.ProduceAsync(key, static (key, state) => + if (logger.IsEnabled(LogLevel.Debug)) { - var logger = state.Query._session._store.Configuration.Logger; - - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug(state.Sql); - } + logger.LogDebug(sql); + } - return state.Connection.QueryAsync(new CommandDefinition(state.Sql, state.Query._queryState._sqlBuilder.Parameters, state.Transaction, flags: CommandFlags.Buffered, cancellationToken: state.CancellationToken)); - }, new { Query = _query, Sql = sql, Connection = connection, Transaction = transaction, CancellationToken = cancellationToken }); + var documents = new List(); + + await foreach (var document in connection.QueryUnbufferedAsync(sql, _query._queryState._sqlBuilder.Parameters, transaction).WithCancellation(cancellationToken)) + { + documents.Add(document); + } - if (!documents.Any()) + if (documents.Count == 0) { - return []; + yield break; } - // Clone documents returned from ProduceAsync as they might be shared across sessions - return _query._session.Get(documents.Select(x => x.Clone()), _query._collection).ToArray(); + // Clone documents as they might be shared across sessions + foreach (var item in _query._session.Get(documents.Select(x => x.Clone()), _query._collection)) + { + yield return item; + } } } - catch - { - // Don't use CancelAsync as we don't want to trigger a thread safety check, it's done in the finally block - await _query._session.CancelAsyncInternal(); - - throw; - } finally { _query._session.ExitAsyncExecution(); @@ -1631,20 +1622,19 @@ Task IQueryIndex.FirstOrDefaultAsync(CancellationToken cancellationToken) return FirstOrDefaultImpl(cancellationToken); } - Task> IQueryIndex.ListAsync(CancellationToken cancellationToken) + async Task> IQueryIndex.ListAsync(CancellationToken cancellationToken) { - return ListImpl(cancellationToken); + var results = new List(); + await foreach (var item in ListImpl(cancellationToken)) + { + results.Add(item); + } + return results; } -#pragma warning disable CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed - async IAsyncEnumerable IQueryIndex.ToAsyncEnumerable(CancellationToken cancellationToken) -#pragma warning restore CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed + IAsyncEnumerable IQueryIndex.ToAsyncEnumerable(CancellationToken cancellationToken) { - // TODO: [IAsyncEnumerable] Once Dapper supports IAsyncEnumerable we can replace this call by a non-buffered one - foreach (var item in await ListImpl(cancellationToken)) - { - yield return item; - } + return ListImpl(cancellationToken); } IQueryIndex IQueryIndex.Skip(int count)