Skip to content

Commit 48c3965

Browse files
committed
microsoft_sql_server_cdc: capture schema metadata
Add common schema metadata to messages produced by the microsoft_sql_server_cdc input, matching the behavior of the postgres_cdc and mysql_cdc inputs. The metadata key is "common_schema" (rather than "schema") to avoid colliding with the existing "schema" key that carries the database schema name. Also adds explicit type mappings for MONEY/SMALLMONEY (string) and TIMESTAMP/ROWVERSION (byte array), and json:"-" tags on internal-only MessageEvent fields.
1 parent 66c2140 commit 48c3965

File tree

10 files changed

+420
-21
lines changed

10 files changed

+420
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
1111
- (Benthos) The `command` processor now emits the `exit_code` metadata field. (@mihaitodor)
1212
- The `postgres_cdc` input now adds schema metadata to consumed messages, which can be used for automatic schema conversion in processors such as `schema_registry_encode`. (@Jeffail)
1313
- New `iceberg` output, allows writing Iceberg data to REST catalogs in s3, gcs and adls. (@rockwotj)
14+
- The `microsoft_sql_server_cdc` input now adds schema metadata to consumed messages, which can be used for automatic schema conversion in processors such as `schema_registry_encode`. (@Jeffail)
1415

1516
### Changed
1617

docs/modules/components/pages/inputs/microsoft_sql_server_cdc.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ Additionally, if `stream_snapshot` is set to true, then the existing data in the
9898
9999
This input adds the following metadata fields to each message:
100100
- schema (Schema of the table that the message originated from)
101+
- common_schema (The table schema in benthos common schema format, compatible with processors like parquet_encode)
101102
- table (Name of the table that the message originated from)
102103
- operation (Type of operation that generated the message: "read", "delete", "insert", or "update_before" and "update_after". "read" is from messages that are read in the initial snapshot phase.)
103104
- lsn (the Log Sequence Number in Microsoft SQL Server)

internal/impl/mssqlserver/batcher.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package mssqlserver
1010

