55
66import com .azure .cosmos .ChangeFeedProcessor ;
77import com .azure .cosmos .ChangeFeedProcessorBuilder ;
8- import com .azure .cosmos .ConsistencyLevel ;
98import com .azure .cosmos .CosmosAsyncClient ;
109import com .azure .cosmos .CosmosAsyncContainer ;
1110import com .azure .cosmos .CosmosAsyncDatabase ;
12- import com .azure .cosmos .CosmosClientBuilder ;
13- import com .azure .cosmos .CosmosException ;
1411import com .azure .cosmos .kafka .connect .CosmosDBConfig ;
1512import com .azure .cosmos .kafka .connect .TopicContainerMap ;
13+ import com .azure .cosmos .kafka .connect .implementations .CosmosClientStore ;
1614import com .azure .cosmos .kafka .connect .implementations .CosmosKafkaSchedulers ;
1715import com .azure .cosmos .models .ChangeFeedProcessorOptions ;
18- import com .azure .cosmos .models .CosmosContainerProperties ;
19- import com .azure .cosmos .models .CosmosContainerRequestOptions ;
20- import com .azure .cosmos .models .CosmosContainerResponse ;
21- import com .azure .cosmos .models .ThroughputProperties ;
2216import com .fasterxml .jackson .databind .JsonNode ;
2317import org .apache .kafka .connect .data .Schema ;
2418import org .apache .kafka .connect .data .SchemaAndValue ;
2519import org .apache .kafka .connect .source .SourceRecord ;
2620import org .apache .kafka .connect .source .SourceTask ;
2721import org .slf4j .Logger ;
2822import org .slf4j .LoggerFactory ;
23+ import reactor .core .publisher .Mono ;
2924
3025import java .time .Duration ;
3126import java .util .ArrayList ;
@@ -69,13 +64,13 @@ public void start(Map<String, String> map) {
6964 this .queue = new LinkedTransferQueue <>();
7065
7166 logger .info ("Worker {} Creating the client." , this .config .getWorkerName ());
72- client = getCosmosClient (config );
67+ client = CosmosClientStore . getCosmosClient (this . config , this . getUserAgentSuffix () );
7368
7469 // Initialize the database, feed and lease containers
7570 CosmosAsyncDatabase database = client .getDatabase (config .getDatabaseName ());
7671 String container = config .getAssignedContainer ();
7772 CosmosAsyncContainer feedContainer = database .getContainer (container );
78- leaseContainer = createNewLeaseContainer ( client , config .getDatabaseName (), container + "-leases" );
73+ leaseContainer = database . getContainer ( this . config .getAssignedLeaseContainer () );
7974
8075 // Create source partition map
8176 partitionMap = new HashMap <>();
@@ -212,34 +207,28 @@ public void stop() {
212207 // NOTE: poll() method and stop() method are both called from the same thread,
213208 // so it is important not to include any changes which may block both places forever
214209 running .set (false );
215-
216- // Release all the resources.
217- if (changeFeedProcessor != null ) {
218- changeFeedProcessor .stop ().block ();
219- changeFeedProcessor = null ;
220- }
221210
222- if (this .client != null ) {
223- this .client .close ();
224- }
211+ Mono .just (this )
212+ .flatMap (connectorTask -> {
213+ if (this .changeFeedProcessor != null ) {
214+ return this .changeFeedProcessor .stop ()
215+ .delayElement (Duration .ofMillis (500 )) // delay some time here as the partitionProcessor will release the lease in background
216+ .doOnNext (t -> {
217+ this .changeFeedProcessor = null ;
218+ this .safeCloseClient ();
219+ });
220+ } else {
221+ this .safeCloseClient ();
222+ return Mono .empty ();
223+ }
224+ })
225+ .block ();
225226 }
226227
227- private CosmosAsyncClient getCosmosClient (CosmosDBSourceConfig config ) {
228- logger .info ("Worker {} Creating Cosmos Client." , this .config .getWorkerName ());
229-
230- CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder ()
231- .endpoint (config .getConnEndpoint ())
232- .key (config .getConnKey ())
233- .consistencyLevel (ConsistencyLevel .SESSION )
234- .contentResponseOnWriteEnabled (true )
235- .connectionSharingAcrossClientsEnabled (config .isConnectionSharingEnabled ())
236- .userAgentSuffix (getUserAgentSuffix ());
237-
238- if (config .isGatewayModeEnabled ()) {
239- cosmosClientBuilder .gatewayMode ();
228+ private void safeCloseClient () {
229+ if (this .client != null ) {
230+ this .client .close ();
240231 }
241-
242- return cosmosClientBuilder .buildAsyncClient ();
243232 }
244233
245234 private String getUserAgentSuffix () {
@@ -292,6 +281,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
292281 // The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
293282 // until batch size or maxWaitTime reached. Which can cause CFP to checkpoint faster than kafka batch.
294283 // In order to not move CFP checkpoint faster, we are using shouldFillMoreRecords to control the batch flush.
284+ logger .debug ("Transferring document " + this .config .getWorkerName ());
295285 this .queue .transfer (document );
296286 } catch (InterruptedException e ) {
297287 logger .error ("Interrupted! changeFeedReader." , e );
@@ -307,38 +297,4 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
307297 this .shouldFillMoreRecords .set (false );
308298 }
309299 }
310-
311- private CosmosAsyncContainer createNewLeaseContainer (CosmosAsyncClient client , String databaseName , String leaseCollectionName ) {
312- CosmosAsyncDatabase database = client .getDatabase (databaseName );
313- CosmosAsyncContainer leaseCollection = database .getContainer (leaseCollectionName );
314- CosmosContainerResponse leaseContainerResponse = null ;
315-
316- logger .info ("Checking whether the lease container exists." );
317- try {
318- leaseContainerResponse = leaseCollection .read ().block ();
319- } catch (CosmosException ex ) {
320- // Swallowing exceptions when the type is CosmosException and statusCode is 404
321- if (ex .getStatusCode () != 404 ) {
322- throw ex ;
323- }
324- logger .info ("Lease container does not exist {}" , ex .getMessage ());
325- }
326-
327- if (leaseContainerResponse == null ) {
328- logger .info ("Creating the Lease container : {}" , leaseCollectionName );
329- CosmosContainerProperties containerSettings = new CosmosContainerProperties (leaseCollectionName , "/id" );
330- ThroughputProperties throughputProperties = ThroughputProperties .createManualThroughput (400 );
331- CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions ();
332-
333- try {
334- database .createContainer (containerSettings , throughputProperties , requestOptions ).block ();
335- } catch (Exception e ) {
336- logger .error ("Failed to create container {} in database {}" , leaseCollectionName , databaseName );
337- throw e ;
338- }
339- logger .info ("Successfully created new lease container." );
340- }
341-
342- return database .getContainer (leaseCollectionName );
343- }
344300}
0 commit comments