Skip to content

Commit dbd7818

Browse files
committed
Update ChangeStreamDao to query a different TVF for PostgreSQL based on
the change stream partition mode. For a MUTABLE_KEY_RANGE change stream, use read_proto_bytes_; otherwise use read_json_.
1 parent 471050c commit dbd7818

File tree

2 files changed

+194
-60
lines changed

2 files changed

+194
-60
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 126 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import com.google.cloud.spanner.Dialect;
2323
import com.google.cloud.spanner.Options;
2424
import com.google.cloud.spanner.Options.RpcPriority;
25+
import com.google.cloud.spanner.ReadOnlyTransaction;
2526
import com.google.cloud.spanner.ResultSet;
2627
import com.google.cloud.spanner.Statement;
2728
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
2829

2930
/**
30-
* Responsible for making change stream queries for a given partition. The result will be given back
31+
* Responsible for making change stream queries for a given partition. The
32+
* result will be given back
3133
* as a {@link ResultSet}, which can be consumed until the stream is finished.
3234
*/
3335
public class ChangeStreamDao {
@@ -37,16 +39,19 @@ public class ChangeStreamDao {
3739
private final RpcPriority rpcPriority;
3840
private final String jobName;
3941
private final Dialect dialect;
42+
private final boolean isMutableKeyRange;
4043

4144
/**
42-
* Constructs a change stream dao. All the queries performed by this class will be for the given
43-
* change stream name with the specified rpc priority. The job name will be used to tag all the
45+
* Constructs a change stream dao. All the queries performed by this class will
46+
* be for the given
47+
* change stream name with the specified rpc priority. The job name will be used
48+
* to tag all the
4449
* queries made.
4550
*
4651
* @param changeStreamName the name of the change stream to be queried
47-
* @param databaseClient a spanner {@link DatabaseClient}
48-
* @param rpcPriority the priority to be used for the change stream queries
49-
* @param jobName the name of the job performing the query
52+
* @param databaseClient a spanner {@link DatabaseClient}
53+
* @param rpcPriority the priority to be used for the change stream queries
54+
* @param jobName the name of the job performing the query
5055
*/
5156
ChangeStreamDao(
5257
String changeStreamName,
@@ -59,83 +64,144 @@ public class ChangeStreamDao {
5964
this.rpcPriority = rpcPriority;
6065
this.jobName = jobName;
6166
this.dialect = dialect;
67+
this.isMutableKeyRange = isMutableKeyRangeChangeStream(databaseClient, dialect, changeStreamName);
6268
}
6369

6470
/**
65-
* Performs a change stream query. If the partition token given is the initial partition null will
66-
* be used in the query instead. The change stream query will be tagged as following: {@code
67-
* "job=<jobName>"}. The result will be given as a {@link ChangeStreamResultSet} which can be
68-
* consumed as a stream, yielding records until no more are available for the query made. Note
69-
* that one needs to call {@link ChangeStreamResultSet#next()} to initiate the change stream
71+
* Performs a change stream query. If the partition token given is the initial
72+
* partition null will
73+
* be used in the query instead. The change stream query will be tagged as
74+
* following: {@code
75+
* "job=<jobName>"}. The result will be given as a {@link ChangeStreamResultSet}
76+
* which can be
77+
* consumed as a stream, yielding records until no more are available for the
78+
* query made. Note
79+
* that one needs to call {@link ChangeStreamResultSet#next()} to initiate the
80+
* change stream
7081
* query.
7182
*
72-
* @param partitionToken the unique partition token to be queried. If {@link
73-
* InitialPartition#PARTITION_TOKEN} is given, null will be used in the change stream query
74-
* instead.
75-
* @param startTimestamp the inclusive start time for the change stream query
76-
* @param endTimestamp the inclusive end time for the change stream query
77-
* @param heartbeatMillis the number of milliseconds after the stream is idle, which a heartbeat
78-
* record will be emitted in the change stream query
79-
* @return a {@link ChangeStreamResultSet} that will produce a stream of records for the change
80-
* stream query
83+
* @param partitionToken the unique partition token to be queried. If {@link
84+
* InitialPartition#PARTITION_TOKEN} is given, null will
85+
* be used in the change stream query
86+
* instead.
87+
* @param startTimestamp the inclusive start time for the change stream query
88+
* @param endTimestamp the inclusive end time for the change stream query
89+
* @param heartbeatMillis the number of milliseconds after the stream is idle,
90+
* which a heartbeat
91+
* record will be emitted in the change stream query
92+
* @return a {@link ChangeStreamResultSet} that will produce a stream of records
93+
* for the change
94+
* stream query
8195
*/
8296
public ChangeStreamResultSet changeStreamQuery(
8397
String partitionToken,
8498
Timestamp startTimestamp,
8599
Timestamp endTimestamp,
86100
long heartbeatMillis) {
87101
// For the initial partition we query with a null partition token
88-
final String partitionTokenOrNull =
89-
InitialPartition.isInitialPartition(partitionToken) ? null : partitionToken;
102+
final String partitionTokenOrNull = InitialPartition.isInitialPartition(partitionToken) ? null : partitionToken;
90103

91104
String query = "";
92105
Statement statement;
93106
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
96-
statement =
97-
Statement.newBuilder(query)
98-
.bind("p1")
99-
.to(startTimestamp)
100-
.bind("p2")
101-
.to(endTimestamp)
102-
.bind("p3")
103-
.to(partitionTokenOrNull)
104-
.bind("p4")
105-
.to(heartbeatMillis)
106-
.build();
107+
if (this.isMutableKeyRange) {
108+
query = "SELECT * FROM \"spanner\".\"read_proto_bytes_"
109+
+ changeStreamName
110+
+ "\"($1, $2, $3, $4, null)";
111+
} else {
112+
query = "SELECT * FROM \"spanner\".\"read_json_"
113+
+ changeStreamName
114+
+ "\"($1, $2, $3, $4, null)";
115+
}
116+
statement = Statement.newBuilder(query)
117+
.bind("p1")
118+
.to(startTimestamp)
119+
.bind("p2")
120+
.to(endTimestamp)
121+
.bind("p3")
122+
.to(partitionTokenOrNull)
123+
.bind("p4")
124+
.to(heartbeatMillis)
125+
.build();
107126
} else {
108-
query =
109-
"SELECT * FROM READ_"
110-
+ changeStreamName
111-
+ "("
112-
+ " start_timestamp => @startTimestamp,"
113-
+ " end_timestamp => @endTimestamp,"
114-
+ " partition_token => @partitionToken,"
115-
+ " read_options => null,"
116-
+ " heartbeat_milliseconds => @heartbeatMillis"
117-
+ ")";
118-
statement =
119-
Statement.newBuilder(query)
120-
.bind("startTimestamp")
121-
.to(startTimestamp)
122-
.bind("endTimestamp")
123-
.to(endTimestamp)
124-
.bind("partitionToken")
125-
.to(partitionTokenOrNull)
126-
.bind("heartbeatMillis")
127-
.to(heartbeatMillis)
128-
.build();
127+
query = "SELECT * FROM READ_"
128+
+ changeStreamName
129+
+ "("
130+
+ " start_timestamp => @startTimestamp,"
131+
+ " end_timestamp => @endTimestamp,"
132+
+ " partition_token => @partitionToken,"
133+
+ " read_options => null,"
134+
+ " heartbeat_milliseconds => @heartbeatMillis"
135+
+ ")";
136+
statement = Statement.newBuilder(query)
137+
.bind("startTimestamp")
138+
.to(startTimestamp)
139+
.bind("endTimestamp")
140+
.to(endTimestamp)
141+
.bind("partitionToken")
142+
.to(partitionTokenOrNull)
143+
.bind("heartbeatMillis")
144+
.to(heartbeatMillis)
145+
.build();
129146
}
130-
final ResultSet resultSet =
131-
databaseClient
132-
.singleUse()
133-
.executeQuery(statement, Options.priority(rpcPriority), Options.tag("job=" + jobName));
147+
final ResultSet resultSet = databaseClient
148+
.singleUse()
149+
.executeQuery(statement, Options.priority(rpcPriority), Options.tag("job=" + jobName));
134150

135151
return new ChangeStreamResultSet(resultSet);
136152
}
137153

138154
private boolean isPostgres() {
139155
return this.dialect == Dialect.POSTGRESQL;
140156
}
157+
158+
protected boolean isMutableKeyRangeChangeStream() {
159+
return this.isMutableKeyRange;
160+
}
161+
162+
// Returns if the change stream with the give name is MUTABLE_KEY_RANGE or not.
163+
private static boolean isMutableKeyRangeChangeStream(
164+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
165+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
166+
ResultSet resultSet = readChangeStreamOptions(tx, dialect, changeStreamName);
167+
168+
while (resultSet.next()) {
169+
String optionName = resultSet.getString(0);
170+
if ("partition_mode".equalsIgnoreCase(optionName)) {
171+
String optionValue = resultSet.getString(1);
172+
return "MUTABLE_KEY_RANGE".equalsIgnoreCase(optionValue);
173+
}
174+
}
175+
}
176+
return false;
177+
}
178+
179+
private static ResultSet readChangeStreamOptions(
180+
ReadOnlyTransaction tx, Dialect dialect, String changeStreamName) {
181+
Statement statement;
182+
if (dialect == Dialect.POSTGRESQL) {
183+
statement = Statement.newBuilder(
184+
"select"
185+
+ " option_name,"
186+
+ " option_value\n"
187+
+ "from"
188+
+ " information_schema.change_stream_options\n"
189+
+ "where change_stream_name = $1")
190+
.bind("p1")
191+
.to(changeStreamName)
192+
.build();
193+
} else {
194+
statement = Statement.newBuilder(
195+
"select"
196+
+ " option_name,"
197+
+ " option_value\n"
198+
+ "from"
199+
+ " information_schema.change_stream_options\n"
200+
+ "where change_stream_name = @changeStreamName")
201+
.bind("changeStreamName")
202+
.to(changeStreamName)
203+
.build();
204+
}
205+
return tx.executeQuery(statement);
206+
}
141207
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import com.google.cloud.spanner.DatabaseClient;
26+
import com.google.cloud.spanner.Dialect;
27+
import com.google.cloud.spanner.Options.RpcPriority;
28+
import com.google.cloud.spanner.ReadOnlyTransaction;
29+
import com.google.cloud.spanner.ResultSet;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
public class ChangeStreamDaoTest {
34+
private DatabaseClient databaseClient;
35+
private RpcPriority rpcPriority;
36+
private ChangeStreamDao changeStreamDao;
37+
private static final String CHANGE_STREAM_NAME = "testCS";
38+
39+
@Before
40+
public void setUp() {
41+
databaseClient = mock(DatabaseClient.class);
42+
rpcPriority = mock(RpcPriority.class);
43+
changeStreamDao =
44+
new ChangeStreamDao(
45+
CHANGE_STREAM_NAME,
46+
databaseClient,
47+
rpcPriority,
48+
"testjob",
49+
Dialect.GOOGLE_STANDARD_SQL);
50+
}
51+
52+
@Test
53+
public void testPartitionOption() {
54+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
55+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
56+
57+
ResultSet resultSet = mock(ResultSet.class);
58+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
59+
when(resultSet.next()).thenReturn(true).thenReturn(false);
60+
when(resultSet.getString(0)).thenReturn("partition_mode");
61+
62+
when(resultSet.getString(1)).thenReturn("MUTABLE_KEY_RANGE");
63+
assertEquals(true, changeStreamDao.isMutableKeyRangeChangeStream());
64+
65+
when(resultSet.getString(1)).thenReturn("IMMUTABLE_KEY_RANGE");
66+
assertEquals(false, changeStreamDao.isMutableKeyRangeChangeStream());
67+
}
68+
}

0 commit comments

Comments
 (0)