Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 45 additions & 55 deletions src/YesSql.Core/Services/DefaultQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using YesSql.Data;
Expand Down Expand Up @@ -1316,26 +1317,23 @@ protected async Task<T> FirstOrDefaultImpl(CancellationToken cancellationToken =
}
}

Task<IEnumerable<T>> IQuery<T>.ListAsync(CancellationToken cancellationToken)
async Task<IEnumerable<T>> IQuery<T>.ListAsync(CancellationToken cancellationToken)
{
return ListImpl(cancellationToken);
}

#pragma warning disable CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed
async IAsyncEnumerable<T> IQuery<T>.ToAsyncEnumerable(CancellationToken cancellationToken)
#pragma warning restore CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed
{
// TODO: [IAsyncEnumerable] Once Dapper supports IAsyncEnumerable we can replace this call by a non-buffered one
foreach (var item in await ListImpl(cancellationToken))
var results = new List<T>();
await foreach (var item in ListImpl(cancellationToken))
{
yield return item;
results.Add(item);
}
return results;
}

internal async Task<IEnumerable<T>> ListImpl(CancellationToken cancellationToken)
IAsyncEnumerable<T> IQuery<T>.ToAsyncEnumerable(CancellationToken cancellationToken)
{
// TODO: [IAsyncEnumerable] Once Dapper supports IAsyncEnumerable we can return it by default, and buffer it in ListAsync instead
return ListImpl(cancellationToken);
}

internal async IAsyncEnumerable<T> ListImpl([EnumeratorCancellation] CancellationToken cancellationToken)
{
// Flush any pending changes before doing a query (auto-flush)
await _query._session.FlushAsync(cancellationToken);

Expand Down Expand Up @@ -1369,19 +1367,17 @@ internal async Task<IEnumerable<T>> ListImpl(CancellationToken cancellationToken
}

var sql = sqlBuilder.ToSqlString();
var key = new WorkerQueryKey(sql, _query._queryState._sqlBuilder.Parameters);
var logger = _query._session._store.Configuration.Logger;

return await _query._session._store.ProduceAsync(key, static (key, state) =>
if (logger.IsEnabled(LogLevel.Debug))
{
var logger = state.Query._session._store.Configuration.Logger;

if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug(state.Sql);
}
logger.LogDebug(sql);
}

return state.Connection.QueryAsync<T>(new CommandDefinition(state.Sql, state.Query._queryState._sqlBuilder.Parameters, state.Transaction, flags: CommandFlags.Buffered, cancellationToken: state.CancellationToken));
}, new { Query = _query, Sql = sql, Connection = connection, Transaction = transaction, CancellationToken = cancellationToken });
await foreach (var item in connection.QueryUnbufferedAsync<T>(sql, _query._queryState._sqlBuilder.Parameters, transaction).WithCancellation(cancellationToken))
{
yield return item;
}
}
else
{
Expand All @@ -1398,37 +1394,32 @@ internal async Task<IEnumerable<T>> ListImpl(CancellationToken cancellationToken
// TODO: This could potentially be detected automically, for instance by creating a MultiMapIndex, but might require breaking changes

var sql = _query._queryState._deduplicate ? GetDeduplicatedQuery() : sqlBuilder.ToSqlString();
var logger = _query._session._store.Configuration.Logger;

var key = new WorkerQueryKey(sql, sqlBuilder.Parameters);

var documents = await _query._session._store.ProduceAsync(key, static (key, state) =>
if (logger.IsEnabled(LogLevel.Debug))
{
var logger = state.Query._session._store.Configuration.Logger;

if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug(state.Sql);
}
logger.LogDebug(sql);
}

return state.Connection.QueryAsync<Document>(new CommandDefinition(state.Sql, state.Query._queryState._sqlBuilder.Parameters, state.Transaction, flags: CommandFlags.Buffered, cancellationToken: state.CancellationToken));
}, new { Query = _query, Sql = sql, Connection = connection, Transaction = transaction, CancellationToken = cancellationToken });
var documents = new List<Document>();

await foreach (var document in connection.QueryUnbufferedAsync<Document>(sql, _query._queryState._sqlBuilder.Parameters, transaction).WithCancellation(cancellationToken))
{
documents.Add(document);
}

if (!documents.Any())
if (documents.Count == 0)
{
return [];
yield break;
}

// Clone documents returned from ProduceAsync as they might be shared across sessions
return _query._session.Get<T>(documents.Select(x => x.Clone()), _query._collection).ToArray();
// Clone documents as they might be shared across sessions
foreach (var item in _query._session.Get<T>(documents.Select(x => x.Clone()), _query._collection))
{
yield return item;
}
}
}
catch
{
// Don't use CancelAsync as we don't want to trigger a thread safety check, it's done in the finally block
await _query._session.CancelAsyncInternal();

throw;
}
finally
{
_query._session.ExitAsyncExecution();
Expand Down Expand Up @@ -1631,20 +1622,19 @@ Task<T> IQueryIndex<T>.FirstOrDefaultAsync(CancellationToken cancellationToken)
return FirstOrDefaultImpl(cancellationToken);
}

Task<IEnumerable<T>> IQueryIndex<T>.ListAsync(CancellationToken cancellationToken)
async Task<IEnumerable<T>> IQueryIndex<T>.ListAsync(CancellationToken cancellationToken)
{
return ListImpl(cancellationToken);
var results = new List<T>();
await foreach (var item in ListImpl(cancellationToken))
{
results.Add(item);
}
return results;
}

#pragma warning disable CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed
async IAsyncEnumerable<T> IQueryIndex<T>.ToAsyncEnumerable(CancellationToken cancellationToken)
#pragma warning restore CS8425 // Async-iterator member has one or more parameters of type 'CancellationToken' but none of them is decorated with the 'EnumeratorCancellation' attribute, so the cancellation token parameter from the generated 'IAsyncEnumerable<>.GetAsyncEnumerator' will be unconsumed
IAsyncEnumerable<T> IQueryIndex<T>.ToAsyncEnumerable(CancellationToken cancellationToken)
{
// TODO: [IAsyncEnumerable] Once Dapper supports IAsyncEnumerable we can replace this call by a non-buffered one
foreach (var item in await ListImpl(cancellationToken))
{
yield return item;
}
return ListImpl(cancellationToken);
}

IQueryIndex<T> IQueryIndex<T>.Skip(int count)
Expand Down
Loading