1919import com .google .common .base .Preconditions ;
2020import com .google .common .collect .ImmutableMap ;
2121import io .debezium .config .Configuration ;
22- import io .debezium .embedded .EmbeddedEngine ;
22+ import io .debezium .embedded .Connect ;
23+ import io .debezium .engine .ChangeEvent ;
24+ import io .debezium .engine .DebeziumEngine ;
2325import io .debezium .relational .HistorizedRelationalDatabaseConnectorConfig ;
2426import io .debezium .relational .RelationalDatabaseConnectorConfig ;
25- import io .debezium .relational .history .FileDatabaseHistory ;
26- import io .debezium .relational . history .MemoryDatabaseHistory ;
27- import io .debezium . util . Clock ;
27+ import io .debezium .relational .history .MemorySchemaHistory ;
28+ import io .debezium .storage . file . history .FileSchemaHistory ;
29+ import java . io .IOException ;
2830import java .util .Iterator ;
2931import java .util .Set ;
3032import java .util .concurrent .CancellationException ;
3638import java .util .concurrent .TimeoutException ;
3739import java .util .function .Function ;
3840import java .util .stream .Collectors ;
41+ import org .apache .kafka .connect .source .SourceRecord ;
3942import org .slf4j .Logger ;
4043import org .slf4j .LoggerFactory ;
4144
@@ -74,7 +77,7 @@ public class DebeziumToPubSubDataSender implements Runnable {
7477
7578 private final Set <String > whitelistedTables ;
7679
77- private EmbeddedEngine engine ;
80+ private DebeziumEngine < ChangeEvent < SourceRecord , SourceRecord >> engine ;
7881 private ExecutorService executorService ;
7982
8083 public DebeziumToPubSubDataSender (
@@ -122,8 +125,8 @@ public DebeziumToPubSubDataSender(
122125 Configuration .empty ()
123126 .withSystemProperties (Function .identity ())
124127 .edit ()
125- .with (EmbeddedEngine . CONNECTOR_CLASS , RDBMS_TO_CONNECTOR_MAP .get (rdbms ))
126- .with (EmbeddedEngine . ENGINE_NAME , APP_NAME )
128+ .with ("connector.class" , RDBMS_TO_CONNECTOR_MAP .get (rdbms ))
129+ .with ("name" , APP_NAME )
127130 // Database connection information.
128131 .with ("database.hostname" , this .databaseAddress )
129132 .with ("database.port" , this .databasePort )
@@ -132,35 +135,33 @@ public DebeziumToPubSubDataSender(
132135 .with ("database.server.name" , databaseName )
133136 .with ("decimal.handling.mode" , "string" )
134137 .with (
135- HistorizedRelationalDatabaseConnectorConfig .DATABASE_HISTORY ,
136- MemoryDatabaseHistory .class .getName ());
138+ HistorizedRelationalDatabaseConnectorConfig .SCHEMA_HISTORY . name () ,
139+ MemorySchemaHistory .class .getName ());
137140
138141 if (!whitelistedTables .isEmpty ()) {
139142 LOG .info ("Whitelisting tables: {}" , dbzWhitelistedTables );
140143 configBuilder =
141144 configBuilder .with (
142- RelationalDatabaseConnectorConfig .TABLE_WHITELIST , dbzWhitelistedTables );
145+ RelationalDatabaseConnectorConfig .TABLE_INCLUDE_LIST . name () , dbzWhitelistedTables );
143146 }
144147
145148 if (this .inMemoryOffsetStorage ) {
146149 LOG .info ("Setting up in memory offset storage." );
147150 configBuilder =
148151 configBuilder .with (
149- EmbeddedEngine . OFFSET_STORAGE ,
152+ "offset.storage" ,
150153 "org.apache.kafka.connect.storage.MemoryOffsetBackingStore" );
151154 } else {
152155 LOG .info ("Setting up in File-based offset storage in {}." , this .offsetStorageFile );
153156 configBuilder =
154157 configBuilder
158+ .with ("offset.storage" , "org.apache.kafka.connect.storage.FileOffsetBackingStore" )
159+ .with ("offset.storage.file.filename" , this .offsetStorageFile )
160+ .with (DebeziumEngine .OFFSET_FLUSH_INTERVAL_MS_PROP , DEFAULT_FLUSH_INTERVAL_MS )
155161 .with (
156- EmbeddedEngine .OFFSET_STORAGE ,
157- "org.apache.kafka.connect.storage.FileOffsetBackingStore" )
158- .with (EmbeddedEngine .OFFSET_STORAGE_FILE_FILENAME , this .offsetStorageFile )
159- .with (EmbeddedEngine .OFFSET_FLUSH_INTERVAL_MS , DEFAULT_FLUSH_INTERVAL_MS )
160- .with (
161- HistorizedRelationalDatabaseConnectorConfig .DATABASE_HISTORY ,
162- FileDatabaseHistory .class .getName ())
163- .with ("database.history.file.filename" , this .databaseHistoryFile );
162+ HistorizedRelationalDatabaseConnectorConfig .SCHEMA_HISTORY .name (),
163+ FileSchemaHistory .class .getName ())
164+ .with (FileSchemaHistory .FILE_PATH .name (), this .databaseHistoryFile );
164165 }
165166
166167 Iterator <String > keys = debeziumConfig .getKeys ();
@@ -182,10 +183,10 @@ public void run() {
182183 PubSubChangeConsumer .DEFAULT_PUBLISHER_FACTORY );
183184
184185 engine =
185- EmbeddedEngine .create ()
186- .using (config )
186+ DebeziumEngine .create (Connect . class )
187+ .using (config . asProperties () )
187188 .using (this .getClass ().getClassLoader ())
188- .using (Clock .SYSTEM )
189+ .using (java . time . Clock .systemUTC () )
189190 .notifying (changeConsumer )
190191 .build ();
191192
@@ -198,14 +199,22 @@ public void run() {
198199 new Thread (
199200 () -> {
200201 LOG .info ("Requesting embedded engine to shut down" );
201- engine .stop ();
202+ try {
203+ engine .close ();
204+ } catch (IOException e ) {
205+ LOG .error ("Error closing the engine" , e );
206+ }
202207 }));
203208
204209 awaitTermination (future );
205210 }
206211
207212 public void stop () {
208- engine .stop ();
213+ try {
214+ engine .close ();
215+ } catch (IOException e ) {
216+ LOG .error ("Error closing the engine" , e );
217+ }
209218 executorService .shutdown ();
210219 }
211220
@@ -217,7 +226,11 @@ private void awaitTermination(Future<?> future) {
217226 try {
218227 future .get (30 , TimeUnit .SECONDS );
219228 if (future .isDone () || future .isCancelled ()) {
220- engine .stop ();
229+ try {
230+ engine .close ();
231+ } catch (IOException e ) {
232+ LOG .error ("Error closing the engine" , e );
233+ }
221234 executorService .shutdown ();
222235 break ;
223236 }
0 commit comments