Skip to content

Commit bc59ac1

Browse files
authored
Merge pull request #369 from alex268/add_changeset_to_description
Added changefeeds to result of describeTable method
2 parents dbfa5c7 + 46c58b6 commit bc59ac1

File tree

5 files changed

+488
-84
lines changed

5 files changed

+488
-84
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package tech.ydb.table.description;
2+
3+
import java.util.Objects;
4+
5+
import tech.ydb.table.settings.Changefeed;
6+
7+
/**
8+
*
9+
* @author Aleksandr Gorshenin
10+
*/
11+
public class ChangefeedDescription {
12+
public enum State {
13+
/**
14+
* Normal state, from this state changefeed can be disabled
15+
*/
16+
ENABLED,
17+
/**
18+
* No new change records are generated, but the old ones remain available<br>
19+
* From this state changefeed cannot be switched to any other state
20+
*/
21+
DISABLED,
22+
/**
23+
* An initial scan is being performed. <br> After its completion changefeed will switch to the normal state
24+
*/
25+
INITIAL_SCAN;
26+
}
27+
28+
private final String name;
29+
private final Changefeed.Mode mode;
30+
private final Changefeed.Format format;
31+
private final State state;
32+
private final boolean virtualTimestamps;
33+
34+
public ChangefeedDescription(String name, Changefeed.Mode mode, Changefeed.Format format, State state,
35+
boolean virtualTimestamps) {
36+
this.name = name;
37+
this.mode = mode;
38+
this.format = format;
39+
this.state = state;
40+
this.virtualTimestamps = virtualTimestamps;
41+
}
42+
43+
/**
44+
* @return Name of the feed
45+
*/
46+
public String getName() {
47+
return name;
48+
}
49+
50+
/**
51+
* @return Mode specifies the information that will be written to the feed
52+
*/
53+
public Changefeed.Mode getMode() {
54+
return mode;
55+
}
56+
57+
/**
58+
* @return Format of the data
59+
*/
60+
public Changefeed.Format getFormat() {
61+
return format;
62+
}
63+
64+
/**
65+
* @return State of the feed
66+
*/
67+
public State getState() {
68+
return state;
69+
}
70+
71+
/**
72+
* @return State of emitting of virtual timestamps along with data
73+
*/
74+
public boolean hasVirtualTimestamps() {
75+
return virtualTimestamps;
76+
}
77+
78+
@Override
79+
public int hashCode() {
80+
return Objects.hash(name, mode, format, state, virtualTimestamps);
81+
}
82+
83+
@Override
84+
public boolean equals(Object o) {
85+
if (this == o) {
86+
return true;
87+
}
88+
if (o == null || getClass() != o.getClass()) {
89+
return false;
90+
}
91+
92+
ChangefeedDescription cd = (ChangefeedDescription) o;
93+
return Objects.equals(name, cd.name)
94+
&& mode == cd.mode
95+
&& format == cd.format
96+
&& state == cd.state
97+
&& virtualTimestamps == cd.virtualTimestamps;
98+
}
99+
100+
@Override
101+
public String toString() {
102+
return new StringBuilder("Changefeed['").append(name)
103+
.append("']{state=").append(state)
104+
.append(", format=").append(format)
105+
.append(", mode=").append(mode)
106+
.append(", virtual timestamps=").append(virtualTimestamps)
107+
.append("}").toString();
108+
}
109+
}

table/src/main/java/tech/ydb/table/description/TableDescription.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
import com.google.common.collect.ImmutableList;
1414
import com.google.common.collect.Sets;
1515

