Skip to content
This repository was archived by the owner on Mar 11, 2022. It is now read-only.

Commit f93dd18

Browse files
authored
Fix #599 - Record checkpoint on empty _changes result (#600)
* Set check point when the changes are empty (e.g. in filtered pull replications) * Update checkpoint only when modified from previous recorded checkpoint * Added checkpoint tests
1 parent b4374e4 commit f93dd18

File tree

4 files changed

+127
-15
lines changed

4 files changed

+127
-15
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# 2.x.y (Unreleased)
2+
- [IMPROVED] Record checkpoint on empty `_changes` result in pull replications. This change optimizes
3+
filtered replications when changes in remote database doesn't match the replication filter.
14
# 2.4.0 (2019-01-15)
25
- [NEW] `Database` methods `read`, `contains`, `create`, and `delete` now accept local
36
(non-replicating documents). These documents must have their document ID prefixed with `_local/`

cloudant-sync-datastore-core/src/main/java/com/cloudant/sync/internal/replication/PullStrategy.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,10 @@ private void replicate()
236236
}
237237

238238
this.state.documentCounter = 0;
239+
239240
while (!this.state.cancel) {
240241
this.state.batchCounter++;
241-
242+
final Object lastKnownCheckpoint = this.targetDb.getCheckpoint(this.getReplicationId());
242243
String msg = String.format(
243244
"Batch %s started (completed %s changes so far)",
244245
this.state.batchCounter,
@@ -247,7 +248,7 @@ private void replicate()
247248
logger.info(msg);
248249
long batchStartTime = System.currentTimeMillis();
249250

250-
ChangesResultWrapper changeFeeds = this.nextBatch();
251+
ChangesResultWrapper changeFeeds = this.nextBatch(lastKnownCheckpoint);
251252
int batchChangesProcessed = 0;
252253

253254
// So we can check whether all changes were processed during
@@ -264,6 +265,16 @@ private void replicate()
264265
state.documentCounter += batchChangesProcessed;
265266
}
266267

268+
if (!this.state.cancel && (lastKnownCheckpoint == null || !lastKnownCheckpoint.equals(changeFeeds.getLastSeq()))) {
269+
try {
270+
this.targetDb.putCheckpoint(this.getReplicationId(), changeFeeds.getLastSeq());
271+
} catch (DocumentStoreException e) {
272+
logger.log(Level.WARNING, "Failed to put checkpoint doc, next replication " +
273+
"will " +
274+
"start from previous checkpoint", e);
275+
}
276+
}
277+
267278
long batchEndTime = System.currentTimeMillis();
268279
msg = String.format(
269280
"Batch %s completed in %sms (batch was %s changes)",
@@ -435,15 +446,6 @@ private int processOneChangesBatch(ChangesResultWrapper changeFeeds)
435446
}
436447
}
437448

438-
if (!this.state.cancel) {
439-
try {
440-
this.targetDb.putCheckpoint(this.getReplicationId(), changeFeeds.getLastSeq());
441-
} catch (DocumentStoreException e) {
442-
logger.log(Level.WARNING, "Failed to put checkpoint doc, next replication will " +
443-
"start from previous checkpoint", e);
444-
}
445-
}
446-
447449
return changesProcessed;
448450
}
449451

@@ -470,8 +472,7 @@ public String getReplicationId() throws DocumentStoreException {
470472
}
471473
}
472474

