Skip to content

Commit 81be5d8

Browse files
committed
debezium/dbz#1691 Added Python example demonstrating Debezium-Connect Mode
Signed-off-by: Mohnish <kmohnishm@gmail.com>
1 parent 0ff8592 commit 81be5d8

File tree

10 files changed

+1699
-0
lines changed

10 files changed

+1699
-0
lines changed

debezium-python/README.md

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# Debezium Python - Connect Mode Example
2+
3+
This example demonstrates consuming Debezium CDC events in Python using **Connect mode**, where Java `SourceRecord` objects are converted directly to Python dicts via JPype (no JSON serialization overhead).
4+
5+
> **Note:** JAR dependencies are **not** committed to this repository. They are downloaded automatically via Maven using the `setup_jars.py` script during setup.
6+
7+
## Key Features
8+
9+
- **Zero JSON overhead**: Direct Java → Python conversion without serialization
10+
- **Type-safe**: Pydantic validation for CDC events
11+
- **Extended type conversion**: Handles Debezium/Kafka Connect logical types (timestamps, dates, decimals)
12+
- **Configurable numeric output**: Python native scalars or NumPy scalars
13+
- **Optional static class mapping**: Map topic/table names to Python classes for typed `before`/`after`
14+
- **Simple setup**: Automated script to configure Debezium 3.0 JARs
15+
- **Full CDC operations**: Supports read (r), create (c), update (u), and delete (d) operations
16+
17+
## Prerequisites
18+
19+
- Python 3.9+
20+
- Java 17+ (`JAVA_HOME` must be set)
21+
- Docker (for testcontainers Postgres)
22+
- Maven (for downloading Debezium JAR dependencies)
23+
24+
## Quick Start
25+
26+
1. **Create and activate a virtual environment:**
27+
28+
```bash
29+
python3 -m venv .venv
30+
source .venv/bin/activate # On Windows: .venv\Scripts\activate
31+
```
32+
2. **Install dependencies:**
33+
34+
```bash
35+
pip install -r requirements.txt
36+
```
37+
3. **Install Maven** (if not already installed):
38+
39+
```bash
40+
sudo apt update && sudo apt install maven
41+
```
42+
4. **Set up Debezium JARs:**
43+
44+
```bash
45+
python3 setup_jars.py
46+
```
47+
48+
This script uses Maven to download ~400 Debezium 3.0.0.Final JAR dependencies and installs them to the pydbzengine package. The JARs are intentionally not committed to the repository to keep it lightweight.
49+
5. **Run the example:**
50+
51+
```bash
52+
python3 connect_mode_test.py
53+
```
54+
55+
## What This Example Does
56+
57+
The script:
58+
59+
1. Starts a Postgres container with sample data (`inventory.customers`)
60+
2. Runs Debezium embedded engine in Connect mode
61+
3. Captures initial snapshot (4 existing records with operation='r')
62+
4. Shows the expanded `before`/`after` Python structures
63+
5. Validates events with Pydantic models
64+
6. Stops after processing 5 events
65+
66+
## Expected Output
67+
68+
```
69+
================================================================================
70+
Received batch with 4 records (Connect mode - zero JSON overhead)
71+
================================================================================
72+
73+
--- Record 1/4 ---
74+
Destination: connect_test.inventory.customers
75+
Operation: r
76+
77+
AFTER (fully expanded Python dict):
78+
Type: <class 'dict'>
79+
Content: {'id': 1001, 'first_name': 'Sally', 'last_name': 'Thomas', 'email': 'sally.thomas@acme.com'}
80+
81+
Pydantic Validation: PASSED
82+
Validated op: r
83+
Is create: False
84+
Is update: False
85+
Is delete: False
86+
```
87+
88+
## Connect Mode vs JSON Mode
89+
90+
| Mode | Pipeline |
91+
| ---------------------- | --------------------------------------------------------------------- |
92+
| **JSON mode** | `record.value()` → JSON string → `json.loads()` → Python dict |
93+
| **Connect mode** | `record.value()` → Java Struct → direct conversion → Python dict |
94+
95+
Connect mode eliminates JSON serialization, providing better performance and lower memory usage.
96+
97+
## Advanced conversion options
98+
99+
The Connect-mode extractor supports configurable conversion and optional typed mapping:
100+
101+
```python
102+
from dataclasses import dataclass
103+
from pydebeziumai import ConversionConfig, SourceRecordExtractor
104+
105+
@dataclass
106+
class Customer:
107+
id: int
108+
first_name: str
109+
last_name: str
110+
email: str
111+
112+
cfg = ConversionConfig(numeric_mode="numpy") # or "native"
113+
114+
topic_map = {
115+
"connect_test.inventory.customers": Customer,
116+
# also supports "inventory.customers" or "customers"
117+
}
118+
119+
extractor = SourceRecordExtractor(
120+
source_record,
121+
conversion_config=cfg,
122+
topic_class_map=topic_map,
123+
)
124+
125+
# Default dict access
126+
print(extractor.after)
127+
128+
# Typed class access (if mapping exists)
129+
print(extractor.after_typed)
130+
```
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
"""
2+
connect_mode_test.py
3+
4+
Demonstrates the Connect mode pipeline (EngineFormat.CONNECT):
5+
Debezium  raw Java SourceRecord  struct_to_dict  Python dict  Pydantic validation
6+
7+
NO JSON serialization/deserialization overhead.
8+
9+
This example:
10+
1. Starts a Postgres container with test data
11+
2. Runs Debezium in Connect mode (not JSON mode)
12+
3. Uses BaseConnectChangeHandler to receive raw SourceRecord objects
13+
4. Converts them to validated Pydantic models
14+
5. Prints the expanded before/after structures
15+
"""
16+
17+
from pathlib import Path
18+
from testcontainers.postgres import PostgresContainer
19+
from testcontainers.core.waiting_utils import wait_for_logs
20+
21+
# Import local Connect mode extensions
22+
from debezium_connect import DebeziumConnectEngine, BaseConnectChangeHandler
23+
from pydebeziumai import DebeziumEventModel, SourceRecordExtractor, print_record_info
24+
25+
OFFSET_FILE = Path(__file__).parent.joinpath('connect-mode-offsets.dat')
26+
27+
28+
def wait_for_postgresql_to_start(self) -> None:
29+
"""Patch for testcontainers PostgreSQL startup detection."""
30+
wait_for_logs(self, ".*PostgreSQL init process complete.*")
31+
32+
33+
class DbPostgresql:
34+
POSTGRES_USER = "postgres"
35+
POSTGRES_PASSWORD = "postgres"
36+
POSTGRES_DBNAME = "postgres"
37+
POSTGRES_IMAGE = "debezium/example-postgres:3.0.0.Final"
38+
POSTGRES_HOST = "localhost"
39+
POSTGRES_PORT_DEFAULT = 5432
40+
CONTAINER: PostgresContainer = (PostgresContainer(image=POSTGRES_IMAGE,
41+
port=POSTGRES_PORT_DEFAULT,
42+
username=POSTGRES_USER,
43+
password=POSTGRES_PASSWORD,
44+
dbname=POSTGRES_DBNAME,
45+
driver=None)
46+
.with_exposed_ports(POSTGRES_PORT_DEFAULT)
47+
)
48+
PostgresContainer._connect = wait_for_postgresql_to_start
49+
50+
def start(self):
51+
print("Starting Postgresql Db...")
52+
self.CONTAINER.start()
53+
54+
def stop(self):
55+
print("Stopping Postgresql Db...")
56+
self.CONTAINER.stop()
57+
58+
59+
def debezium_engine_props(sourcedb: DbPostgresql):
60+
"""Create Debezium configuration for Connect mode."""
61+
from pydbzengine._jvm import Properties
62+
props = Properties()
63+
props.setProperty("name", "connect-test-engine")
64+
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
65+
props.setProperty("offset.storage.file.filename", str(OFFSET_FILE))
66+
props.setProperty("offset.flush.interval.ms", "1000")
67+
props.setProperty("database.hostname", sourcedb.POSTGRES_HOST)
68+
props.setProperty("database.port", str(sourcedb.CONTAINER.get_exposed_port(sourcedb.POSTGRES_PORT_DEFAULT)))
69+
props.setProperty("database.user", sourcedb.POSTGRES_USER)
70+
props.setProperty("database.password", sourcedb.POSTGRES_PASSWORD)
71+
props.setProperty("database.dbname", sourcedb.POSTGRES_DBNAME)
72+
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
73+
props.setProperty("topic.prefix", "connect_test")
74+
props.setProperty("schema.include.list", "inventory")
75+
props.setProperty("table.include.list", "inventory.customers")
76+
props.setProperty("plugin.name", "pgoutput")
77+
props.setProperty("snapshot.mode", "initial")
78+
return props
79+
80+
81+
class ConnectModeHandler(BaseConnectChangeHandler):
82+
"""
83+
Handler for Connect mode - receives raw Java SourceRecord objects.
84+
No JSON parsing happens.
85+
"""
86+
87+
def __init__(self):
88+
self.event_count = 0
89+
self.max_events = 5 # Stop after processing a few events
90+
91+
def handleConnectBatch(self, records):
92+
"""
93+
Process a batch of raw Java SourceRecord objects.
94+
95+
Args:
96+
records: List of Java SourceRecord objects (not JSON strings!)
97+
"""
98+
print(f"\n{'='*80}")
99+
print(f"Received batch with {len(records)} records (Connect mode - zero JSON overhead)")
100+
print(f"{'='*80}\n")
101+
102+
for idx, record in enumerate(records):
103+
self.event_count += 1
104+
105+
print(f"\n--- Record {idx + 1}/{len(records)} ---")
106+
107+
# Option 1: Use SourceRecordExtractor for direct access
108+
extractor = SourceRecordExtractor(record)
109+
print(f"Destination: {extractor.destination}")
110+
print(f"Partition: {extractor.partition}")
111+
print(f"Operation: {extractor.op}")
112+
113+
# Show the expanded Python structures (no JSON!)
114+
if extractor.before:
115+
print(f"\nBEFORE (fully expanded Python dict):")
116+
print(f" Type: {type(extractor.before)}")
117+
print(f" Content: {extractor.before}")
118+
119+
if extractor.after:
120+
print(f"\nAFTER (fully expanded Python dict):")
121+
print(f" Type: {type(extractor.after)}")
122+
print(f" Content: {extractor.after}")
123+
124+
# Option 2: Use Pydantic model for validation
125+
try:
126+
validated_event = DebeziumEventModel.from_source_record(record)
127+
print(f"\nPydantic Validation:  PASSED")
128+
print(f" Validated op: {validated_event.payload.op}")
129+
print(f" Is create: {validated_event.is_create()}")
130+
print(f" Is update: {validated_event.is_update()}")
131+
print(f" Is delete: {validated_event.is_delete()}")
132+
print(f" Current state: {validated_event.get_current_state()}")
133+
except Exception as e:
134+
print(f"\nPydantic Validation:  FAILED - {e}")
135+
136+
# Optional: introspect the raw Java object
137+
if idx == 0:
138+
print(f"\n--- Java SourceRecord Introspection (first record only) ---")
139+
print_record_info(record)
140+
141+
print(f"\n{''*80}")
142+
143+
# Stop after max_events
144+
if self.event_count >= self.max_events:
145+
print(f"\nReached {self.max_events} events, stopping engine...")
146+
from pydbzengine._jvm import JavaLangThread
147+
JavaLangThread.currentThread().interrupt()
148+
break
149+
150+
151+
def main():
152+
"""
153+
Main test function:
154+
1. Start Postgres with Debezium example data
155+
2. Run Debezium in Connect mode
156+
3. Print expanded before/after structures
157+
4. Validate with Pydantic
158+
"""
159+
print("\n" + "="*80)
160+
print("CONNECT MODE TEST - Zero JSON Overhead")
161+
print("="*80 + "\n")
162+
163+
# Verify JARs are installed
164+
import pydbzengine
165+
jar_dir = Path(pydbzengine.__file__).parent / "debezium" / "libs"
166+
if not jar_dir.exists() or len(list(jar_dir.glob("*.jar"))) == 0:
167+
print("❌ ERROR: Debezium JARs not found!")
168+
print(f" Expected location: {jar_dir}")
169+
print("\nPlease run the setup script first:")
170+
print(" python3 setup_jars.py")
171+
return
172+
173+
print(f"✓ Found {len(list(jar_dir.glob('*.jar')))} JAR files\n")
174+
175+
# Clean up old offset file
176+
if OFFSET_FILE.exists():
177+
OFFSET_FILE.unlink()
178+
print(f"Removed old offset file: {OFFSET_FILE}")
179+
180+
# Start Postgres container
181+
sourcedb = DbPostgresql()
182+
sourcedb.start()
183+
184+
try:
185+
# Create Debezium config for Connect mode
186+
props = debezium_engine_props(sourcedb)
187+
188+
# Create handler
189+
handler = ConnectModeHandler()
190+
191+
# Create engine in Connect mode (not JSON mode!)
192+
print("\nInitializing DebeziumConnectEngine (EngineFormat.CONNECT)...")
193+
engine = DebeziumConnectEngine(properties=props, handler=handler)
194+
195+
print("Starting engine... (will process snapshot and stop after a few events)\n")
196+
197+
# Run the engine
198+
engine.run()
199+
200+
print("\n" + "="*80)
201+
print("Test completed successfully!")
202+
print(f"Total events processed: {handler.event_count}")
203+
print("="*80 + "\n")
204+
205+
except KeyboardInterrupt:
206+
print("\nInterrupted by user")
207+
except Exception as e:
208+
print(f"\nError during test: {e}")
209+
import traceback
210+
traceback.print_exc()
211+
finally:
212+
sourcedb.stop()
213+
print("\nPostgres container stopped.")
214+
215+
216+
if __name__ == "__main__":
217+
main()

0 commit comments

Comments
 (0)