16+
import tech.ydb.table.Session;
1617
import tech.ydb.table.description.TableTtl.TtlMode;
18+
import tech.ydb.table.settings.AlterTableSettings;
1719
import tech.ydb.table.settings.PartitioningSettings;
1820
import tech.ydb.table.values.OptionalType;
1921
import tech.ydb.table.values.Type;
@@ -28,6 +30,7 @@ public class TableDescription {
2830
private final List<TableIndex> indexes;
2931
private final List<ColumnFamily> columnFamilies;
3032
private final List<KeyRange> keyRanges;
33+
private final List<ChangefeedDescription> changefeeds;
3134

3235
@Nullable
3336
private final TableStats tableStats;
@@ -49,6 +52,7 @@ private TableDescription(Builder builder) {
4952
this.partitioningSettings = builder.partitioningSettings;
5053
this.partitionStats = ImmutableList.copyOf(builder.partitionStats);
5154
this.tableTtl = builder.ttlSettings;
55+
this.changefeeds = builder.changefeeds;
5256
}
5357

5458
public static Builder newBuilder() {
@@ -93,6 +97,10 @@ public TableTtl getTableTtl() {
9397
return tableTtl;
9498
}
9599

100+
public List<ChangefeedDescription> getChangefeeds() {
101+
return changefeeds;
102+
}
103+
96104
/**
97105
* BUILDER
98106
*/
@@ -108,6 +116,7 @@ public static class Builder {
108116
private PartitioningSettings partitioningSettings = null;
109117
private final List<PartitionStats> partitionStats = new ArrayList<>();
110118
private TableTtl ttlSettings = TableTtl.notSet();
119+
private final List<ChangefeedDescription> changefeeds = new ArrayList<>();
111120

112121
public Builder addNonnullColumn(String name, Type type) {
113122
return addNonnullColumn(name, type, null);
@@ -222,6 +231,21 @@ public Builder addPartitionStat(long rows, long size) {
222231
return this;
223232
}
224233

234+
/**
235+
* Adds changefeed information to table description<br>
236+
* <b>NOTICE:</b> Changefeed description cannot be used for changefeed creating.<br>
237+
* It is impossible to create changefeed in one time with table creating. For adding a new changefeed use method
238+
* {@link Session#alterTable(java.lang.String, tech.ydb.table.settings.AlterTableSettings)} and
239+
* {@link AlterTableSettings#addChangefeed(tech.ydb.table.settings.Changefeed)}
240+
*
241+
* @param changefeed changefeed description
242+
* @return table description builder
243+
*/
244+
public Builder addChangefeed(ChangefeedDescription changefeed) {
245+
this.changefeeds.add(changefeed);
246+
return this;
247+
}
248+
225249
@Deprecated
226250
public Builder setTtlSettings(int ttlModeCase, String columnName, int expireAfterSeconds) {
227251
this.ttlSettings = new TableTtl(TtlMode.forCase(ttlModeCase), columnName, expireAfterSeconds);

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 125 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import tech.ydb.proto.common.CommonProtos;
4040
import tech.ydb.proto.table.YdbTable;
4141
import tech.ydb.table.Session;
42+
import tech.ydb.table.description.ChangefeedDescription;
4243
import tech.ydb.table.description.ColumnFamily;
4344
import tech.ydb.table.description.KeyBound;
4445
import tech.ydb.table.description.KeyRange;
@@ -210,13 +211,43 @@ private static YdbTable.ColumnMeta buildColumnMeta(TableColumn column) {
210211
return builder.build();
211212
}
212213

213-
private static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
214+
public static YdbTable.ChangefeedFormat.Format buildChangefeedFormat(Changefeed.Format format) {
215+
switch (format) {
216+
case JSON:
217+
return YdbTable.ChangefeedFormat.Format.FORMAT_JSON;
218+
case DYNAMODB_STREAMS_JSON:
219+
return YdbTable.ChangefeedFormat.Format.FORMAT_DYNAMODB_STREAMS_JSON;
220+
case DEBEZIUM_JSON:
221+
return YdbTable.ChangefeedFormat.Format.FORMAT_DEBEZIUM_JSON;
222+
default:
223+
return YdbTable.ChangefeedFormat.Format.FORMAT_UNSPECIFIED;
224+
}
225+
}
226+
227+
public static YdbTable.ChangefeedMode.Mode buildChangefeedMode(Changefeed.Mode mode) {
228+
switch (mode) {
229+
case KEYS_ONLY:
230+
return YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY;
231+
case UPDATES:
232+
return YdbTable.ChangefeedMode.Mode.MODE_UPDATES;
233+
case NEW_IMAGE:
234+
return YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE;
235+
case OLD_IMAGE:
236+
return YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE;
237+
case NEW_AND_OLD_IMAGES:
238+
return YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES;
239+
default:
240+
return YdbTable.ChangefeedMode.Mode.MODE_UNSPECIFIED;
241+
}
242+
}
243+
244+
public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
214245
YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder()
215246
.setName(changefeed.getName())
216-
.setFormat(changefeed.getFormat().toProto())
247+
.setFormat(buildChangefeedFormat(changefeed.getFormat()))
248+
.setMode(buildChangefeedMode(changefeed.getMode()))
217249
.setVirtualTimestamps(changefeed.hasVirtualTimestamps())
218-
.setInitialScan(changefeed.hasInitialScan())
219-
.setMode(changefeed.getMode().toPb());
250+
.setInitialScan(changefeed.hasInitialScan());
220251

221252
Duration retentionPeriod = changefeed.getRetentionPeriod();
222253
if (retentionPeriod != null) {
@@ -609,6 +640,87 @@ public CompletableFuture<Result<TableDescription>> describeTable(String path, De
609640
.thenApply(result -> result.map(desc -> mapDescribeTable(desc, settings)));
610641
}
611642

643+
private static TableTtl mapTtlSettings(YdbTable.TtlSettings ttl) {
644+
switch (ttl.getModeCase()) {
645+
case DATE_TYPE_COLUMN:
646+
YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn();
647+
return TableTtl
648+
.dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds())
649+
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
650+
case VALUE_SINCE_UNIX_EPOCH:
651+
YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch();
652+
TableTtl.TtlUnit unit;
653+
switch (vs.getColumnUnit()) {
654+
case UNIT_SECONDS:
655+
unit = TableTtl.TtlUnit.SECONDS;
656+
break;
657+
case UNIT_MILLISECONDS:
658+
unit = TableTtl.TtlUnit.MILLISECONDS;
659+
break;
660+
case UNIT_MICROSECONDS:
661+
unit = TableTtl.TtlUnit.MICROSECONDS;
662+
break;
663+
case UNIT_NANOSECONDS:
664+
unit = TableTtl.TtlUnit.NANOSECONDS;
665+
break;
666+
case UNIT_UNSPECIFIED:
667+
case UNRECOGNIZED:
668+
default:
669+
unit = TableTtl.TtlUnit.UNSPECIFIED;
670+
break;
671+
}
672+
return TableTtl
673+
.valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds())
674+
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
675+
case MODE_NOT_SET:
676+
default:
677+
return TableTtl.notSet();
678+
}
679+
}
680+
681+
private static Changefeed.Format mapChangefeedFormat(YdbTable.ChangefeedFormat.Format pb) {
682+
switch (pb) {
683+
case FORMAT_JSON:
684+
return Changefeed.Format.JSON;
685+
case FORMAT_DYNAMODB_STREAMS_JSON:
686+
return Changefeed.Format.DYNAMODB_STREAMS_JSON;
687+
case FORMAT_DEBEZIUM_JSON:
688+
return Changefeed.Format.DEBEZIUM_JSON;
689+
default:
690+
return null;
691+
}
692+
}
693+
694+
private static Changefeed.Mode mapChangefeedMode(YdbTable.ChangefeedMode.Mode pb) {
695+
switch (pb) {
696+
case MODE_KEYS_ONLY:
697+
return Changefeed.Mode.KEYS_ONLY;
698+
case MODE_NEW_IMAGE:
699+
return Changefeed.Mode.NEW_IMAGE;
700+
case MODE_OLD_IMAGE:
701+
return Changefeed.Mode.OLD_IMAGE;
702+
case MODE_NEW_AND_OLD_IMAGES:
703+
return Changefeed.Mode.NEW_AND_OLD_IMAGES;
704+
case MODE_UPDATES:
705+
return Changefeed.Mode.UPDATES;
706+
default:
707+
return null;
708+
}
709+
}
710+
711+
private static ChangefeedDescription.State mapChangefeedState(YdbTable.ChangefeedDescription.State pb) {
712+
switch (pb) {
713+
case STATE_ENABLED:
714+
return ChangefeedDescription.State.ENABLED;
715+
case STATE_DISABLED:
716+
return ChangefeedDescription.State.DISABLED;
717+
case STATE_INITIAL_SCAN:
718+
return ChangefeedDescription.State.INITIAL_SCAN;
719+
default:
720+
return null;
721+
}
722+
}
723+
612724
private static TableDescription mapDescribeTable(
613725
YdbTable.DescribeTableResult result,
614726
DescribeTableSettings describeTableSettings
@@ -702,47 +814,16 @@ private static TableDescription mapDescribeTable(
702814
}
703815
}
704816

705-
YdbTable.TtlSettings ttl = result.getTtlSettings();
706-
TableTtl tableTtl;
707-
switch (ttl.getModeCase()) {
708-
case DATE_TYPE_COLUMN:
709-
YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn();
710-
tableTtl = TableTtl
711-
.dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds())
712-
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
713-
break;
714-
case VALUE_SINCE_UNIX_EPOCH:
715-
YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch();
716-
TableTtl.TtlUnit unit;
717-
switch (vs.getColumnUnit()) {
718-
case UNIT_SECONDS:
719-
unit = TableTtl.TtlUnit.SECONDS;
720-
break;
721-
case UNIT_MILLISECONDS:
722-
unit = TableTtl.TtlUnit.MILLISECONDS;
723-
break;
724-
case UNIT_MICROSECONDS:
725-
unit = TableTtl.TtlUnit.MICROSECONDS;
726-
break;
727-
case UNIT_NANOSECONDS:
728-
unit = TableTtl.TtlUnit.NANOSECONDS;
729-
break;
730-
case UNIT_UNSPECIFIED:
731-
case UNRECOGNIZED:
732-
default:
733-
unit = TableTtl.TtlUnit.UNSPECIFIED;
734-
break;
735-
}
736-
tableTtl = TableTtl
737-
.valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds())
738-
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
739-
break;
740-
case MODE_NOT_SET:
741-
default:
742-
tableTtl = TableTtl.notSet();
743-
break;
817+
description.setTtlSettings(mapTtlSettings(result.getTtlSettings()));
818+
for (YdbTable.ChangefeedDescription pb: result.getChangefeedsList()) {
819+
description.addChangefeed(new ChangefeedDescription(
820+
pb.getName(),
821+
mapChangefeedMode(pb.getMode()),
822+
mapChangefeedFormat(pb.getFormat()),
823+
mapChangefeedState(pb.getState()),
824+
pb.getVirtualTimestamps()
825+
));
744826
}
745-
description.setTtlSettings(tableTtl);
746827

747828
return description.build();
748829
}

0 commit comments

Comments
 (0)