Skip to content

Commit b13100d

Browse files
authored
[core] Introduce properties field in Snapshot (apache#5703)
1 parent 7989e16 commit b13100d

File tree

14 files changed

+241
-30
lines changed

14 files changed

+241
-30
lines changed

paimon-api/src/main/java/org/apache/paimon/Snapshot.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class Snapshot implements Serializable {
8787
protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
8888
protected static final String FIELD_WATERMARK = "watermark";
8989
protected static final String FIELD_STATISTICS = "statistics";
90+
protected static final String FIELD_PROPERTIES = "properties";
9091

9192
// version of snapshot
9293
// null for paimon <= 0.2
@@ -195,6 +196,13 @@ public class Snapshot implements Serializable {
195196
@Nullable
196197
protected final String statistics;
197198

199+
// properties
200+
// null for paimon <= 1.1 or empty properties
201+
@JsonInclude(JsonInclude.Include.NON_NULL)
202+
@JsonProperty(FIELD_PROPERTIES)
203+
@Nullable
204+
protected final Map<String, String> properties;
205+
198206
public Snapshot(
199207
long id,
200208
long schemaId,
@@ -214,7 +222,8 @@ public Snapshot(
214222
@Nullable Long deltaRecordCount,
215223
@Nullable Long changelogRecordCount,
216224
@Nullable Long watermark,
217-
@Nullable String statistics) {
225+
@Nullable String statistics,
226+
@Nullable Map<String, String> properties) {
218227
this(
219228
CURRENT_VERSION,
220229
id,
@@ -235,7 +244,8 @@ public Snapshot(
235244
deltaRecordCount,
236245
changelogRecordCount,
237246
watermark,
238-
statistics);
247+
statistics,
248+
properties);
239249
}
240250

241251
@JsonCreator
@@ -260,7 +270,8 @@ public Snapshot(
260270
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
261271
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
262272
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
263-
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
273+
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
274+
@JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties) {
264275
this.version = version;
265276
this.id = id;
266277
this.schemaId = schemaId;
@@ -281,6 +292,7 @@ public Snapshot(
281292
this.changelogRecordCount = changelogRecordCount;
282293
this.watermark = watermark;
283294
this.statistics = statistics;
295+
this.properties = properties;
284296
}
285297

286298
@JsonGetter(FIELD_VERSION)
@@ -395,6 +407,12 @@ public String statistics() {
395407
return statistics;
396408
}
397409

410+
@JsonGetter(FIELD_PROPERTIES)
411+
@Nullable
412+
public Map<String, String> properties() {
413+
return properties;
414+
}
415+
398416
public String toJson() {
399417
return JsonSerdeUtil.toJson(this);
400418
}
@@ -421,7 +439,8 @@ public int hashCode() {
421439
deltaRecordCount,
422440
changelogRecordCount,
423441
watermark,
424-
statistics);
442+
statistics,
443+
properties);
425444
}
426445

427446
@Override
@@ -452,7 +471,8 @@ public boolean equals(Object o) {
452471
&& Objects.equals(deltaRecordCount, that.deltaRecordCount)
453472
&& Objects.equals(changelogRecordCount, that.changelogRecordCount)
454473
&& Objects.equals(watermark, that.watermark)
455-
&& Objects.equals(statistics, that.statistics);
474+
&& Objects.equals(statistics, that.statistics)
475+
&& Objects.equals(properties, that.properties);
456476
}
457477

458478
/** Type of changes in this snapshot. */

paimon-core/src/main/java/org/apache/paimon/Changelog.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public Changelog(Snapshot snapshot) {
6262
snapshot.deltaRecordCount(),
6363
snapshot.changelogRecordCount(),
6464
snapshot.watermark(),
65-
snapshot.statistics());
65+
snapshot.statistics(),
66+
snapshot.properties);
6667
}
6768

6869
@JsonCreator
@@ -87,7 +88,8 @@ public Changelog(
8788
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
8889
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
8990
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
90-
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
91+
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
92+
@JsonProperty(FIELD_PROPERTIES) Map<String, String> properties) {
9193
super(
9294
version,
9395
id,
@@ -108,7 +110,8 @@ public Changelog(
108110
deltaRecordCount,
109111
changelogRecordCount,
110112
watermark,
111-
statistics);
113+
statistics,
114+
properties);
112115
}
113116

114117
public static Changelog fromJson(String json) {

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ManifestCommittable {
3434
private final long identifier;
3535
@Nullable private final Long watermark;
3636
private final Map<Integer, Long> logOffsets;
37+
private final Map<String, String> properties;
3738
private final List<CommitMessage> commitMessages;
3839

3940
public ManifestCommittable(long identifier) {
@@ -45,17 +46,28 @@ public ManifestCommittable(long identifier, @Nullable Long watermark) {
4546
this.watermark = watermark;
4647
this.logOffsets = new HashMap<>();
4748
this.commitMessages = new ArrayList<>();
49+
this.properties = new HashMap<>();
4850
}
4951

5052
public ManifestCommittable(
5153
long identifier,
5254
@Nullable Long watermark,
5355
Map<Integer, Long> logOffsets,
5456
List<CommitMessage> commitMessages) {
57+
this(identifier, watermark, logOffsets, commitMessages, new HashMap<>());
58+
}
59+
60+
public ManifestCommittable(
61+
long identifier,
62+
@Nullable Long watermark,
63+
Map<Integer, Long> logOffsets,
64+
List<CommitMessage> commitMessages,
65+
Map<String, String> properties) {
5566
this.identifier = identifier;
5667
this.watermark = watermark;
5768
this.logOffsets = logOffsets;
5869
this.commitMessages = commitMessages;
70+
this.properties = properties;
5971
}
6072

6173
public void addFileCommittable(CommitMessage commitMessage) {
@@ -72,6 +84,10 @@ public void addLogOffset(int bucket, long offset, boolean allowDuplicate) {
7284
logOffsets.put(bucket, newOffset);
7385
}
7486

87+
public void addProperty(String key, String value) {
88+
properties.put(key, value);
89+
}
90+
7591
public long identifier() {
7692
return identifier;
7793
}
@@ -89,6 +105,10 @@ public List<CommitMessage> fileCommittables() {
89105
return commitMessages;
90106
}
91107

108+
public Map<String, String> properties() {
109+
return properties;
110+
}
111+
92112
@Override
93113
public boolean equals(Object o) {
94114
if (this == o) {
@@ -101,12 +121,13 @@ public boolean equals(Object o) {
101121
return Objects.equals(identifier, that.identifier)
102122
&& Objects.equals(watermark, that.watermark)
103123
&& Objects.equals(logOffsets, that.logOffsets)
104-
&& Objects.equals(commitMessages, that.commitMessages);
124+
&& Objects.equals(commitMessages, that.commitMessages)
125+
&& Objects.equals(properties, that.properties);
105126
}
106127

107128
@Override
108129
public int hashCode() {
109-
return Objects.hash(identifier, watermark, logOffsets, commitMessages);
130+
return Objects.hash(identifier, watermark, logOffsets, commitMessages, properties);
110131
}
111132

112133
@Override
@@ -116,7 +137,8 @@ public String toString() {
116137
+ "identifier = %s, "
117138
+ "watermark = %s, "
118139
+ "logOffsets = %s, "
119-
+ "commitMessages = %s",
120-
identifier, watermark, logOffsets, commitMessages);
140+
+ "commitMessages = %s, "
141+
+ "properties = %s}",
142+
identifier, watermark, logOffsets, commitMessages, properties);
121143
}
122144
}

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
/** {@link VersionedSerializer} for {@link ManifestCommittable}. */
3535
public class ManifestCommittableSerializer implements VersionedSerializer<ManifestCommittable> {
3636

37-
private static final int CURRENT_VERSION = 3;
37+
private static final int CURRENT_VERSION = 4;
3838

3939
private final CommitMessageSerializer commitMessageSerializer;
4040

@@ -62,6 +62,7 @@ public byte[] serialize(ManifestCommittable obj) throws IOException {
6262
view.writeLong(watermark);
6363
}
6464
serializeOffsets(view, obj.logOffsets());
65+
serializeProperties(view, obj.properties());
6566
view.writeInt(commitMessageSerializer.getVersion());
6667
commitMessageSerializer.serializeList(obj.fileCommittables(), view);
6768
return out.toByteArray();
@@ -76,6 +77,15 @@ private void serializeOffsets(DataOutputViewStreamWrapper view, Map<Integer, Lon
7677
}
7778
}
7879

80+
private void serializeProperties(
81+
DataOutputViewStreamWrapper view, Map<String, String> properties) throws IOException {
82+
view.writeInt(properties.size());
83+
for (Map.Entry<String, String> entry : properties.entrySet()) {
84+
view.writeUTF(entry.getKey());
85+
view.writeUTF(entry.getValue());
86+
}
87+
}
88+
7989
@Override
8090
public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
8191
if (version > CURRENT_VERSION) {
@@ -91,6 +101,8 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
91101
long identifier = view.readLong();
92102
Long watermark = view.readBoolean() ? null : view.readLong();
93103
Map<Integer, Long> offsets = deserializeOffsets(view);
104+
Map<String, String> properties =
105+
version == CURRENT_VERSION ? deserializeProperties(view) : new HashMap<>();
94106
int fileCommittableSerializerVersion = view.readInt();
95107
List<CommitMessage> fileCommittables;
96108
try {
@@ -116,7 +128,8 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
116128
fileCommittables = legacyV2CommitMessageSerializer.deserializeList(view);
117129
}
118130

119-
return new ManifestCommittable(identifier, watermark, offsets, fileCommittables);
131+
return new ManifestCommittable(
132+
identifier, watermark, offsets, fileCommittables, properties);
120133
}
121134

122135
private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws IOException {
@@ -127,4 +140,14 @@ private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws
127140
}
128141
return offsets;
129142
}
143+
144+
private Map<String, String> deserializeProperties(DataInputDeserializer view)
145+
throws IOException {
146+
int size = view.readInt();
147+
Map<String, String> properties = new HashMap<>(size);
148+
for (int i = 0; i < size; i++) {
149+
properties.put(view.readUTF(), view.readUTF());
150+
}
151+
return properties;
152+
}
130153
}

0 commit comments

Comments
 (0)