Skip to content

Commit cd1f7d6

Browse files
committed
Add change stream new record models and tests.
Include PartitionStartRecord, PartitionEndRecord, PartitionEventRecord, MoveInEvent, MoveOutEvent. These will be used in the following PRs - the returned change stream record will be parsed into these models, and corresponding actions will be added and called later.
1 parent 72cfcea commit cd1f7d6

File tree

10 files changed

+790
-0
lines changed

10 files changed

+790
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
19+
20+
import java.io.Serializable;
21+
import java.util.Objects;
22+
import javax.annotation.Nullable;
23+
import org.apache.beam.sdk.coders.DefaultCoder;
24+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
25+
26+
/** Describes move-in of the key ranges into the change stream partition. */
27+
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
28+
@DefaultCoder(AvroCoder.class)
29+
public class MoveInEvent implements Serializable {
30+
31+
private static final long serialVersionUID = 7954905180615424094L;
32+
33+
private String sourcePartitionToken;
34+
35+
/** Default constructor for serialization only. */
36+
private MoveInEvent() {}
37+
38+
/**
39+
* Constructs a MoveInEvent from the source partition token.
40+
*
41+
* @param sourcePartitionToken An unique partition identifier describing the source change stream
42+
* partition that recorded changes for the key range that is moving into this partition.
43+
*/
44+
public MoveInEvent(String sourcePartitionToken) {
45+
this.sourcePartitionToken = sourcePartitionToken;
46+
}
47+
48+
/**
49+
* The source change stream partition token that recorded changes for the key range that is moving
50+
* into this partition.
51+
*
52+
* @return source partition token string.
53+
*/
54+
public String getSourcePartitionToken() {
55+
return sourcePartitionToken;
56+
}
57+
58+
@Override
59+
public boolean equals(@Nullable Object o) {
60+
if (this == o) {
61+
return true;
62+
}
63+
if (!(o instanceof MoveInEvent)) {
64+
return false;
65+
}
66+
MoveInEvent moveInEvent = (MoveInEvent) o;
67+
return Objects.equals(sourcePartitionToken, moveInEvent.sourcePartitionToken);
68+
}
69+
70+
@Override
71+
public int hashCode() {
72+
return Objects.hash(sourcePartitionToken);
73+
}
74+
75+
@Override
76+
public String toString() {
77+
return "MoveInEvent{" + "sourcePartitionToken=" + sourcePartitionToken + '}';
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
19+
20+
import java.io.Serializable;
21+
import java.util.Objects;
22+
import javax.annotation.Nullable;
23+
import org.apache.beam.sdk.coders.DefaultCoder;
24+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
25+
26+
/** Describes move-out of the key ranges into the change stream partition. */
27+
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
28+
@DefaultCoder(AvroCoder.class)
29+
public class MoveOutEvent implements Serializable {
30+
31+
private static final long serialVersionUID = 1961571231618408326L;
32+
33+
private String destinationPartitionToken;
34+
35+
/** Default constructor for serialization only. */
36+
private MoveOutEvent() {}
37+
38+
/**
39+
* Constructs a MoveOutEvent from the destination partition token.
40+
*
41+
* @param destinationPartitionToken An unique partition identifier describing the destination
42+
* change stream partition that recorded changes for the key range that is moving into this
43+
* partition.
44+
*/
45+
public MoveOutEvent(String destinationPartitionToken) {
46+
this.destinationPartitionToken = destinationPartitionToken;
47+
}
48+
49+
/**
50+
* The destination change stream partition token that recorded changes for the key range that is
51+
* moving out this partition.
52+
*
53+
* @return destination partition token string.
54+
*/
55+
public String getDestinationPartitionToken() {
56+
return destinationPartitionToken;
57+
}
58+
59+
@Override
60+
public boolean equals(@Nullable Object o) {
61+
if (this == o) {
62+
return true;
63+
}
64+
if (!(o instanceof MoveOutEvent)) {
65+
return false;
66+
}
67+
MoveOutEvent moveOutEvent = (MoveOutEvent) o;
68+
return Objects.equals(destinationPartitionToken, moveOutEvent.destinationPartitionToken);
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return Objects.hash(destinationPartitionToken);
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "MoveOutEvent{" + "destinationPartitionToken=" + destinationPartitionToken + '}';
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.model;
19+
20+
import com.google.cloud.Timestamp;
21+
import java.util.Objects;
22+
import org.apache.avro.reflect.AvroEncode;
23+
import org.apache.avro.reflect.Nullable;
24+
import org.apache.beam.sdk.coders.DefaultCoder;
25+
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
26+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.encoder.TimestampEncoding;
27+
28+
/**
29+
* A partition end record serves as a notification that the client should stop reading the
30+
* partition. No further records are expected to be retrieved on it.
31+
*/
32+
@SuppressWarnings("initialization.fields.uninitialized") // Avro requires the default constructor
33+
@DefaultCoder(AvroCoder.class)
34+
public class PartitionEndRecord implements ChangeStreamRecord {
35+
36+
private static final long serialVersionUID = 5406538761724655621L;
37+
38+
@AvroEncode(using = TimestampEncoding.class)
39+
private Timestamp endTimestamp;
40+
41+
private String recordSequence;
42+
43+
@Nullable private ChangeStreamRecordMetadata metadata;
44+
45+
/** Default constructor for serialization only. */
46+
private PartitionEndRecord() {}
47+
48+
/**
49+
* Constructs the partition end record with the given timestamp, record sequence and metadata.
50+
*
51+
* @param endTimestamp end timestamp at which the change stream partition is terminated
52+
* @param recordSequence the order within a partition and a transaction in which the record was
53+
* put to the stream
54+
* @param metadata connector execution metadata for the given record
55+
*/
56+
public PartitionEndRecord(
57+
Timestamp endTimestamp, String recordSequence, ChangeStreamRecordMetadata metadata) {
58+
this.endTimestamp = endTimestamp;
59+
this.recordSequence = recordSequence;
60+
this.metadata = metadata;
61+
}
62+
63+
/**
64+
* Indicates the timestamp for which the change stream partition is terminated.
65+
*
66+
* @return the timestamp for which the change stream partition is terminated
67+
*/
68+
@Override
69+
public Timestamp getRecordTimestamp() {
70+
return getEndTimestamp();
71+
}
72+
73+
/**
74+
* The end timestamp at which the change stream partition is terminated.
75+
*
76+
* @return the timestamp for which the change stream partition is terminated
77+
*/
78+
public Timestamp getEndTimestamp() {
79+
return endTimestamp;
80+
}
81+
82+
/**
83+
* Indicates the order in which a record was put to the stream. Is unique and increasing within a
84+
* partition. It is relative to the scope of partition, commit timestamp, and
85+
* server_transaction_id. It is useful for readers downstream to dedup any duplicate records that
86+
* were read/recorded.
87+
*
88+
* @return record sequence of the record
89+
*/
90+
public String getRecordSequence() {
91+
return recordSequence;
92+
}
93+
94+
@Override
95+
public boolean equals(@javax.annotation.Nullable Object o) {
96+
if (this == o) {
97+
return true;
98+
}
99+
if (!(o instanceof PartitionEndRecord)) {
100+
return false;
101+
}
102+
PartitionEndRecord that = (PartitionEndRecord) o;
103+
return Objects.equals(endTimestamp, that.endTimestamp)
104+
&& Objects.equals(recordSequence, that.recordSequence);
105+
}
106+
107+
@Override
108+
public int hashCode() {
109+
return Objects.hash(endTimestamp, recordSequence);
110+
}
111+
112+
@Override
113+
public String toString() {
114+
return "PartitionEndRecord{"
115+
+ "endTimestamp="
116+
+ endTimestamp
117+
+ ", recordSequence='"
118+
+ recordSequence
119+
+ '\''
120+
+ ", metadata="
121+
+ metadata
122+
+ '}';
123+
}
124+
}

0 commit comments

Comments
 (0)