473-
private ChangesResultWrapper nextBatch() throws DocumentStoreException {
474-
final Object lastCheckpoint = this.targetDb.getCheckpoint(this.getReplicationId());
475+
private ChangesResultWrapper nextBatch(final Object lastCheckpoint) {
475476
logger.fine("last checkpoint " + lastCheckpoint);
476477

477478
ChangesResult changeFeeds = null;

cloudant-sync-datastore-core/src/test/java/com/cloudant/sync/internal/replication/PullStrategyMockTest.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.cloudant.sync.internal.replication;
1818

19+
import static org.junit.Assume.assumeNoException;
20+
import static org.mockito.Matchers.eq;
1921
import static org.mockito.Mockito.any;
2022
import static org.mockito.Mockito.anyInt;
2123
import static org.mockito.Mockito.anyString;
@@ -26,12 +28,12 @@
2628
import static org.mockito.Mockito.when;
2729

2830
import com.cloudant.common.RequireRunningCouchDB;
31+
import com.cloudant.sync.event.Subscribe;
32+
import com.cloudant.sync.internal.documentstore.DocumentRevsList;
2933
import com.cloudant.sync.internal.mazha.ChangesResult;
3034
import com.cloudant.sync.internal.mazha.DocumentRevs;
3135
import com.cloudant.sync.internal.mazha.OkOpenRevision;
3236
import com.cloudant.sync.internal.mazha.OpenRevision;
33-
import com.cloudant.sync.internal.documentstore.DocumentRevsList;
34-
import com.cloudant.sync.event.Subscribe;
3537
import com.cloudant.sync.internal.util.JSONUtils;
3638
import com.cloudant.sync.replication.PullFilter;
3739
import com.cloudant.sync.util.TestUtils;
@@ -231,6 +233,107 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
231233
verify(mockListener,never()).error(any(ReplicationStrategyErrored.class));
232234
}
233235

236+
@Test
237+
public void testSetCheckpointWhenEmpty() throws Exception {
238+
CouchDB mockRemoteDb = mock(CouchDB.class);
239+
when(mockRemoteDb.changes((PullFilter) null, null, 1000)).then(new Answer<Object>() {
240+
@Override
241+
public Object answer(InvocationOnMock invocation) throws Throwable {
242+
FileReader fr = new FileReader(TestUtils.loadFixture
243+
("fixture/empty_changes.json"));
244+
return JSONUtils.fromJson(fr, ChangesResult.class);
245+
}
246+
});
247+
when(mockRemoteDb.exists()).thenReturn(true);
248+
249+
StrategyListener mockListener = mock(StrategyListener.class);
250+
PullStrategy pullStrategy = super.getPullStrategy();
251+
pullStrategy.sourceDb = mockRemoteDb;
252+
pullStrategy.getEventBus().register(mockListener);
253+
pullStrategy.run();
254+
255+
//should have 0 document
256+
Assert.assertEquals(this.datastore.getDocumentCount(), 0);
257+
//Checkpoint should be created in targetDb
258+
String checkpoint =
259+
(String) pullStrategy.targetDb.getCheckpoint(pullStrategy.getReplicationId());
260+
Assert.assertEquals(checkpoint, "10-d9e5b0147af143e5b6d1979378ad957b");
261+
//make sure the correct events were fired
262+
verify(mockListener).complete(any(ReplicationStrategyCompleted.class));
263+
verify(mockListener, never()).error(any(ReplicationStrategyErrored.class));
264+
}
265+
266+
@Test
267+
public void testDoNotSetCheckpointWhenNotModified() throws Exception {
268+
try {
269+
CouchDB mockRemoteDb = mock(CouchDB.class);
270+
when(mockRemoteDb.changes((PullFilter) null, "10-d9e5b0147af143e5b6d1979378ad957b", 1000)).then(new Answer<Object>() {
271+
@Override
272+
public Object answer(InvocationOnMock invocation) throws Throwable {
273+
FileReader fr = new FileReader(TestUtils.loadFixture
274+
("fixture/empty_changes.json"));
275+
return JSONUtils.fromJson(fr, ChangesResult.class);
276+
}
277+
});
278+
when(mockRemoteDb.exists()).thenReturn(true);
279+
280+
DatastoreWrapper mockLocalDb = mock(DatastoreWrapper.class);
281+
when(mockLocalDb.getCheckpoint(any(String.class))).thenReturn("10" +
282+
"-d9e5b0147af143e5b6d1979378ad957b");
283+
284+
StrategyListener mockListener = mock(StrategyListener.class);
285+
PullStrategy pullStrategy = super.getPullStrategy();
286+
pullStrategy.sourceDb = mockRemoteDb;
287+
pullStrategy.targetDb = mockLocalDb;
288+
pullStrategy.getEventBus().register(mockListener);
289+
pullStrategy.run();
290+
291+
//make sure the correct events were fired
292+
verify(mockListener).complete(any(ReplicationStrategyCompleted.class));
293+
verify(mockListener, never()).error(any(ReplicationStrategyErrored.class));
294+
verify(mockLocalDb, never()).putCheckpoint(anyString(), any());
295+
} catch(UnsupportedOperationException uoe) {
296+
assumeNoException("Cannot proxy DatastoreWrapper on Dalvik", uoe);
297+
}
298+
}
299+
300+
@Test
301+
public void testSetCheckpointWhenModified() throws Exception {
302+
try {
303+
CouchDB mockRemoteDb = mock(CouchDB.class);
304+
when(mockRemoteDb.changes((PullFilter) null, "9-d9e5b0147af143e5b6d1979378ad957b",
305+
1000)).then(new Answer<Object>() {
306+
@Override
307+
public Object answer(InvocationOnMock invocation) throws Throwable {
308+
FileReader fr = new FileReader(TestUtils.loadFixture
309+
("fixture/empty_changes.json"));
310+
return JSONUtils.fromJson(fr, ChangesResult.class);
311+
}
312+
});
313+
when(mockRemoteDb.exists()).thenReturn(true);
314+
315+
DatastoreWrapper mockLocalDb = mock(DatastoreWrapper.class);
316+
when(mockLocalDb.getCheckpoint(any(String.class))).thenReturn("9" +
317+
"-d9e5b0147af143e5b6d1979378ad957b");
318+
319+
StrategyListener mockListener = mock(StrategyListener.class);
320+
PullStrategy pullStrategy = super.getPullStrategy();
321+
pullStrategy.sourceDb = mockRemoteDb;
322+
pullStrategy.targetDb = mockLocalDb;
323+
pullStrategy.getEventBus().register(mockListener);
324+
pullStrategy.run();
325+
326+
//make sure the correct events were fired
327+
verify(mockListener).complete(any(ReplicationStrategyCompleted.class));
328+
verify(mockListener, never()).error(any(ReplicationStrategyErrored.class));
329+
verify(mockLocalDb).putCheckpoint(anyString(), eq("10" +
330+
"-d9e5b0147af143e5b6d1979378ad957b"));
331+
} catch (UnsupportedOperationException uoe) {
332+
assumeNoException("Cannot proxy DatastoreWrapper on Dalvik", uoe);
333+
}
334+
}
335+
336+
234337
public class StrategyListener {
235338

236339
@Subscribe

fixture/empty_changes.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"last_seq": "10-d9e5b0147af143e5b6d1979378ad957b",
3+
"pending": 0,
4+
"results": []
5+
}

0 commit comments

Comments
 (0)