@@ -125,84 +125,6 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() {
125
125
any (String .class ), any (Schema .class ));
126
126
}
127
127
128
- @ Test
129
- public void testAutoCreateTables () {
130
- final String dataset = "scratch" ;
131
- final String existingTableTopic = "topic-with-existing-table" ;
132
- final String nonExistingTableTopic = "topic-without-existing-table" ;
133
- final TableId existingTable = TableId .of (dataset , "topic_with_existing_table" );
134
- final TableId nonExistingTable = TableId .of (dataset , "topic_without_existing_table" );
135
-
136
- Map <String , String > properties = propertiesFactory .getProperties ();
137
- properties .put (BigQuerySinkConfig .TABLE_CREATE_CONFIG , "true" );
138
- properties .put (BigQuerySinkConfig .SCHEMA_RETRIEVER_CONFIG , BigQuerySinkConnectorTest .MockSchemaRetriever .class .getName ());
139
- properties .put (BigQuerySinkConfig .SANITIZE_TOPICS_CONFIG , "true" );
140
- properties .put (BigQuerySinkConfig .DATASETS_CONFIG , String .format (".*=%s" , dataset ));
141
- properties .put (BigQuerySinkConfig .TOPICS_CONFIG , existingTableTopic );
142
-
143
- BigQuery bigQuery = mock (BigQuery .class );
144
- Table fakeTable = mock (Table .class );
145
- when (bigQuery .getTable (existingTable )).thenReturn (fakeTable );
146
- when (bigQuery .getTable (nonExistingTable )).thenReturn (null );
147
- InsertAllResponse insertAllResponse = mock (InsertAllResponse .class );
148
- when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
149
- when (insertAllResponse .hasErrors ()).thenReturn (false );
150
-
151
- Storage storage = mock (Storage .class );
152
- SchemaRetriever schemaRetriever = mock (SchemaRetriever .class );
153
- SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
154
- SchemaManager schemaManager = mock (SchemaManager .class );
155
-
156
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , schemaRetriever , storage , schemaManager );
157
- testTask .initialize (sinkTaskContext );
158
- testTask .start (properties );
159
-
160
- testTask .put (Collections .singletonList (spoofSinkRecord (nonExistingTableTopic )));
161
- testTask .flush (Collections .emptyMap ());
162
-
163
- verify (schemaManager , never ()).createTable (existingTable , existingTableTopic );
164
- verify (schemaManager ).createTable (nonExistingTable , nonExistingTableTopic );
165
- }
166
-
167
- @ Test
168
- public void testNonAutoCreateTables () {
169
- final String dataset = "scratch" ;
170
- final String existingTableTopic = "topic-with-existing-table" ;
171
- final String nonExistingTableTopic = "topic-without-existing-table" ;
172
- final TableId existingTable = TableId .of (dataset , "topic_with_existing_table" );
173
- final TableId nonExistingTable = TableId .of (dataset , "topic_without_existing_table" );
174
-
175
- Map <String , String > properties = propertiesFactory .getProperties ();
176
- properties .put (BigQuerySinkConfig .SCHEMA_RETRIEVER_CONFIG , BigQuerySinkConnectorTest .MockSchemaRetriever .class .getName ());
177
- properties .put (BigQuerySinkConfig .SANITIZE_TOPICS_CONFIG , "true" );
178
- properties .put (BigQuerySinkConfig .DATASETS_CONFIG , String .format (".*=%s" , dataset ));
179
- properties .put (BigQuerySinkConfig .TOPICS_CONFIG , existingTableTopic );
180
-
181
- BigQuery bigQuery = mock (BigQuery .class );
182
- Table fakeTable = mock (Table .class );
183
- when (bigQuery .getTable (existingTable )).thenReturn (fakeTable );
184
- when (bigQuery .getTable (nonExistingTable )).thenReturn (null );
185
- InsertAllResponse insertAllResponse = mock (InsertAllResponse .class );
186
- when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
187
- when (insertAllResponse .hasErrors ()).thenReturn (false );
188
-
189
- Storage storage = mock (Storage .class );
190
- SchemaRetriever schemaRetriever = mock (SchemaRetriever .class );
191
- SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
192
- SchemaManager schemaManager = mock (SchemaManager .class );
193
-
194
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , schemaRetriever , storage , schemaManager );
195
- testTask .initialize (sinkTaskContext );
196
- testTask .start (properties );
197
-
198
- testTask .put (Collections .singletonList (spoofSinkRecord (nonExistingTableTopic )));
199
- testTask .flush (Collections .emptyMap ());
200
-
201
- verify (schemaManager , never ()).createTable (existingTable , existingTableTopic );
202
- verify (schemaManager , never ()).createTable (nonExistingTable , existingTableTopic );
203
- }
204
-
205
-
206
128
@ Test
207
129
public void testEmptyPut () {
208
130
Map <String , String > properties = propertiesFactory .getProperties ();
0 commit comments