Skip to content

Commit 07efe07

Browse files
committed
working connect-sql connector
1 parent f674bca commit 07efe07

File tree

12 files changed

+102
-79
lines changed

12 files changed

+102
-79
lines changed
Lines changed: 59 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,59 @@
1-
As the README describes, you can reuse the Docker daemon from Minikube with eval $(minikube docker-env).
2-
3-
So to use an image without uploading it, you can follow these steps:
4-
5-
Set the environment variables with eval $(minikube docker-env)
6-
Build the image with the Docker daemon of Minikube (eg docker build -t my-image .)
7-
Set the image in the pod spec like the build tag (eg my-image)
8-
Set the imagePullPolicy to Never, otherwise Kubernetes will try to download the image.
9-
Important note: You have to run eval $(minikube docker-env) on each terminal you want to use, since it only sets the environment variables for the current shell session
10-
11-
12-
13-
14-
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "nbBg8G4DkR83Xs"
15-
16-
select name from sys.databases
17-
go
18-
19-
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "nbBg8G4DkR83Xs"
20-
USE AdventureWorks
21-
GO
22-
EXEC sys.sp_cdc_enable_db
23-
GO
24-
25-
Deploying a connector:
26-
27-
curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors
28-
29-
Connect REST API Docs:
30-
https://docs.confluent.io/platform/current/connect/references/restapi.html
31-
32-
33-
curl -X POST -H "Content-Type: application/json" https://localhost:8083/connectors
34-
35-
36-
https://debezium.io/documentation/reference/1.0/connectors/sqlserver.html
37-
38-
39-
-- ====
40-
-- Enable Database for CDC template
41-
-- ====
42-
USE MyDB
43-
GO
44-
EXEC sys.sp_cdc_enable_db
45-
GO
46-
47-
-- =========
48-
-- Enable a Table Specifying Filegroup Option Template
49-
-- =========
50-
USE MyDB
51-
GO
52-
53-
EXEC sys.sp_cdc_enable_table
54-
@source_schema = N'dbo',
55-
@source_name = N'Person',
56-
@role_name = N'MyRole',
57-
@filegroup_name = N'MyDB_CT',
58-
@supports_net_changes = 0
59-
GO
60-
61-
sqlcmd -S myServer\instanceName -i C:\myScript.sql
62-
63-
64-
65-
1+
## Custom Connect Image / Debezium SQL Server
2+
In this example we go through the following process:
3+
4+
* Build a 'custom' kafka connect image with [Debezium](https://debezium.io/) plug-in, and make available to the internal (minikube) kubernetes cluster
5+
* Initiate a SQLServer stub populated with the traditional '[AdventureWorks](https://docs.microsoft.com/en-us/sql/samples/adventureworks-install-configure?view=sql-server-ver15&tabs=ssms)' database, and enable [CDC](https://en.wikipedia.org/wiki/Change_data_capture) on select tables
6+
* Deploy a connector via a cURL command
7+
* Observe how changes to CDC enabled tables will trigger events in Kafka
8+
9+
NOTE: For ease of readability, we will simply reference the scripts that perform the actions of the following stages. For better understanding of what is actually being done, please review the scripts themselves which will have their own comments/notations. **Assumptions are that you will be running all commands from the present directory**
10+
11+
### Building the custom docker image
12+
The Dockerfile installs a custom plugin with the following line: `RUN confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:1.6.0`. To build, run:
13+
14+
```shell
15+
./build-inside.sh
16+
```
17+
### Deploy CRDs
18+
Deploy the CRDS using the standard way:
19+
```shell
20+
kubectl apply -k ../../kustomize/crds
21+
```
22+
### Deploy Confluent Operator and Confluent Services
23+
Deploy the confluent operator and services:
24+
```shell
25+
kubectl apply -k .
26+
```
27+
### Enable CDC on 'person' table of AdventureWorks Database
28+
CDC needs to be enabled on a table by table basis. This table is also referenced in the prod-mssql-connnector.json file.
29+
```shell
30+
./enable_cdc.sh
31+
Context "minikube" modified.
32+
Changed database context to 'AdventureWorks'.
33+
Job 'cdc.AdventureWorks_capture' started successfully.
34+
Job 'cdc.AdventureWorks_cleanup' started successfully.
35+
```
36+
### Deploy Debezium Connector
37+
A curl request is sent to the 'connect pod' to install the connector.
38+
```shell
39+
./deploy_connector.sh
40+
```
41+
42+
At this stage, if you log onto Control Center, you should see a running connector:
43+
44+
![connector](./connect_image.png)
45+
46+
### Update CDC enabled 'Person' table
47+
Now we will send a SQL Command that will update all users in the person.Person table on the AdventureWorks database:
48+
49+
```shell
50+
./update_person.sh
51+
Context "minikube" modified.
52+
Changed database context to 'AdventureWorks'.
53+
54+
(19972 rows affected)
55+
```
56+
If you observe the automatically created topic `adventureworks-connect.Person.Person` you will see the update event messages streaming through
57+
58+
59+
![topic_update](./topic_update.png)
55.9 KB
Loading
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
#!/bin/bash
2-
nohup kubectl port-forward -n destination connect-0 8083:8083 &
3-
sleep 2
4-
curl -XPOST -H "Content-Type: application/json" --data @prod-mssql-connector.json https://localhost:8083/connectors -kv
2+
export CONNECTOR_CONFIG=$(cat ./prod-mssql-connector.json)
3+
kubectl exec -i connect-0 -c connect -- curl -k -X PUT -H 'Content-Type:application/json' -d "$CONNECTOR_CONFIG" https://localhost:8083/connectors/debezium-sql-server/config

examples/custom-connect-sql/cdc.sh renamed to examples/custom-connect-sql/enable_cdc.sh

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
#!/bin/bash
2-
3-
kubectl create ns sandbox
42
kubectl config set-context --current --namespace=sandbox
5-
6-
kubectl exec -it sql-server -- \
3+
kubectl exec -i sql-server -- \
74
tee -a /tmp/person.sql > /dev/null <<EOT
85
USE AdventureWorks;
96
GO
48.3 KB
Loading

examples/custom-connect-sql/prod-mssql-connector.json

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
{
2-
"name": "prod-mssql-connector",
3-
"config": {
42
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
53

64
"database.hostname": "sql-server.sandbox.svc.cluster.local",
@@ -40,8 +38,8 @@
4038
"time.precision.mode": "connect",
4139
"database.history.skip.unparseable.ddl": false,
4240
"database.history.store.only.monitored.tables.ddl": false,
43-
"table.include.list": ["person.person"],
41+
"table.include.list": "person.Person",
4442
"table.ignore.builtin": false,
4543
"include.schema.changes": true
46-
}
4744
}
45+
108 KB
Loading
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
3+
"connection.url": "jdbc:sqlserver://DB_HOST;databaseName=DB_NAME",
4+
"connection.user": "DATABASE_USER",
5+
"connection.password": "DATABASE_PASS",
6+
"db.timezone": "Asia/Riyadh",
7+
"mode": "incrementing",
8+
"incrementing.column.name": "TRANS_ID",
9+
"query": "SELECT * FROM T_PRT_TRANSACTION_V",
10+
"topic.prefix": "transaction",
11+
12+
"principal.service.name": "CONNECT_USER",
13+
"principal.service.password": "CONNECT_PASS",
14+
15+
"transforms": "createKey,extractInt",
16+
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
17+
"transforms.createKey.fields": "JOURNAL_NUMBER",
18+
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
19+
"transforms.extractInt.field": "JOURNAL_NUMBER"
20+
21+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
use AdventureWorks
2+
GO
3+
update person.Person
4+
set ModifiedDate = GETDATE()
5+
WHERE 1=1
6+
GO
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
2+
kubectl config set-context --current --namespace=sandbox
3+
export UPDATE_CONFIG=$(cat ./update.sql)
4+
kubectl exec sql-server -n sandbox -- /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "nbBg8G4DkR83Xs" -q "$UPDATE_CONFIG" && exit

0 commit comments

Comments
 (0)