Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/googleapis_firestore/lib/src/firestore.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ part 'firestore_exception.dart';
part 'firestore_http_client.dart';
part 'geo_point.dart';
part 'path.dart';
part 'query_reader.dart';
part 'reference/aggregate_query.dart';
part 'reference/aggregate_query_snapshot.dart';
part 'reference/collection_reference.dart';
Expand Down
108 changes: 108 additions & 0 deletions packages/googleapis_firestore/lib/src/query_reader.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
part of 'firestore.dart';

/// Response wrapper containing both query results and transaction ID.
class _QueryReaderResponse<T> {
_QueryReaderResponse(this.result, this.transaction);

final QuerySnapshot<T> result;
final String? transaction;
}

/// Reader class for executing queries within transactions.
///
/// Follows the same pattern as [_DocumentReader] to handle:
/// - Lazy transaction initialization via `transactionOptions`
/// - Reusing existing transactions via `transactionId`
/// - Read-only snapshots via `readTime`
/// - Capturing and returning transaction IDs from responses
class _QueryReader<T> {
_QueryReader({
required this.query,
this.transactionId,
this.readTime,
this.transactionOptions,
}) : assert(
[transactionId, readTime, transactionOptions].nonNulls.length <= 1,
'Only transactionId or readTime or transactionOptions must be provided. '
'transactionId = $transactionId, readTime = $readTime, transactionOptions = $transactionOptions',
);

final Query<T> query;
final String? transactionId;
final Timestamp? readTime;
final firestore_v1.TransactionOptions? transactionOptions;

String? _retrievedTransactionId;

/// Executes the query and captures the transaction ID from the response stream.
///
/// Returns a [_QueryReaderResponse] containing both the query results and
/// the transaction ID (if one was started or provided).
Future<_QueryReaderResponse<T>> _get() async {
final request = query._toProto(
transactionId: transactionId,
readTime: readTime,
transactionOptions: transactionOptions,
);

final response = await query.firestore._firestoreClient.v1((
api,
projectId,
) async {
return api.projects.databases.documents.runQuery(
request,
query._buildProtoParentPath(),
);
});

Timestamp? queryReadTime;
final snapshots = <QueryDocumentSnapshot<T>>[];

// Process streaming response
for (final e in response) {
// Capture transaction ID from response (if present)
if (e.transaction?.isNotEmpty ?? false) {
_retrievedTransactionId = e.transaction;
}

final document = e.document;
if (document == null) {
// End of stream marker
queryReadTime = e.readTime.let(Timestamp._fromString);
continue;
}

// Convert proto document to DocumentSnapshot
final snapshot = DocumentSnapshot._fromDocument(
document,
e.readTime,
query.firestore,
);

// Recreate with proper converter
final finalDoc =
_DocumentSnapshotBuilder(
snapshot.ref.withConverter<T>(
fromFirestore: query._queryOptions.converter.fromFirestore,
toFirestore: query._queryOptions.converter.toFirestore,
),
)
..fieldsProto = firestore_v1.MapValue(fields: document.fields)
..readTime = snapshot.readTime
..createTime = snapshot.createTime
..updateTime = snapshot.updateTime;

snapshots.add(finalDoc.build() as QueryDocumentSnapshot<T>);
}

// Return both query results and transaction ID
return _QueryReaderResponse<T>(
QuerySnapshot<T>._(
query: query,
readTime: queryReadTime,
docs: snapshots,
),
_retrievedTransactionId,
);
}
}
17 changes: 15 additions & 2 deletions packages/googleapis_firestore/lib/src/reference/query.dart
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,20 @@ base class Query<T> {
firestore_v1.RunQueryRequest _toProto({
required String? transactionId,
required Timestamp? readTime,
firestore_v1.TransactionOptions? transactionOptions,
}) {
if (readTime != null && transactionId != null) {
throw ArgumentError('readTime and transactionId cannot both be set.');
// Validate mutual exclusivity of transaction parameters
final providedParams = [
transactionId,
readTime,
transactionOptions,
].nonNulls.length;

if (providedParams > 1) {
throw ArgumentError(
'Only one of transactionId, readTime, or transactionOptions can be specified. '
'Got: transactionId=$transactionId, readTime=$readTime, transactionOptions=$transactionOptions',
);
}

final structuredQuery = _toStructuredQuery();
Expand Down Expand Up @@ -482,6 +493,8 @@ base class Query<T> {
runQueryRequest.transaction = transactionId;
} else if (readTime != null) {
runQueryRequest.readTime = readTime._toProto().timestampValue;
} else if (transactionOptions != null) {
runQueryRequest.newTransaction = transactionOptions;
}

return runQueryRequest;
Expand Down
60 changes: 58 additions & 2 deletions packages/googleapis_firestore/lib/src/transaction.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class Transaction {
Future<String>? _transactionIdPromise;
String? _prevTransactionId;

// TODO support Query as parameter for [get]

/// Retrieves a single document from the database. Holds a pessimistic lock on
/// the returned document.
///
Expand All @@ -98,6 +96,43 @@ class Transaction {
>(docRef, resultFn: _getSingleFn);
}

/// Executes a query and returns the results. Holds a pessimistic lock on
/// all documents in the result set.
///
/// - [query]: The query to execute.
///
/// Returns a [QuerySnapshot] containing the query results.
///
/// All documents matched by the query will be locked for the duration of
/// the transaction. The query is executed at a consistent snapshot, ensuring
/// that all reads see the same data.
///
/// ```dart
/// firestore.runTransaction((transaction) async {
/// final query = firestore.collection('users')
/// .where('active', WhereFilter.equal, true)
/// .limit(100);
///
/// final snapshot = await transaction.getQuery(query);
///
/// for (final doc in snapshot.docs) {
/// transaction.update(doc.ref, {'processed': true});
/// }
/// });
/// ```
Future<QuerySnapshot<T>> getQuery<T>(Query<T> query) async {
if (_writeBatch != null && _writeBatch._operations.isNotEmpty) {
throw FirestoreException(
FirestoreClientErrorCode.failedPrecondition,
readAfterWriteErrorMsg,
);
}
return _withLazyStartedTransaction<Query<T>, QuerySnapshot<T>>(
query,
resultFn: _getQueryFn,
);
}

/// Retrieve multiple documents from the database by the provided
/// [documentsRefs]. Holds a pessimistic lock on all returned documents.
/// If any of the documents do not exist, the operation throws a
Expand Down Expand Up @@ -387,6 +422,27 @@ class Transaction {
);
}

Future<_TransactionResult<QuerySnapshot<T>>> _getQueryFn<T>(
Query<T> query, {
String? transactionId,
Timestamp? readTime,
firestore_v1.TransactionOptions? transactionOptions,
List<FieldPath>? fieldMask,
}) async {
final reader = _QueryReader(
query: query,
transactionId: transactionId,
readTime: readTime,
transactionOptions: transactionOptions,
);

final result = await reader._get();
return _TransactionResult(
transaction: result.transaction,
result: result.result,
);
}

Future<T> _runTransaction<T>(TransactionHandler<T> updateFunction) async {
// No backoff is set for readonly transactions (i.e. attempts == 1)
if (_writeBatch == null) {
Expand Down
Loading