|
43 | 43 |
|
44 | 44 | import java.util.List; |
45 | 45 |
|
| 46 | +import static org.mockito.ArgumentMatchers.any; |
46 | 47 | import static org.mockito.ArgumentMatchers.anyInt; |
47 | 48 | import static org.mockito.ArgumentMatchers.anyString; |
48 | 49 | import static org.mockito.Mockito.mock; |
| 50 | +import static org.mockito.Mockito.never; |
49 | 51 | import static org.mockito.Mockito.times; |
| 52 | +import static org.mockito.Mockito.verify; |
50 | 53 | import static org.mockito.Mockito.when; |
51 | 54 |
|
52 | 55 | /** |
@@ -87,19 +90,21 @@ public void testBigQuerySinkInvalidConfig() { |
87 | 90 |
|
88 | 91 | @Test |
89 | 92 | public void testBigQueryTimePartitionConfig() { |
90 | | - Schema schema = Schema.recordOf("record", |
91 | | - Schema.Field.of("id", Schema.of(Schema.Type.LONG)), |
92 | | - Schema.Field.of("name", Schema.of(Schema.Type.STRING)), |
93 | | - Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)), |
94 | | - Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))), |
95 | | - Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)), |
96 | | - Schema.Field.of("timestamp", |
97 | | - Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)))); |
| 93 | + Schema schema = |
| 94 | + Schema.recordOf("record", |
| 95 | + Schema.Field.of("id", Schema.of(Schema.Type.LONG)), |
| 96 | + Schema.Field.of("name", Schema.of(Schema.Type.STRING)), |
| 97 | + Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)), |
| 98 | + Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))), |
| 99 | + Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)), |
| 100 | + Schema.Field.of("timestamp", |
| 101 | + Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)))); |
98 | 102 |
|
99 | | - BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(), |
100 | | - "TIME", 0L, 100L, 10L, null); |
| 103 | + BigQuerySinkConfig config = |
| 104 | + new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(), |
| 105 | + "TIME", 0L, 100L, 10L, null); |
101 | 106 | config.partitionByField = "dt"; |
102 | | - |
| 107 | + |
103 | 108 | MockFailureCollector collector = new MockFailureCollector("bqsink"); |
104 | 109 | config.validate(collector); |
105 | 110 | Assert.assertEquals(0, collector.getValidationFailures().size()); |
@@ -382,4 +387,19 @@ public void testDatasetWithSpecialCharacters() { |
382 | 387 | Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new!table?2020")); |
383 | 388 | Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new^table|2020")); |
384 | 389 | } |
| 390 | + |
| 391 | + @Test |
| 392 | + public void testInitSQLEngineOutputDoesNotInitOutputWithNullSchema() throws Exception { |
| 393 | + BatchSinkContext sinkContext = mock(BatchSinkContext.class); |
| 394 | + MockFailureCollector collector = new MockFailureCollector("bqsink"); |
| 395 | + |
| 396 | + BigQuerySinkConfig config = |
| 397 | + new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", null, |
| 398 | + null, null, null, null, null); |
| 399 | + BigQuery bigQueryMock = mock(BigQuery.class); |
| 400 | + |
| 401 | + BigQuerySink sink = new BigQuerySink(config); |
| 402 | + sink.initSQLEngineOutput(sinkContext, bigQueryMock, "sink", "sink", "table", null, collector); |
| 403 | + verify(sinkContext, never()).addOutput(any()); |
| 404 | + } |
385 | 405 | } |
0 commit comments