1111
import (
1212
"context"
13+
"database/sql"
1314
"encoding/json"
1415
"fmt"
1516
"sync"
@@ -28,6 +29,14 @@ type batchPublisher struct {
2829
batcher *service.Batcher
2930
batcherMu sync.Mutex
3031

32+
// tableSchemas caches the computed common schema for each table. No
33+
// invalidation is needed because MSSQL CDC capture instances are immutable:
34+
// an ALTER TABLE requires creating a new capture instance, which the input
35+
// won't discover until it restarts (at which point a fresh batchPublisher
36+
// with an empty cache is created).
37+
tableSchemas map[string]any
38+
tableSchemasMu sync.RWMutex
39+
3140
checkpoint *checkpoint.Capped[replication.LSN]
3241
msgChan chan asyncMessage
3342
log *service.Logger
@@ -38,11 +47,12 @@ type batchPublisher struct {
3847
// newBatchPublisher creates an instance of batchPublisher.
3948
func newBatchPublisher(batcher *service.Batcher, checkpoint *checkpoint.Capped[replication.LSN], logger *service.Logger) *batchPublisher {
4049
b := &batchPublisher{
41-
batcher: batcher,
42-
checkpoint: checkpoint,
43-
log: logger,
44-
msgChan: make(chan asyncMessage),
45-
shutSig: shutdown.NewSignaller(),
50+
batcher: batcher,
51+
checkpoint: checkpoint,
52+
log: logger,
53+
msgChan: make(chan asyncMessage),
54+
shutSig: shutdown.NewSignaller(),
55+
tableSchemas: make(map[string]any),
4656
}
4757
go b.loop()
4858
return b
@@ -125,6 +135,28 @@ func (p *batchPublisher) loop() {
125135
}
126136
}
127137

138+
// getOrComputeTableSchema returns the cached schema for tableName. If not yet
139+
// cached and colTypes is non-empty, it computes and caches the schema from the
140+
// provided column metadata.
//
// It returns nil when nothing is cached for tableName and colTypes is empty.
// Lookups take the read lock on tableSchemasMu and the store takes the write
// lock. The lock is released between the lookup and the store, so two
// concurrent first calls for the same table may each compute a schema, with
// the later store overwriting the earlier one.
//
// NOTE(review): the cache key is the bare tableName and Publish passes
// m.Table (not m.Schema), so tables that share a name across database schemas
// would share one cache entry - confirm whether that is intended.
141+
func (b *batchPublisher) getOrComputeTableSchema(tableName string, colNames []string, colTypes []*sql.ColumnType) any {
// Cache lookup under the read lock only.
142+
b.tableSchemasMu.RLock()
143+
if s, ok := b.tableSchemas[tableName]; ok {
144+
b.tableSchemasMu.RUnlock()
145+
return s
146+
}
147+
b.tableSchemasMu.RUnlock()
148+
// Cache miss: without column metadata there is nothing to build a schema
// from, so report "no schema" and leave the cache untouched.
149+
if len(colTypes) == 0 {
150+
return nil
151+
}
152+
// Build the schema outside of any lock, then store it under the write lock.
153+
s := columnTypesToSchema(tableName, colNames, colTypes)
154+
b.tableSchemasMu.Lock()
155+
b.tableSchemas[tableName] = s
156+
b.tableSchemasMu.Unlock()
157+
return s
158+
}
159+
128160
// Publish turns the provided message into a service.Message before batching and
129161
// flushing them based on batch size or time elapsed.
130162
func (b *batchPublisher) Publish(ctx context.Context, m replication.MessageEvent) error {
@@ -140,6 +172,9 @@ func (b *batchPublisher) Publish(ctx context.Context, m replication.MessageEvent
140172
if len(m.LSN) != 0 {
141173
msg.MetaSet("lsn", string(m.LSN))
142174
}
175+
if s := b.getOrComputeTableSchema(m.Table, m.ColumnNames, m.ColumnTypes); s != nil {
176+
msg.MetaSetImmut("common_schema", service.ImmutableAny{V: s})
177+
}
143178

144179
var flushedBatch []*service.Message
145180
b.batcherMu.Lock()

internal/impl/mssqlserver/input_mssqlserver_cdc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ Additionally, if ` + "`" + fieldStreamSnapshot + "`" + ` is set to true, then th
5959
6060
This input adds the following metadata fields to each message:
6161
- schema (Schema of the table that the message originated from)
62+
- common_schema (The table schema in benthos common schema format, compatible with processors like parquet_encode)
6263
- table (Name of the table that the message originated from)
6364
- operation (Type of operation that generated the message: "read", "delete", "insert", or "update_before" and "update_after". "read" is from messages that are read in the initial snapshot phase.)
6465
- lsn (the Log Sequence Number in Microsoft SQL Server)

internal/impl/mssqlserver/integration_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,129 @@ microsoft_sql_server_cdc:
709709
}
710710
}
711711

712+
// TestIntegration_MicrosoftSQLServerCDC_SchemaMetadata checks that the
// microsoft_sql_server_cdc input attaches "common_schema" metadata to both a
// snapshot ("read") message and a streamed CDC ("insert") message, and that
// the schema describes the five columns of dbo.schema_meta_test with the
// expected common type names.
func TestIntegration_MicrosoftSQLServerCDC_SchemaMetadata(t *testing.T) {
713+
integration.CheckSkip(t)
714+
t.Parallel()
715+
716+
connStr, db := mssqlservertest.SetupTestWithMicrosoftSQLServerVersion(t, "2022-latest")
717+
require.NoError(t, db.CreateTableWithCDCEnabledIfNotExists(t.Context(), "dbo.schema_meta_test", `
718+
CREATE TABLE dbo.schema_meta_test (
719+
id INT PRIMARY KEY,
720+
label NVARCHAR(50) NOT NULL,
721+
active BIT NOT NULL,
722+
score FLOAT NOT NULL,
723+
created DATETIME2 NOT NULL
724+
);`))
725+
726+
// Disable CDC so the first row becomes a snapshot row, then re-enable CDC.
727+
db.MustDisableCDC(t.Context(), "dbo.schema_meta_test")
728+
db.MustExecContext(t.Context(), `INSERT INTO dbo.schema_meta_test VALUES (1, N'snapshot', 1, 3.14, SYSDATETIME())`)
729+
db.MustEnableCDC(t.Context(), "dbo.schema_meta_test")
730+
// msgMeta records the two metadata values this test inspects per message.
731+
type msgMeta struct {
732+
schema any
733+
op string
734+
}
// received is appended to from the batch consumer callback and read by the
// polling closures below, so every access goes through receivedMu.
735+
var received []msgMeta
736+
var receivedMu sync.Mutex
737+
738+
cfg := fmt.Sprintf(`
739+
microsoft_sql_server_cdc:
740+
connection_string: %s
741+
stream_snapshot: true
742+
include: ["schema_meta_test"]`, connStr)
743+
744+
streamBuilder := service.NewStreamBuilder()
745+
require.NoError(t, streamBuilder.AddInputYAML(cfg))
746+
require.NoError(t, streamBuilder.AddBatchConsumerFunc(func(_ context.Context, mb service.MessageBatch) error {
747+
for _, msg := range mb {
// The "found" results are discarded on purpose: a message without
// common_schema is recorded with whatever MetaGetMut returned
// (presumably nil) and is rejected by the NotNilf check further down.
748+
s, _ := msg.MetaGetMut("common_schema")
749+
op, _ := msg.MetaGet("operation")
750+
receivedMu.Lock()
751+
received = append(received, msgMeta{schema: s, op: op})
752+
receivedMu.Unlock()
753+
}
754+
return nil
755+
}))
756+
757+
stream, err := streamBuilder.Build()
758+
require.NoError(t, err)
759+
license.InjectTestService(stream.Resources())
760+
// NOTE(review): require.NoError calls t.FailNow on failure, which the testing
// package only supports from the goroutine running the test. Consider
// assert.NoError here, or hand the error back over a channel and check it
// after StopWithin.
761+
go func() {
762+
require.NoError(t, stream.Run(t.Context()))
763+
}()
764+
765+
// Wait for the snapshot row to arrive.
// NOTE(review): the boolean result of assert.Eventually is not checked, so a
// timeout is reported but the test carries on; the require.Len below is what
// ultimately stops it.
766+
assert.Eventually(t, func() bool {
767+
receivedMu.Lock()
768+
defer receivedMu.Unlock()
769+
for _, m := range received {
770+
if m.op == "read" {
771+
return true
772+
}
773+
}
774+
return false
775+
}, time.Second*30, time.Millisecond*100)
776+
777+
// Insert a CDC row and wait for it to arrive.
778+
db.MustExecContext(t.Context(), `INSERT INTO dbo.schema_meta_test VALUES (2, N'cdc', 0, 2.71, SYSDATETIME())`)
779+
assert.Eventually(t, func() bool {
780+
receivedMu.Lock()
781+
defer receivedMu.Unlock()
782+
for _, m := range received {
783+
if m.op == "insert" {
784+
return true
785+
}
786+
}
787+
return false
788+
}, time.Second*30, time.Millisecond*100)
789+
// Stop the stream before inspecting what was collected.
790+
require.NoError(t, stream.StopWithin(time.Second*10))
791+
792+
receivedMu.Lock()
793+
defer receivedMu.Unlock()
794+
795+
require.Len(t, received, 2, "expected 1 snapshot message and 1 CDC message")
796+
797+
// Expected column name → benthos common type string for dbo.schema_meta_test.
798+
expectedCols := map[string]string{
799+
"id": "INT64",
800+
"label": "STRING",
801+
"active": "BOOLEAN",
802+
"score": "FLOAT64",
803+
"created": "TIMESTAMP",
804+
}
805+
// Both the snapshot message and the CDC message must carry the same shape
// of schema: an OBJECT named after the table with one child per column.
806+
for i, m := range received {
807+
require.NotNilf(t, m.schema, "message %d (op=%q) is missing schema metadata", i, m.op)
808+
809+
schemaMap, ok := m.schema.(map[string]any)
810+
require.Truef(t, ok, "message %d schema is not map[string]any, got %T", i, m.schema)
811+
812+
assert.Equalf(t, "OBJECT", schemaMap["type"], "message %d schema type", i)
813+
assert.Equalf(t, "schema_meta_test", schemaMap["name"], "message %d schema name", i)
814+
815+
children, ok := schemaMap["children"].([]any)
816+
require.Truef(t, ok, "message %d schema children is not []any", i)
817+
assert.Lenf(t, children, len(expectedCols), "message %d schema children count", i)
818+
819+
for _, child := range children {
820+
childMap, ok := child.(map[string]any)
821+
require.Truef(t, ok, "message %d child schema is not map[string]any", i)
822+
// Failed type assertions leave the zero value, which then fails the
// comparisons below instead of panicking.
823+
name, _ := childMap["name"].(string)
824+
typ, _ := childMap["type"].(string)
825+
optional, _ := childMap["optional"].(bool)
826+
827+
expectedType, exists := expectedCols[name]
828+
assert.Truef(t, exists, "message %d: unexpected column %q in schema", i, name)
829+
assert.Equalf(t, expectedType, typ, "message %d column %q type mismatch", i, name)
// Every column is expected to be marked optional even though the table
// above declares them NOT NULL (or PRIMARY KEY).
// NOTE(review): confirm this reflects the intended schema mapping.
830+
assert.Truef(t, optional, "message %d column %q should be optional", i, name)
831+
}
832+
}
833+
}
834+
712835
// Test_ManualTesting_AddTestDataWithUniqueLSN adds data to an existing table and ensures each change has its own LSN
713836
func Test_ManualTesting_AddTestDataWithUniqueLSN(t *testing.T) {
714837
t.Skip("This test requires a remote database to run. Aimed to seed initial data in a remote test databases")

internal/impl/mssqlserver/replication/snapshot.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,13 @@ func (s *Snapshot) snapshotTable(ctx context.Context, table UserDefinedTable, ma
163163
}
164164

165165
m := MessageEvent{
166-
Table: table.Name,
167-
Schema: table.Schema,
168-
Data: row,
169-
Operation: MessageOperationRead.String(),
170-
LSN: nil,
166+
Table: table.Name,
167+
Schema: table.Schema,
168+
Data: row,
169+
Operation: MessageOperationRead.String(),
170+
LSN: nil,
171+
ColumnNames: columns,
172+
ColumnTypes: types,
171173
}
172174
if err = s.publisher.Publish(ctx, m); err != nil {
173175
return fmt.Errorf("handling snapshot table row: %w", err)

internal/impl/mssqlserver/replication/stream.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"encoding/json"
1717
"errors"
1818
"fmt"
19+
"strings"
1920
"time"
2021

2122
"github.com/redpanda-data/benthos/v4/public/service"
@@ -92,6 +93,11 @@ type changeTableRowIter struct {
9293
log *service.Logger
9394

9495
vals []any
96+
97+
// userColNames and userColTypes are the user-defined columns only,
98+
// excluding MSSQL system columns (those with __$ prefix).
99+
userColNames []string
100+
userColTypes []*sql.ColumnType
95101
}
96102

97103
// newChangeTableRowIter returns a custom row iterator for the given changeTable.
@@ -125,6 +131,17 @@ func newChangeTableRowIter(
125131
return nil, err
126132
}
127133

134+
// Compute user-defined column lists by filtering out MSSQL system columns
135+
// (those with the __$ prefix, e.g. __$start_lsn, __$operation, etc.).
136+
userColNames := make([]string, 0, len(cols))
137+
userColTypes := make([]*sql.ColumnType, 0, len(cols))
138+
for i, c := range cols {
139+
if !strings.HasPrefix(c, "__$") {
140+
userColNames = append(userColNames, c)
141+
userColTypes = append(userColTypes, colTypes[i])
142+
}
143+
}
144+
128145
// pre-allocate slice of pointers for sql.Scan operations
129146
vals := make([]any, len(cols))
130147
for i := range vals {
@@ -133,12 +150,14 @@ func newChangeTableRowIter(
133150
}
134151

135152
iter := &changeTableRowIter{
136-
table: changeTable,
137-
rows: rows,
138-
cols: cols,
139-
colTypes: colTypes,
140-
vals: vals,
141-
log: logger,
153+
table: changeTable,
154+
rows: rows,
155+
cols: cols,
156+
colTypes: colTypes,
157+
vals: vals,
158+
log: logger,
159+
userColNames: userColNames,
160+
userColTypes: userColTypes,
142161
}
143162
// Prime the iterator by loading the first row
144163
if err := iter.next(); err != nil {
@@ -344,11 +363,13 @@ func (r *ChangeTableStream) ReadChangeTables(ctx context.Context, db *sql.DB, st
344363
cur := item.iter.current
345364

346365
msg := MessageEvent{
347-
Table: item.iter.table.Name,
348-
Schema: item.iter.table.Schema,
349-
Data: cur.columns,
350-
LSN: cur.startLSN,
351-
Operation: cur.operation.String(),
366+
Table: item.iter.table.Name,
367+
Schema: item.iter.table.Schema,
368+
Data: cur.columns,
369+
LSN: cur.startLSN,
370+
Operation: cur.operation.String(),
371+
ColumnNames: item.iter.userColNames,
372+
ColumnTypes: item.iter.userColTypes,
352373
}
353374

354375
if err := r.publisher.Publish(ctx, msg); err != nil {

internal/impl/mssqlserver/replication/stream_message.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package replication
1010

1111
import (
12+
"database/sql"
1213
"encoding/hex"
1314
"fmt"
1415
)
@@ -87,4 +88,10 @@ type MessageEvent struct {
8788
Schema string `json:"schema"`
8889
Table string `json:"table"`
8990
Data any `json:"data"`
91+
92+
// ColumnNames and ColumnTypes carry user-defined column metadata (excluding
93+
// MSSQL system columns with __$ prefix). They are used to build schema
94+
// metadata on the outgoing message and are not serialised to JSON.
95+
ColumnNames []string `json:"-"`
96+
ColumnTypes []*sql.ColumnType `json:"-"`
9097
}

0 commit comments

Comments
 (0)