Skip to content

Commit dc213a9

Browse files
authored
Add change stream new record models and tests. (apache#35258)
Include PartitionStartRecord, PartitionEndRecord, PartitionEventRecord, MoveInEvent, MoveOutEvent. These will be used in the following PRs - the returned change stream record will be parsed into these models, and corresponding actions will be added and called later.
1 parent f2d0dc8 commit dc213a9

File tree

6 files changed

+527
-0
lines changed

6 files changed

+527
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
19+
20+
import com.google.cloud.Timestamp;
21+
import java.util.Objects;
22+
import org.apache.avro.reflect.AvroEncode;
23+
import org.apache.avro.reflect.Nullable;
24+
import org.apache.beam.sdk.coders.DefaultCoder;
25+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
26+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
27+
28+
/**
29+
* A partition end record serves as a notification that the client should stop reading the
30+
* partition. No further records are expected to be retrieved on it.
31+
*/
32+
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
33+
@DefaultCoder(AvroCoder.class)
34+
public class PartitionEndRecord implements ChangeStreamRecord {
35+
36+
private static final long serialVersionUID = 5406538761724655621L;
37+
38+
@AvroEncode(using = TimestampEncoding.class)
39+
private Timestamp endTimestamp;
40+
41+
private String recordSequence;
42+
43+
@Nullable private ChangeStreamRecordMetadata metadata;
44+
45+
/** Default constructor for serialization only. */
46+
private PartitionEndRecord() {}
47+
48+
/**
49+
* Constructs the partition end record with the given timestamp, record sequence and metadata.
50+
*
51+
* @param endTimestamp end timestamp at which the change stream partition is terminated
52+
* @param recordSequence the order within a partition and a transaction in which the record was
53+
* put to the stream
54+
* @param metadata connector execution metadata for the given record
55+
*/
56+
public PartitionEndRecord(
57+
Timestamp endTimestamp, String recordSequence, ChangeStreamRecordMetadata metadata) {
58+
this.endTimestamp = endTimestamp;
59+
this.recordSequence = recordSequence;
60+
this.metadata = metadata;
61+
}
62+
63+
/**
64+
* Indicates the timestamp for which the change stream partition is terminated.
65+
*
66+
* @return the timestamp for which the change stream partition is terminated
67+
*/
68+
@Override
69+
public Timestamp getRecordTimestamp() {
70+
return getEndTimestamp();
71+
}
72+
73+
/**
74+
* The end timestamp at which the change stream partition is terminated.
75+
*
76+
* @return the timestamp for which the change stream partition is terminated
77+
*/
78+
public Timestamp getEndTimestamp() {
79+
return endTimestamp;
80+
}
81+
82+
/**
83+
* Indicates the order in which a record was put to the stream. Is unique and increasing within a
84+
* partition. It is relative to the scope of partition, commit timestamp, and
85+
* server_transaction_id. It is useful for readers downstream to dedup any duplicate records that
86+
* were read/recorded.
87+
*
88+
* @return record sequence of the record
89+
*/
90+
public String getRecordSequence() {
91+
return recordSequence;
92+
}
93+
94+
@Override
95+
public boolean equals(@javax.annotation.Nullable Object o) {
96+
if (this == o) {
97+
return true;
98+
}
99+
if (!(o instanceof PartitionEndRecord)) {
100+
return false;
101+
}
102+
PartitionEndRecord that = (PartitionEndRecord) o;
103+
return Objects.equals(endTimestamp, that.endTimestamp)
104+
&& Objects.equals(recordSequence, that.recordSequence);
105+
}
106+
107+
@Override
108+
public int hashCode() {
109+
return Objects.hash(endTimestamp, recordSequence);
110+
}
111+
112+
@Override
113+
public String toString() {
114+
return "PartitionEndRecord{"
115+
+ "endTimestamp="
116+
+ endTimestamp
117+
+ ", recordSequence='"
118+
+ recordSequence
119+
+ '\''
120+
+ ", metadata="
121+
+ metadata
122+
+ '}';
123+
}
124+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
19+
20+
import com.google.cloud.Timestamp;
21+
import java.util.Objects;
22+
import org.apache.avro.reflect.AvroEncode;
23+
import org.apache.avro.reflect.Nullable;
24+
import org.apache.beam.sdk.coders.DefaultCoder;
25+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
26+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
27+
28+
/**
29+
* A partition event record describes key range changes for a change stream partition. This record
30+
* is only used for updating watermark in the dataflow connector. MoveInEvent and MoveOutEvent are
31+
* not tracked as they will not be used in the dataflow connector.
32+
*/
33+
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
34+
@DefaultCoder(AvroCoder.class)
35+
public class PartitionEventRecord implements ChangeStreamRecord {
36+
37+
private static final long serialVersionUID = 6431436477387396791L;
38+
39+
@AvroEncode(using = TimestampEncoding.class)
40+
private Timestamp commitTimestamp;
41+
42+
private String recordSequence;
43+
44+
@Nullable private ChangeStreamRecordMetadata metadata;
45+
46+
/** Default constructor for serialization only. */
47+
private PartitionEventRecord() {}
48+
49+
/**
50+
* Constructs the partition event record with the given partitions.
51+
*
52+
* @param commitTimestamp the timestamp at which the key range change occurred
53+
* @param recordSequence the order within a partition and a transaction in which the record was
54+
* put to the stream
55+
* @param metadata connector execution metadata for the given record
56+
*/
57+
public PartitionEventRecord(
58+
Timestamp commitTimestamp, String recordSequence, ChangeStreamRecordMetadata metadata) {
59+
this.commitTimestamp = commitTimestamp;
60+
this.recordSequence = recordSequence;
61+
this.metadata = metadata;
62+
}
63+
64+
/**
65+
* Returns the timestamp at which the key range change occurred.
66+
*
67+
* @return the start timestamp of the partition
68+
*/
69+
@Override
70+
public Timestamp getRecordTimestamp() {
71+
return getCommitTimestamp();
72+
}
73+
74+
/**
75+
* Returns the timestamp at which the key range change occurred.
76+
*
77+
* @return the commit timestamp of the key range change
78+
*/
79+
public Timestamp getCommitTimestamp() {
80+
return commitTimestamp;
81+
}
82+
83+
/**
84+
* Indicates the order in which a record was put to the stream. Is unique and increasing within a
85+
* partition. It is relative to the scope of partition, commit timestamp, and
86+
* server_transaction_id. It is useful for readers downstream to dedup any duplicate records that
87+
* were read/recorded.
88+
*
89+
* @return record sequence of the record
90+
*/
91+
public String getRecordSequence() {
92+
return recordSequence;
93+
}
94+
95+
@Override
96+
public boolean equals(@javax.annotation.Nullable Object o) {
97+
if (this == o) {
98+
return true;
99+
}
100+
if (!(o instanceof PartitionEventRecord)) {
101+
return false;
102+
}
103+
PartitionEventRecord that = (PartitionEventRecord) o;
104+
return Objects.equals(commitTimestamp, that.commitTimestamp)
105+
&& Objects.equals(recordSequence, that.recordSequence);
106+
}
107+
108+
@Override
109+
public int hashCode() {
110+
return Objects.hash(commitTimestamp, recordSequence);
111+
}
112+
113+
@Override
114+
public String toString() {
115+
return "PartitionEventRecord{"
116+
+ "commitTimestamp="
117+
+ commitTimestamp
118+
+ ", recordSequence='"
119+
+ recordSequence
120+
+ '\''
121+
+ ", metadata="
122+
+ metadata
123+
+ '}';
124+
}
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
19+
20+
import com.google.cloud.Timestamp;
21+
import java.util.List;
22+
import java.util.Objects;
23+
import org.apache.avro.reflect.AvroEncode;
24+
import org.apache.avro.reflect.Nullable;
25+
import org.apache.beam.sdk.coders.DefaultCoder;
26+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
27+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
28+
29+
/**
30+
* A partition start record serves as a notification that the client should schedule the partitions
31+
* to be queried. PartitionStartRecord returns information about one or more partitions.
32+
*/
33+
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
34+
@DefaultCoder(AvroCoder.class)
35+
public class PartitionStartRecord implements ChangeStreamRecord {
36+
37+
private static final long serialVersionUID = 1446342293580399634L;
38+
39+
@AvroEncode(using = TimestampEncoding.class)
40+
private Timestamp startTimestamp;
41+
42+
private String recordSequence;
43+
private List<String> partitionTokens;
44+
@Nullable private ChangeStreamRecordMetadata metadata;
45+
46+
/** Default constructor for serialization only. */
47+
private PartitionStartRecord() {}
48+
49+
/**
50+
* Constructs the partition start record with the given partitions.
51+
*
52+
* @param startTimestamp the timestamp which these partitions started being valid in Cloud Spanner
53+
* @param recordSequence the order within a partition and a transaction in which the record was
54+
* put to the stream
55+
* @param partitionTokens Unique partition identifiers to be used in queries
56+
* @param metadata connector execution metadata for the given record
57+
*/
58+
public PartitionStartRecord(
59+
Timestamp startTimestamp,
60+
String recordSequence,
61+
List<String> partitionTokens,
62+
ChangeStreamRecordMetadata metadata) {
63+
this.startTimestamp = startTimestamp;
64+
this.recordSequence = recordSequence;
65+
this.partitionTokens = partitionTokens;
66+
this.metadata = metadata;
67+
}
68+
69+
/**
70+
* Returns the timestamp that which these partitions started being valid in Cloud Spanner. The
71+
* caller must use this time as the change stream query start timestamp for the new partitions.
72+
*
73+
* @return the start timestamp of the partitions
74+
*/
75+
@Override
76+
public Timestamp getRecordTimestamp() {
77+
return getStartTimestamp();
78+
}
79+
80+
/**
81+
* It is the partition start time of the partition tokens.
82+
*
83+
* @return the start timestamp of the partitions
84+
*/
85+
public Timestamp getStartTimestamp() {
86+
return startTimestamp;
87+
}
88+
89+
/**
90+
* Indicates the order in which a record was put to the stream. Is unique and increasing within a
91+
* partition. It is relative to the scope of partition, commit timestamp, and
92+
* server_transaction_id. It is useful for readers downstream to dedup any duplicate records that
93+
* were read/recorded.
94+
*
95+
* @return record sequence of the record
96+
*/
97+
public String getRecordSequence() {
98+
return recordSequence;
99+
}
100+
101+
/**
102+
* List of partitions yielded within this record.
103+
*
104+
* @return partition tokens
105+
*/
106+
public List<String> getPartitionTokens() {
107+
return partitionTokens;
108+
}
109+
110+
@Override
111+
public boolean equals(@javax.annotation.Nullable Object o) {
112+
if (this == o) {
113+
return true;
114+
}
115+
if (!(o instanceof PartitionStartRecord)) {
116+
return false;
117+
}
118+
PartitionStartRecord that = (PartitionStartRecord) o;
119+
return Objects.equals(startTimestamp, that.startTimestamp)
120+
&& Objects.equals(recordSequence, that.recordSequence)
121+
&& Objects.equals(partitionTokens, that.partitionTokens);
122+
}
123+
124+
@Override
125+
public int hashCode() {
126+
return Objects.hash(startTimestamp, recordSequence, partitionTokens);
127+
}
128+
129+
@Override
130+
public String toString() {
131+
return "PartitionStartRecord{"
132+
+ "startTimestamp="
133+
+ startTimestamp
134+
+ ", recordSequence='"
135+
+ recordSequence
136+
+ '\''
137+
+ ", partitionTokens="
138+
+ partitionTokens
139+
+ ", metadata="
140+
+ metadata
141+
+ '}';
142+
}
143+
}

0 commit comments

Comments (0)