Skip to content

Commit 22a59cd

Browse files
committed
Update remaining unit tests
1 parent 71b003e commit 22a59cd

9 files changed

+104
-50
lines changed

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ abstract class BaseFirestoreV1ReadFnTest<InT, OutT>
4545
public final void attemptsExhaustedForRetryableError() throws Exception {
4646
BaseFirestoreV1ReadFn<InT, OutT> fn = getFn(clock, ff, rpcQosOptions);
4747
V1RpcFnTestCtx ctx = newCtx();
48-
when(ff.getFirestoreStub(any())).thenReturn(stub);
48+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
4949
when(ff.getRpcQos(any())).thenReturn(rpcQos);
5050
when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt);
5151
ctx.mockRpcToCallable(stub);
@@ -79,7 +79,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
7979
public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
8080
BaseFirestoreV1ReadFn<InT, OutT> fn = getFn(clock, ff, rpcQosOptions);
8181
V1RpcFnTestCtx ctx = newCtx();
82-
when(ff.getFirestoreStub(any())).thenReturn(stub);
82+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
8383
when(ff.getRpcQos(any())).thenReturn(rpcQos);
8484
when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt);
8585

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
129129
Write write = newWrite();
130130
Element<Write> element1 = new WriteElement(0, write, window);
131131

132-
when(ff.getFirestoreStub(any())).thenReturn(stub);
132+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
133133
when(ff.getRpcQos(any())).thenReturn(rpcQos);
134134
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
135135
.thenReturn(attempt);
@@ -175,7 +175,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
175175
@Override
176176
@Test
177177
public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
178-
when(ff.getFirestoreStub(any())).thenReturn(stub);
178+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
179179
when(ff.getRpcQos(any())).thenReturn(rpcQos);
180180
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
181181
.thenReturn(attempt);
@@ -369,7 +369,7 @@ public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottlin
369369
LOG.debug("options = {}", options);
370370

371371
FirestoreStatefulComponentFactory ff = mock(FirestoreStatefulComponentFactory.class);
372-
when(ff.getFirestoreStub(any())).thenReturn(stub);
372+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
373373
Random random = new Random(12345);
374374
TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1));
375375
Sleeper sleeper = millis -> clock.setNext(advanceClockBy(Duration.millis(millis)));

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchGetDocumentsTest.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public final class FirestoreV1FnBatchGetDocumentsTest
5959
public Instant readTime;
6060

6161
@Parameterized.Parameter(1)
62-
public boolean setDatabaseOnReadFn;
62+
public boolean setDatabaseOnFn;
6363

6464
@Rule public MockitoRule rule = MockitoJUnit.rule();
6565

@@ -70,7 +70,7 @@ public final class FirestoreV1FnBatchGetDocumentsTest
7070
@Mock private ServerStream<BatchGetDocumentsResponse> responseStream2;
7171
@Mock private ServerStream<BatchGetDocumentsResponse> responseStream3;
7272

