Skip to content

Commit 9ff96df

Browse files
authored
Update Golang - DebeziumIO 3.1.3 (apache#37667)
1 parent 1052216 commit 9ff96df

File tree

4 files changed

+20
-3
lines changed

4 files changed

+20
-3
lines changed

sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type readFromDebeziumSchema struct {
7575
Host string
7676
Port string
7777
MaxNumberOfRecords *int64
78+
MaxTimeToRun *int64
7879
ConnectionProperties []string
7980
}
8081

@@ -133,6 +134,13 @@ func MaxRecord(r int64) readOption {
133134
}
134135
}
135136

137+
// MaxTimeToRun specifies maximum number of milliseconds to run before stop.
138+
func MaxTimeToRun(r int64) readOption {
139+
return func(cfg *debeziumConfig) {
140+
cfg.readSchema.MaxTimeToRun = &r
141+
}
142+
}
143+
136144
// ConnectionProperties specifies properties of the debezium connection passed as
137145
// a string with format [propertyName=property;]*
138146
func ConnectionProperties(cp []string) readOption {

sdks/go/test/integration/io/xlang/debezium/debezium.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ func ReadPipeline(addr, username, password, dbname, host, port string, connector
2929
p, s := beam.NewPipelineWithRoot()
3030
result := debeziumio.Read(s.Scope("Read from debezium"), username, password, host, port,
3131
connectorClass, reflectx.String, debeziumio.MaxRecord(maxrecords),
32+
debeziumio.MaxTimeToRun(120000),
3233
debeziumio.ConnectionProperties(connectionProperties), debeziumio.ExpansionAddr(addr))
33-
expectedJson := `{"metadata":{"connector":"postgresql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}`
34+
expectedJson := `{"metadata":{"connector":"postgresql","version":"3.1.3.Final","name":"beam-debezium-connector","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}`
3435
expected := beam.Create(s, expectedJson)
3536
passert.Equals(s, result, expected)
3637
return p

sdks/go/test/integration/io/xlang/debezium/debezium_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
)
3535

3636
const (
37-
debeziumImage = "quay.io/debezium/example-postgres:latest"
37+
debeziumImage = "quay.io/debezium/example-postgres:3.1.3.Final"
3838
debeziumPort = "5432/tcp"
3939
maxRetries = 5
4040
)
@@ -82,7 +82,6 @@ func TestDebeziumIO_BasicRead(t *testing.T) {
8282
connectionProperties := []string{
8383
"database.dbname=inventory",
8484
"database.server.name=dbserver1",
85-
"database.include.list=inventory",
8685
"include.schema.changes=false",
8786
}
8887
read := ReadPipeline(expansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties)

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public static class ReadBuilder
7777
public static class Configuration extends CrossLanguageConfiguration {
7878
private @Nullable List<String> connectionProperties;
7979
private @Nullable Long maxNumberOfRecords;
80+
private @Nullable Long maxTimeToRun;
8081

8182
public void setConnectionProperties(@Nullable List<String> connectionProperties) {
8283
this.connectionProperties = connectionProperties;
@@ -85,6 +86,10 @@ public void setConnectionProperties(@Nullable List<String> connectionProperties)
8586
public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
8687
this.maxNumberOfRecords = maxNumberOfRecords;
8788
}
89+
90+
public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
91+
this.maxTimeToRun = maxTimeToRun;
92+
}
8893
}
8994

9095
@Override
@@ -114,6 +119,10 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
114119
readTransform.withMaxNumberOfRecords(configuration.maxNumberOfRecords.intValue());
115120
}
116121

122+
if (configuration.maxTimeToRun != null) {
123+
readTransform = readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
124+
}
125+
117126
return readTransform;
118127
}
119128
}

0 commit comments

Comments
 (0)