Commit 32c3add

use threadlocal instead of synchronization

1 parent 4f45dbd

4 files changed: +87 additions, -71 deletions
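
In short: before this change, tasks submitted to BackgroundQueue all wrote into one shared result map and took a synchronized block for every put; after it, each worker thread accumulates results into its own thread-local map, and the per-thread maps are merged exactly once when the queue is drained. Below is a minimal, self-contained Java sketch of that pattern. It is illustrative only and not code from this commit; the class name ThreadLocalMergeSketch and its contents are made up.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class ThreadLocalMergeSketch {
  public static void main(String[] args) throws InterruptedException {
    List<HashMap<Integer, String>> perThreadMaps = new ArrayList<>();

    // Each worker thread lazily gets its own map; registering the map is the
    // only synchronized step, so the hot path (put) runs without any lock.
    ThreadLocal<HashMap<Integer, String>> threadLocalResults =
        ThreadLocal.withInitial(
            () -> {
              HashMap<Integer, String> map = new HashMap<>();
              synchronized (perThreadMaps) {
                perThreadMaps.add(map);
              }
              return map;
            });

    ExecutorService executor = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 1000; i++) {
      final int key = i;
      executor.execute(() -> threadLocalResults.get().put(key, "value" + key));
    }
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);

    // The "drain" step: merge the per-thread maps into one result map. This is
    // safe because awaitTermination() guarantees all tasks have completed.
    Map<Integer, String> merged = new HashMap<>();
    synchronized (perThreadMaps) {
      for (HashMap<Integer, String> map : perThreadMaps) {
        merged.putAll(map);
      }
    }
    System.out.println(merged.size()); // prints 1000
  }
}

The commit applies the same idea inside BackgroundQueue itself, so its callers in the two SQLite caches and in QueryTest no longer need their own synchronized blocks.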

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteDocumentOverlayCache.java

21 additions & 26 deletions

@@ -61,29 +61,26 @@ public Overlay getOverlay(DocumentKey key) {
   @Override
   public Map<DocumentKey, Overlay> getOverlays(SortedSet<DocumentKey> keys) {
     hardAssert(keys.comparator() == null, "getOverlays() requires natural order");
-    Map<DocumentKey, Overlay> result = new HashMap<>();

-    BackgroundQueue backgroundQueue = new BackgroundQueue();
+    BackgroundQueue<DocumentKey, Overlay> backgroundQueue = new BackgroundQueue<>();
     ResourcePath currentCollection = ResourcePath.EMPTY;
     List<Object> accumulatedDocumentIds = new ArrayList<>();
     for (DocumentKey key : keys) {
       if (!currentCollection.equals(key.getCollectionPath())) {
-        processSingleCollection(result, backgroundQueue, currentCollection, accumulatedDocumentIds);
+        processSingleCollection(backgroundQueue, currentCollection, accumulatedDocumentIds);
         currentCollection = key.getCollectionPath();
         accumulatedDocumentIds.clear();
       }
       accumulatedDocumentIds.add(key.getDocumentId());
     }

-    processSingleCollection(result, backgroundQueue, currentCollection, accumulatedDocumentIds);
-    backgroundQueue.drain();
-    return result;
+    processSingleCollection(backgroundQueue, currentCollection, accumulatedDocumentIds);
+    return backgroundQueue.drain();
   }

   /** Reads the overlays for the documents in a single collection. */
   private void processSingleCollection(
-      Map<DocumentKey, Overlay> result,
-      BackgroundQueue backgroundQueue,
+      BackgroundQueue<DocumentKey, Overlay> backgroundQueue,
       ResourcePath collectionPath,
       List<Object> documentIds) {
     if (documentIds.isEmpty()) {
@@ -101,7 +98,7 @@ private void processSingleCollection(
     while (longQuery.hasMoreSubqueries()) {
       longQuery
           .performNextSubquery()
-          .forEach(row -> processOverlaysInBackground(backgroundQueue, result, row));
+          .forEach(row -> processOverlaysInBackground(backgroundQueue, row));
     }
   }

@@ -138,26 +135,23 @@ public void removeOverlaysForBatchId(int batchId) {

   @Override
   public Map<DocumentKey, Overlay> getOverlays(ResourcePath collection, int sinceBatchId) {
-    Map<DocumentKey, Overlay> result = new HashMap<>();
-    BackgroundQueue backgroundQueue = new BackgroundQueue();
+    BackgroundQueue<DocumentKey, Overlay> backgroundQueue = new BackgroundQueue<>();
     db.query(
             "SELECT overlay_mutation, largest_batch_id FROM document_overlays "
                 + "WHERE uid = ? AND collection_path = ? AND largest_batch_id > ?")
         .binding(uid, EncodedPath.encode(collection), sinceBatchId)
-        .forEach(row -> processOverlaysInBackground(backgroundQueue, result, row));
-    backgroundQueue.drain();
-    return result;
+        .forEach(row -> processOverlaysInBackground(backgroundQueue, row));
+    return backgroundQueue.drain();
   }

   @Override
   public Map<DocumentKey, Overlay> getOverlays(
       String collectionGroup, int sinceBatchId, int count) {
-    Map<DocumentKey, Overlay> result = new HashMap<>();
     String[] lastCollectionPath = new String[1];
     String[] lastDocumentPath = new String[1];
     int[] lastLargestBatchId = new int[1];

-    BackgroundQueue backgroundQueue = new BackgroundQueue();
+    BackgroundQueue<DocumentKey, Overlay> backgroundQueue1 = new BackgroundQueue<>();
     db.query(
             "SELECT overlay_mutation, largest_batch_id, collection_path, document_id "
                 + " FROM document_overlays "
@@ -169,17 +163,19 @@ public Map<DocumentKey, Overlay> getOverlays(
               lastLargestBatchId[0] = row.getInt(1);
               lastCollectionPath[0] = row.getString(2);
               lastDocumentPath[0] = row.getString(3);
-              processOverlaysInBackground(backgroundQueue, result, row);
+              processOverlaysInBackground(backgroundQueue1, row);
             });

+    HashMap<DocumentKey, Overlay> results = backgroundQueue1.drain();
     if (lastCollectionPath[0] == null) {
-      return result;
+      return results;
     }

     // This function should not return partial batch overlays, even if the number of overlays in the
     // result set exceeds the given `count` argument. Since the `LIMIT` in the above query might
     // result in a partial batch, the following query appends any remaining overlays for the last
     // batch.
+    BackgroundQueue<DocumentKey, Overlay> backgroundQueue2 = new BackgroundQueue<>();
     db.query(
             "SELECT overlay_mutation, largest_batch_id FROM document_overlays "
                 + "WHERE uid = ? AND collection_group = ? "
@@ -192,22 +188,21 @@ public Map<DocumentKey, Overlay> getOverlays(
             lastCollectionPath[0],
             lastDocumentPath[0],
             lastLargestBatchId[0])
-        .forEach(row -> processOverlaysInBackground(backgroundQueue, result, row));
-    backgroundQueue.drain();
-    return result;
+        .forEach(row -> processOverlaysInBackground(backgroundQueue2, row));
+
+    backgroundQueue2.drainInto(results);
+    return results;
   }

   private void processOverlaysInBackground(
-      BackgroundQueue backgroundQueue, Map<DocumentKey, Overlay> results, Cursor row) {
+      BackgroundQueue<DocumentKey, Overlay> backgroundQueue, Cursor row) {
     byte[] rawMutation = row.getBlob(0);
     int largestBatchId = row.getInt(1);

     backgroundQueue.submit(
-        () -> {
+        results -> {
           Overlay overlay = decodeOverlay(rawMutation, largestBatchId);
-          synchronized (results) {
-            results.put(overlay.getKey(), overlay);
-          }
+          results.put(overlay.getKey(), overlay);
         });
   }

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteRemoteDocumentCache.java

13 additions & 21 deletions

@@ -168,21 +168,19 @@ public Map<DocumentKey, MutableDocument> getAll(Iterable<DocumentKey> documentKe
             bindVars,
             ") ORDER BY path");

-    BackgroundQueue backgroundQueue = new BackgroundQueue();
+    BackgroundQueue<DocumentKey, MutableDocument> backgroundQueue = new BackgroundQueue<>();
     while (longQuery.hasMoreSubqueries()) {
       longQuery
           .performNextSubquery()
-          .forEach(row -> processRowInBackground(backgroundQueue, results, row, /*filter*/ null));
+          .forEach(row -> processRowInBackground(backgroundQueue, row, /*filter*/ null));
     }
-    backgroundQueue.drain();
+
+    backgroundQueue.drainInto(results);

     // Backfill any rows with null "document_type" discovered by processRowInBackground().
     documentTypeBackfiller.backfill(db);

-    // Synchronize on `results` to avoid a data race with the background queue.
-    synchronized (results) {
-      return results;
-    }
+    return results;
   }

   @Override
@@ -264,26 +262,23 @@ private Map<DocumentKey, MutableDocument> getAll(
     }
     bindVars[i] = count;

-    BackgroundQueue backgroundQueue = new BackgroundQueue();
-    Map<DocumentKey, MutableDocument> results = new HashMap<>();
+    BackgroundQueue<DocumentKey, MutableDocument> backgroundQueue = new BackgroundQueue<>();
     db.query(sql.toString())
         .binding(bindVars)
         .forEach(
             row -> {
-              processRowInBackground(backgroundQueue, results, row, filter);
+              processRowInBackground(backgroundQueue, row, filter);
               if (context != null) {
                 context.incrementDocumentReadCount();
               }
             });
-    backgroundQueue.drain();
+
+    HashMap<DocumentKey, MutableDocument> results = backgroundQueue.drain();

     // Backfill any null "document_type" columns discovered by processRowInBackground().
     documentTypeBackfiller.backfill(db);

-    // Synchronize on `results` to avoid a data race with the background queue.
-    synchronized (results) {
-      return results;
-    }
+    return results;
   }

   private Map<DocumentKey, MutableDocument> getAll(
@@ -296,8 +291,7 @@ private Map<DocumentKey, MutableDocument> getAll(
   }

   private void processRowInBackground(
-      BackgroundQueue backgroundQueue,
-      Map<DocumentKey, MutableDocument> results,
+      BackgroundQueue<DocumentKey, MutableDocument> backgroundQueue,
       Cursor row,
       @Nullable Function<MutableDocument, Boolean> filter) {
     byte[] rawDocument = row.getBlob(0);
@@ -307,16 +301,14 @@ private void processRowInBackground(
     String path = row.getString(4);

     backgroundQueue.submit(
-        () -> {
+        results -> {
           MutableDocument document =
               decodeMaybeDocument(rawDocument, readTimeSeconds, readTimeNanos);
           if (documentTypeIsNull) {
             documentTypeBackfiller.enqueue(path, readTimeSeconds, readTimeNanos, document);
           }
           if (filter == null || filter.apply(document)) {
-            synchronized (results) {
-              results.put(document.getKey(), document);
-            }
+            results.put(document.getKey(), document);
           }
         });
   }

firebase-firestore/src/main/java/com/google/firebase/firestore/util/BackgroundQueue.kt

49 additions & 13 deletions

@@ -27,24 +27,29 @@ import kotlinx.coroutines.asExecutor
 * called from the same thread. The behavior of an instance is undefined if any of the methods are
 * called from multiple threads.
 */
-internal class BackgroundQueue {
+internal class BackgroundQueue<K, V> {

-  private var state: State = State.Submitting()
+  private var state: State<K, V> = State.Submitting()
+
+  /** Overload for convenience of being called from Java code. */
+  fun submit(consumer: Consumer<HashMap<K, V>>) = this.submit { consumer.accept(it) }

   /**
    * Submit a task for asynchronous execution on the executor of the owning [BackgroundQueue].
    *
    * @throws IllegalStateException if [drain] has been called.
    */
-  fun submit(runnable: Runnable) {
+  fun submit(runnable: (results: HashMap<K, V>) -> Unit) {
     val submittingState = this.state
-    check(submittingState is State.Submitting) { "submit() may not be called after drain()" }
+    check(submittingState is State.Submitting) {
+      "submit() may not be called after drain() or drainInto()"
+    }

     submittingState.run {
       taskCount++
       executor.execute {
         try {
-          runnable.run()
+          runnable(threadLocalResults.get()!!)
         } finally {
           completedTasks.release()
         }
@@ -55,22 +60,53 @@ internal class BackgroundQueue {
   /**
   * Blocks until all tasks submitted via calls to [submit] have completed.
   *
-   * @throws IllegalStateException if called more than once.
+   * The results produced by each thread are merged into a new [HashMap] and returned.
+   *
+   * @throws IllegalStateException if [drain] or [drainInto] has already been called.
+   */
+  fun drain(): HashMap<K, V> = HashMap<K, V>().also { drainInto(it) }
+
+  /**
+   * Blocks until all tasks submitted via calls to [submit] have completed.
+   *
+   * The results produced by each thread are merged into the given map.
+   *
+   * @throws IllegalStateException if [drain] or [drainInto] has already been called.
   */
-  fun drain() {
+  fun drainInto(results: MutableMap<K, V>) {
     val submittingState = this.state
-    check(submittingState is State.Submitting) { "drain() may not be called more than once" }
-    this.state = State.Draining
+    check(submittingState is State.Submitting) { "drain() or drainInto() has already been called" }
+    this.state = State.Draining()
+    return submittingState.run {
+      completedTasks.acquire(taskCount)
+      threadLocalResults.mergeResultsInto(results)
+    }
+  }
+
+  private class ThreadLocalResults<K, V> : ThreadLocal<HashMap<K, V>>() {
+
+    private val results = mutableListOf<HashMap<K, V>>()

-    submittingState.run { completedTasks.acquire(taskCount) }
+    override fun initialValue(): HashMap<K, V> {
+      synchronized(results) {
+        val result = HashMap<K, V>()
+        results.add(result)
+        return result
+      }
+    }
+
+    fun mergeResultsInto(mergedResults: MutableMap<K, V>) {
+      synchronized(results) { results.forEach { mergedResults.putAll(it) } }
+    }
   }

-  private sealed interface State {
-    class Submitting : State {
+  private sealed interface State<K, V> {
+    class Submitting<K, V> : State<K, V> {
       val completedTasks = Semaphore(0)
+      val threadLocalResults = ThreadLocalResults<K, V>()
       var taskCount: Int = 0
     }
-    object Draining : State
+    class Draining<K, V> : State<K, V>
   }

   companion object {
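
Not part of the diff, but for orientation: a sketch of how a Java caller in the same module might use the reworked queue, shaped like the call sites in the two cache files above. The class name BackgroundQueueUsageSketch and the key-length "work" are made-up stand-ins; the real callers decode documents and overlays on the background threads.

import com.google.firebase.firestore.util.BackgroundQueue;
import java.util.HashMap;
import java.util.List;

final class BackgroundQueueUsageSketch {
  static HashMap<String, Integer> keyLengths(List<String> keys) {
    BackgroundQueue<String, Integer> backgroundQueue = new BackgroundQueue<>();
    for (String key : keys) {
      backgroundQueue.submit(
          // The lambda receives the worker thread's own result map, so no
          // synchronized block is needed around put().
          results -> {
            results.put(key, key.length());
          });
    }
    // drain() blocks until every submitted task has finished, then merges the
    // per-thread maps into a single HashMap and returns it.
    return backgroundQueue.drain();
  }
}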

firebase-firestore/src/test/java/com/google/firebase/firestore/core/QueryTest.java

4 additions & 11 deletions

@@ -45,7 +45,6 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -984,25 +983,19 @@ public void testSynchronousMatchesOrderBy() {
     // SQLiteRemoteDocumentCache.getAll(...), where `query.matches(doc)` is performed
     // for many different docs concurrently on the BackgroundQueue.
     Iterator<MutableDocument> iterator = docs.iterator();
-    BackgroundQueue backgroundQueue = new BackgroundQueue();
-    Map<DocumentKey, Boolean> results = new HashMap<>();
+    BackgroundQueue<DocumentKey, Boolean> backgroundQueue = new BackgroundQueue<>();

     while (iterator.hasNext()) {
       MutableDocument doc = iterator.next();
       backgroundQueue.submit(
-          () -> {
+          results -> {
            // We call query.matches() to indirectly test query.matchesOrderBy()
            boolean result = query.matches(doc);
-
-            // We will include a synchronized block in our command to simulate
-            // the implementation in SQLiteRemoteDocumentCache.getAll(...)
-            synchronized (results) {
-              results.put(doc.getKey(), result);
-            }
+            results.put(doc.getKey(), result);
          });
     }

-    backgroundQueue.drain();
+    HashMap<DocumentKey, Boolean> results = backgroundQueue.drain();

     Assert.assertEquals(101, results.keySet().size());
     for (DocumentKey key : results.keySet()) {