73-
@Parameterized.Parameters(name = "readTime = {0}, setDatabaseOnReadFn = {1}")
73+
@Parameterized.Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}")
7474
public static Collection<Object[]> data() {
7575
return Arrays.asList(
7676
new Object[][] {
@@ -85,14 +85,6 @@ private BatchGetDocumentsRequest withReadTime(
8585
: request.toBuilder().setReadTime(Timestamps.fromMillis(readTime.getMillis())).build();
8686
}
8787

88-
private void mockFirestoreStub() {
89-
if (setDatabaseOnReadFn) {
90-
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
91-
} else {
92-
when(ff.getFirestoreStub(any())).thenReturn(stub);
93-
}
94-
}
95-
9688
@Test
9789
public void endToEnd() throws Exception {
9890
final BatchGetDocumentsRequest request =
@@ -114,7 +106,7 @@ public void endToEnd() throws Exception {
114106

115107
when(stub.batchGetDocumentsCallable()).thenReturn(callable);
116108

117-
mockFirestoreStub();
109+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
118110
when(ff.getRpcQos(any()))
119111
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(rpcQosOptions));
120112

@@ -200,7 +192,7 @@ protected BatchGetDocumentsResponse computeNext() {
200192

201193
when(stub.batchGetDocumentsCallable()).thenReturn(callable);
202194

203-
mockFirestoreStub();
195+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
204196
when(ff.getRpcQos(any())).thenReturn(rpcQos);
205197
when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
206198
when(attempt.awaitSafeToProceed(any())).thenReturn(true);
@@ -263,7 +255,7 @@ protected BatchGetDocumentsFn getFn(
263255
}
264256

265257
private BatchGetDocumentsFn getFnWithParameters() {
266-
if (setDatabaseOnReadFn) {
258+
if (setDatabaseOnFn) {
267259
return new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)");
268260
} else {
269261
return new BatchGetDocumentsFn(clock, ff, rpcQosOptions, readTime);

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception {
6767
int maxBytes = 50;
6868
RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
6969

70-
when(ff.getFirestoreStub(any())).thenReturn(stub);
70+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
7171
when(ff.getRpcQos(any()))
7272
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
7373

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception {
7676
int maxBytes = 50;
7777
RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
7878

79-
when(ff.getFirestoreStub(any())).thenReturn(stub);
79+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
8080
when(ff.getRpcQos(any()))
8181
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
8282

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListCollectionIdsTest.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public final class FirestoreV1FnListCollectionIdsTest
5757

5858
@Parameter public Instant readTime;
5959

60+
@Parameter(1)
61+
public boolean setDatabaseOnFn;
62+
6063
@Rule public MockitoRule rule = MockitoJUnit.rule();
6164

6265
@Mock private UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> callable;
@@ -65,9 +68,12 @@ public final class FirestoreV1FnListCollectionIdsTest
6568
@Mock private ListCollectionIdsPagedResponse pagedResponse2;
6669
@Mock private ListCollectionIdsPage page2;
6770

68-
@Parameters(name = "readTime = {0}")
69-
public static Collection<Object> data() {
70-
return Arrays.asList(null, Instant.now());
71+
@Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}")
72+
public static Collection<Object[]> data() {
73+
return Arrays.asList(
74+
new Object[][] {
75+
{null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true}
76+
});
7177
}
7278

7379
private ListCollectionIdsRequest withReadTime(ListCollectionIdsRequest input, Instant readTime) {
@@ -104,7 +110,7 @@ public void endToEnd() throws Exception {
104110

105111
when(stub.listCollectionIdsPagedCallable()).thenReturn(callable);
106112

107-
when(ff.getFirestoreStub(any())).thenReturn(stub);
113+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
108114
RpcQosOptions options = RpcQosOptions.defaultOptions();
109115
when(ff.getRpcQos(any()))
110116
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
@@ -116,7 +122,7 @@ public void endToEnd() throws Exception {
116122

117123
when(processContext.element()).thenReturn(request1);
118124

119-
ListCollectionIdsFn fn = new ListCollectionIdsFn(clock, ff, options, readTime);
125+
ListCollectionIdsFn fn = getFnWithParameters();
120126

121127
runFunction(fn);
122128

@@ -127,7 +133,7 @@ public void endToEnd() throws Exception {
127133

128134
@Override
129135
public void resumeFromLastReadValue() throws Exception {
130-
when(ff.getFirestoreStub(any())).thenReturn(stub);
136+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
131137
when(ff.getRpcQos(any())).thenReturn(rpcQos);
132138
when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
133139
when(attempt.awaitSafeToProceed(any())).thenReturn(true);
@@ -186,7 +192,7 @@ protected ListCollectionIdsPage computeNext() {
186192

187193
when(stub.listCollectionIdsPagedCallable()).thenReturn(callable);
188194

189-
when(ff.getFirestoreStub(any())).thenReturn(stub);
195+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
190196

191197
ArgumentCaptor<ListCollectionIdsResponse> responses =
192198
ArgumentCaptor.forClass(ListCollectionIdsResponse.class);
@@ -195,7 +201,7 @@ protected ListCollectionIdsPage computeNext() {
195201

196202
when(processContext.element()).thenReturn(request1);
197203

198-
ListCollectionIdsFn fn = new ListCollectionIdsFn(clock, ff, rpcQosOptions, readTime);
204+
ListCollectionIdsFn fn = getFnWithParameters();
199205

200206
runFunction(fn);
201207

@@ -238,4 +244,12 @@ protected ListCollectionIdsFn getFn(
238244
RpcQosOptions rpcQosOptions) {
239245
return new ListCollectionIdsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
240246
}
247+
248+
private ListCollectionIdsFn getFnWithParameters() {
249+
if (setDatabaseOnFn) {
250+
return new ListCollectionIdsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)");
251+
} else {
252+
return new ListCollectionIdsFn(clock, ff, rpcQosOptions, readTime);
253+
}
254+
}
241255
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnListDocumentsTest.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import org.junit.Test;
4848
import org.junit.runner.RunWith;
4949
import org.junit.runners.Parameterized;
50+
import org.junit.runners.Parameterized.Parameter;
51+
import org.junit.runners.Parameterized.Parameters;
5052
import org.mockito.ArgumentCaptor;
5153
import org.mockito.Mock;
5254
import org.mockito.junit.MockitoJUnit;
@@ -58,6 +60,9 @@ public final class FirestoreV1FnListDocumentsTest
5860

5961
@Parameterized.Parameter public Instant readTime;
6062

63+
@Parameter(1)
64+
public boolean setDatabaseOnFn;
65+
6166
@Rule public MockitoRule rule = MockitoJUnit.rule();
6267

6368
@Mock private UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> callable;
@@ -66,9 +71,12 @@ public final class FirestoreV1FnListDocumentsTest
6671
@Mock private ListDocumentsPagedResponse pagedResponse2;
6772
@Mock private ListDocumentsPage page2;
6873

69-
@Parameterized.Parameters(name = "readTime = {0}")
70-
public static Collection<Object> data() {
71-
return Arrays.asList(null, Instant.now());
74+
@Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}")
75+
public static Collection<Object[]> data() {
76+
return Arrays.asList(
77+
new Object[][] {
78+
{null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true}
79+
});
7280
}
7381

7482
private ListDocumentsRequest withReadTime(ListDocumentsRequest request, Instant readTime) {
@@ -127,7 +135,7 @@ public void endToEnd() throws Exception {
127135

128136
when(stub.listDocumentsPagedCallable()).thenReturn(callable);
129137

130-
when(ff.getFirestoreStub(any())).thenReturn(stub);
138+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
131139
RpcQosOptions options = RpcQosOptions.defaultOptions();
132140
when(ff.getRpcQos(any()))
133141
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
@@ -139,7 +147,7 @@ public void endToEnd() throws Exception {
139147

140148
when(processContext.element()).thenReturn(request1);
141149

142-
ListDocumentsFn fn = new ListDocumentsFn(clock, ff, options, readTime);
150+
ListDocumentsFn fn = getFnWithParameters();
143151

144152
runFunction(fn);
145153

@@ -150,7 +158,7 @@ public void endToEnd() throws Exception {
150158

151159
@Override
152160
public void resumeFromLastReadValue() throws Exception {
153-
when(ff.getFirestoreStub(any())).thenReturn(stub);
161+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
154162
when(ff.getRpcQos(any())).thenReturn(rpcQos);
155163
when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
156164
when(attempt.awaitSafeToProceed(any())).thenReturn(true);
@@ -231,7 +239,7 @@ protected ListDocumentsPage computeNext() {
231239

232240
when(stub.listDocumentsPagedCallable()).thenReturn(callable);
233241

234-
when(ff.getFirestoreStub(any())).thenReturn(stub);
242+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
235243

236244
ArgumentCaptor<ListDocumentsResponse> responses =
237245
ArgumentCaptor.forClass(ListDocumentsResponse.class);
@@ -283,4 +291,12 @@ protected ListDocumentsFn getFn(
283291
RpcQosOptions rpcQosOptions) {
284292
return new ListDocumentsFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
285293
}
294+
295+
private ListDocumentsFn getFnWithParameters() {
296+
if (setDatabaseOnFn) {
297+
return new ListDocumentsFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)");
298+
} else {
299+
return new ListDocumentsFn(clock, ff, rpcQosOptions, readTime);
300+
}
301+
}
286302
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import org.junit.Test;
4848
import org.junit.runner.RunWith;
4949
import org.junit.runners.Parameterized;
50+
import org.junit.runners.Parameterized.Parameter;
51+
import org.junit.runners.Parameterized.Parameters;
5052
import org.mockito.ArgumentCaptor;
5153
import org.mockito.Mock;
5254
import org.mockito.junit.MockitoJUnit;
@@ -58,6 +60,9 @@ public final class FirestoreV1FnPartitionQueryTest
5860

5961
@Parameterized.Parameter public Instant readTime;
6062

63+
@Parameter(1)
64+
public boolean setDatabaseOnFn;
65+
6166
@Rule public MockitoRule rule = MockitoJUnit.rule();
6267

6368
@Mock private UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse> callable;
@@ -66,9 +71,12 @@ public final class FirestoreV1FnPartitionQueryTest
6671
@Mock private PartitionQueryPagedResponse pagedResponse2;
6772
@Mock private PartitionQueryPage page2;
6873

69-
@Parameterized.Parameters(name = "readTime = {0}")
70-
public static Collection<Object> data() {
71-
return Arrays.asList(null, Instant.now());
74+
@Parameters(name = "readTime = {0}, setDatabaseOnFn = {1}")
75+
public static Collection<Object[]> data() {
76+
return Arrays.asList(
77+
new Object[][] {
78+
{null, false}, {null, true}, {Instant.now(), false}, {Instant.now(), true}
79+
});
7280
}
7381

7482
private PartitionQueryRequest withReadTime(PartitionQueryRequest request, Instant readTime) {
@@ -101,7 +109,7 @@ public void endToEnd() throws Exception {
101109

102110
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
103111

104-
when(ff.getFirestoreStub(any())).thenReturn(stub);
112+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
105113
RpcQosOptions options = RpcQosOptions.defaultOptions();
106114
when(ff.getRpcQos(any()))
107115
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
@@ -113,7 +121,7 @@ public void endToEnd() throws Exception {
113121

114122
when(processContext.element()).thenReturn(request1);
115123

116-
PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options, readTime);
124+
PartitionQueryFn fn = getFnWithParameters();
117125

118126
runFunction(fn);
119127

@@ -136,7 +144,7 @@ public void endToEnd_emptyCursors() throws Exception {
136144

137145
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
138146

139-
when(ff.getFirestoreStub(any())).thenReturn(stub);
147+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
140148
RpcQosOptions options = RpcQosOptions.defaultOptions();
141149
when(ff.getRpcQos(any()))
142150
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
@@ -148,7 +156,7 @@ public void endToEnd_emptyCursors() throws Exception {
148156

149157
when(processContext.element()).thenReturn(request1);
150158

151-
PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options, readTime);
159+
PartitionQueryFn fn = getFnWithParameters();
152160

153161
runFunction(fn);
154162

@@ -159,7 +167,7 @@ public void endToEnd_emptyCursors() throws Exception {
159167

160168
@Override
161169
public void resumeFromLastReadValue() throws Exception {
162-
when(ff.getFirestoreStub(any())).thenReturn(stub);
170+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
163171
when(ff.getRpcQos(any())).thenReturn(rpcQos);
164172
when(rpcQos.newReadAttempt(any())).thenReturn(attempt);
165173
when(attempt.awaitSafeToProceed(any())).thenReturn(true);
@@ -230,7 +238,7 @@ protected PartitionQueryPage computeNext() {
230238

231239
when(stub.partitionQueryPagedCallable()).thenReturn(callable);
232240

233-
when(ff.getFirestoreStub(any())).thenReturn(stub);
241+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
234242

235243
ArgumentCaptor<PartitionQueryPair> responses =
236244
ArgumentCaptor.forClass(PartitionQueryPair.class);
@@ -283,4 +291,12 @@ protected PartitionQueryFn getFn(
283291
RpcQosOptions rpcQosOptions) {
284292
return new PartitionQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions);
285293
}
294+
295+
private PartitionQueryFn getFnWithParameters() {
296+
if (setDatabaseOnFn) {
297+
return new PartitionQueryFn(clock, ff, rpcQosOptions, readTime, projectId, "(default)");
298+
} else {
299+
return new PartitionQueryFn(clock, ff, rpcQosOptions, readTime);
300+
}
301+
}
286302
}

0 commit comments

Comments
 (0)