diff --git a/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java b/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java
new file mode 100644
index 000000000..2039820eb
--- /dev/null
+++ b/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java
@@ -0,0 +1,109 @@
+package tech.ydb.table.description;
+
+import java.util.Objects;
+
+import tech.ydb.table.settings.Changefeed;
+
+/**
+ *
+ * @author Aleksandr Gorshenin
+ */
+public class ChangefeedDescription {
+ public enum State {
+ /**
+ * Normal state, from this state changefeed can be disabled
+ */
+ ENABLED,
+ /**
+ * No new change records are generated, but the old ones remain available
+ * From this state changefeed cannot be switched to any other state
+ */
+ DISABLED,
+ /**
+ * An initial scan is being performed.
After its completion changefeed will switch to the normal state
+ */
+ INITIAL_SCAN;
+ }
+
+ private final String name;
+ private final Changefeed.Mode mode;
+ private final Changefeed.Format format;
+ private final State state;
+ private final boolean virtualTimestamps;
+
+ public ChangefeedDescription(String name, Changefeed.Mode mode, Changefeed.Format format, State state,
+ boolean virtualTimestamps) {
+ this.name = name;
+ this.mode = mode;
+ this.format = format;
+ this.state = state;
+ this.virtualTimestamps = virtualTimestamps;
+ }
+
+ /**
+ * @return Name of the feed
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return Mode specifies the information that will be written to the feed
+ */
+ public Changefeed.Mode getMode() {
+ return mode;
+ }
+
+ /**
+ * @return Format of the data
+ */
+ public Changefeed.Format getFormat() {
+ return format;
+ }
+
+ /**
+ * @return State of the feed
+ */
+ public State getState() {
+ return state;
+ }
+
+ /**
+ * @return State of emitting of virtual timestamps along with data
+ */
+ public boolean hasVirtualTimestamps() {
+ return virtualTimestamps;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, mode, format, state, virtualTimestamps);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ChangefeedDescription cd = (ChangefeedDescription) o;
+ return Objects.equals(name, cd.name)
+ && mode == cd.mode
+ && format == cd.format
+ && state == cd.state
+ && virtualTimestamps == cd.virtualTimestamps;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("Changefeed['").append(name)
+ .append("']{state=").append(state)
+ .append(", format=").append(format)
+ .append(", mode=").append(mode)
+ .append(", virtual timestamps=").append(virtualTimestamps)
+ .append("}").toString();
+ }
+}
diff --git a/table/src/main/java/tech/ydb/table/description/TableDescription.java b/table/src/main/java/tech/ydb/table/description/TableDescription.java
index a08366d6b..de1c091cb 100644
--- a/table/src/main/java/tech/ydb/table/description/TableDescription.java
+++ b/table/src/main/java/tech/ydb/table/description/TableDescription.java
@@ -13,7 +13,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
+import tech.ydb.table.Session;
import tech.ydb.table.description.TableTtl.TtlMode;
+import tech.ydb.table.settings.AlterTableSettings;
import tech.ydb.table.settings.PartitioningSettings;
import tech.ydb.table.values.OptionalType;
import tech.ydb.table.values.Type;
@@ -28,6 +30,7 @@ public class TableDescription {
private final List indexes;
private final List columnFamilies;
private final List keyRanges;
+ private final List changefeeds;
@Nullable
private final TableStats tableStats;
@@ -49,6 +52,7 @@ private TableDescription(Builder builder) {
this.partitioningSettings = builder.partitioningSettings;
this.partitionStats = ImmutableList.copyOf(builder.partitionStats);
this.tableTtl = builder.ttlSettings;
+ this.changefeeds = builder.changefeeds;
}
public static Builder newBuilder() {
@@ -93,6 +97,10 @@ public TableTtl getTableTtl() {
return tableTtl;
}
+ public List getChangefeeds() {
+ return changefeeds;
+ }
+
/**
* BUILDER
*/
@@ -108,6 +116,7 @@ public static class Builder {
private PartitioningSettings partitioningSettings = null;
private final List partitionStats = new ArrayList<>();
private TableTtl ttlSettings = TableTtl.notSet();
+ private final List changefeeds = new ArrayList<>();
public Builder addNonnullColumn(String name, Type type) {
return addNonnullColumn(name, type, null);
@@ -222,6 +231,21 @@ public Builder addPartitionStat(long rows, long size) {
return this;
}
+ /**
+ * Adds changefeed information to table description
+ * NOTICE: Changefeed description cannot be used for changefeed creating.
+ * It is impossible to create changefeed in one time with table creating. For adding a new changefeed use method
+ * {@link Session#alterTable(java.lang.String, tech.ydb.table.settings.AlterTableSettings)} and
+ * {@link AlterTableSettings#addChangefeed(tech.ydb.table.settings.Changefeed)}
+ *
+ * @param changefeed changefeed description
+ * @return table description builder
+ */
+ public Builder addChangefeed(ChangefeedDescription changefeed) {
+ this.changefeeds.add(changefeed);
+ return this;
+ }
+
@Deprecated
public Builder setTtlSettings(int ttlModeCase, String columnName, int expireAfterSeconds) {
this.ttlSettings = new TableTtl(TtlMode.forCase(ttlModeCase), columnName, expireAfterSeconds);
diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java
index 98bbd8939..3465f2f8f 100644
--- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java
+++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java
@@ -39,6 +39,7 @@
import tech.ydb.proto.common.CommonProtos;
import tech.ydb.proto.table.YdbTable;
import tech.ydb.table.Session;
+import tech.ydb.table.description.ChangefeedDescription;
import tech.ydb.table.description.ColumnFamily;
import tech.ydb.table.description.KeyBound;
import tech.ydb.table.description.KeyRange;
@@ -210,13 +211,43 @@ private static YdbTable.ColumnMeta buildColumnMeta(TableColumn column) {
return builder.build();
}
- private static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
+ public static YdbTable.ChangefeedFormat.Format buildChangefeedFormat(Changefeed.Format format) {
+ switch (format) {
+ case JSON:
+ return YdbTable.ChangefeedFormat.Format.FORMAT_JSON;
+ case DYNAMODB_STREAMS_JSON:
+ return YdbTable.ChangefeedFormat.Format.FORMAT_DYNAMODB_STREAMS_JSON;
+ case DEBEZIUM_JSON:
+ return YdbTable.ChangefeedFormat.Format.FORMAT_DEBEZIUM_JSON;
+ default:
+ return YdbTable.ChangefeedFormat.Format.FORMAT_UNSPECIFIED;
+ }
+ }
+
+ public static YdbTable.ChangefeedMode.Mode buildChangefeedMode(Changefeed.Mode mode) {
+ switch (mode) {
+ case KEYS_ONLY:
+ return YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY;
+ case UPDATES:
+ return YdbTable.ChangefeedMode.Mode.MODE_UPDATES;
+ case NEW_IMAGE:
+ return YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE;
+ case OLD_IMAGE:
+ return YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE;
+ case NEW_AND_OLD_IMAGES:
+ return YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES;
+ default:
+ return YdbTable.ChangefeedMode.Mode.MODE_UNSPECIFIED;
+ }
+ }
+
+ public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder()
.setName(changefeed.getName())
- .setFormat(changefeed.getFormat().toProto())
+ .setFormat(buildChangefeedFormat(changefeed.getFormat()))
+ .setMode(buildChangefeedMode(changefeed.getMode()))
.setVirtualTimestamps(changefeed.hasVirtualTimestamps())
- .setInitialScan(changefeed.hasInitialScan())
- .setMode(changefeed.getMode().toPb());
+ .setInitialScan(changefeed.hasInitialScan());
Duration retentionPeriod = changefeed.getRetentionPeriod();
if (retentionPeriod != null) {
@@ -609,6 +640,87 @@ public CompletableFuture> describeTable(String path, De
.thenApply(result -> result.map(desc -> mapDescribeTable(desc, settings)));
}
+ private static TableTtl mapTtlSettings(YdbTable.TtlSettings ttl) {
+ switch (ttl.getModeCase()) {
+ case DATE_TYPE_COLUMN:
+ YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn();
+ return TableTtl
+ .dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds())
+ .withRunIntervalSeconds(ttl.getRunIntervalSeconds());
+ case VALUE_SINCE_UNIX_EPOCH:
+ YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch();
+ TableTtl.TtlUnit unit;
+ switch (vs.getColumnUnit()) {
+ case UNIT_SECONDS:
+ unit = TableTtl.TtlUnit.SECONDS;
+ break;
+ case UNIT_MILLISECONDS:
+ unit = TableTtl.TtlUnit.MILLISECONDS;
+ break;
+ case UNIT_MICROSECONDS:
+ unit = TableTtl.TtlUnit.MICROSECONDS;
+ break;
+ case UNIT_NANOSECONDS:
+ unit = TableTtl.TtlUnit.NANOSECONDS;
+ break;
+ case UNIT_UNSPECIFIED:
+ case UNRECOGNIZED:
+ default:
+ unit = TableTtl.TtlUnit.UNSPECIFIED;
+ break;
+ }
+ return TableTtl
+ .valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds())
+ .withRunIntervalSeconds(ttl.getRunIntervalSeconds());
+ case MODE_NOT_SET:
+ default:
+ return TableTtl.notSet();
+ }
+ }
+
+ private static Changefeed.Format mapChangefeedFormat(YdbTable.ChangefeedFormat.Format pb) {
+ switch (pb) {
+ case FORMAT_JSON:
+ return Changefeed.Format.JSON;
+ case FORMAT_DYNAMODB_STREAMS_JSON:
+ return Changefeed.Format.DYNAMODB_STREAMS_JSON;
+ case FORMAT_DEBEZIUM_JSON:
+ return Changefeed.Format.DEBEZIUM_JSON;
+ default:
+ return null;
+ }
+ }
+
+ private static Changefeed.Mode mapChangefeedMode(YdbTable.ChangefeedMode.Mode pb) {
+ switch (pb) {
+ case MODE_KEYS_ONLY:
+ return Changefeed.Mode.KEYS_ONLY;
+ case MODE_NEW_IMAGE:
+ return Changefeed.Mode.NEW_IMAGE;
+ case MODE_OLD_IMAGE:
+ return Changefeed.Mode.OLD_IMAGE;
+ case MODE_NEW_AND_OLD_IMAGES:
+ return Changefeed.Mode.NEW_AND_OLD_IMAGES;
+ case MODE_UPDATES:
+ return Changefeed.Mode.UPDATES;
+ default:
+ return null;
+ }
+ }
+
+ private static ChangefeedDescription.State mapChangefeedState(YdbTable.ChangefeedDescription.State pb) {
+ switch (pb) {
+ case STATE_ENABLED:
+ return ChangefeedDescription.State.ENABLED;
+ case STATE_DISABLED:
+ return ChangefeedDescription.State.DISABLED;
+ case STATE_INITIAL_SCAN:
+ return ChangefeedDescription.State.INITIAL_SCAN;
+ default:
+ return null;
+ }
+ }
+
private static TableDescription mapDescribeTable(
YdbTable.DescribeTableResult result,
DescribeTableSettings describeTableSettings
@@ -702,47 +814,16 @@ private static TableDescription mapDescribeTable(
}
}
- YdbTable.TtlSettings ttl = result.getTtlSettings();
- TableTtl tableTtl;
- switch (ttl.getModeCase()) {
- case DATE_TYPE_COLUMN:
- YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn();
- tableTtl = TableTtl
- .dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds())
- .withRunIntervalSeconds(ttl.getRunIntervalSeconds());
- break;
- case VALUE_SINCE_UNIX_EPOCH:
- YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch();
- TableTtl.TtlUnit unit;
- switch (vs.getColumnUnit()) {
- case UNIT_SECONDS:
- unit = TableTtl.TtlUnit.SECONDS;
- break;
- case UNIT_MILLISECONDS:
- unit = TableTtl.TtlUnit.MILLISECONDS;
- break;
- case UNIT_MICROSECONDS:
- unit = TableTtl.TtlUnit.MICROSECONDS;
- break;
- case UNIT_NANOSECONDS:
- unit = TableTtl.TtlUnit.NANOSECONDS;
- break;
- case UNIT_UNSPECIFIED:
- case UNRECOGNIZED:
- default:
- unit = TableTtl.TtlUnit.UNSPECIFIED;
- break;
- }
- tableTtl = TableTtl
- .valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds())
- .withRunIntervalSeconds(ttl.getRunIntervalSeconds());
- break;
- case MODE_NOT_SET:
- default:
- tableTtl = TableTtl.notSet();
- break;
+ description.setTtlSettings(mapTtlSettings(result.getTtlSettings()));
+ for (YdbTable.ChangefeedDescription pb: result.getChangefeedsList()) {
+ description.addChangefeed(new ChangefeedDescription(
+ pb.getName(),
+ mapChangefeedMode(pb.getMode()),
+ mapChangefeedFormat(pb.getFormat()),
+ mapChangefeedState(pb.getState()),
+ pb.getVirtualTimestamps()
+ ));
}
- description.setTtlSettings(tableTtl);
return description.build();
}
diff --git a/table/src/main/java/tech/ydb/table/settings/Changefeed.java b/table/src/main/java/tech/ydb/table/settings/Changefeed.java
index eb7335bd5..b1b7172ae 100644
--- a/table/src/main/java/tech/ydb/table/settings/Changefeed.java
+++ b/table/src/main/java/tech/ydb/table/settings/Changefeed.java
@@ -3,41 +3,38 @@
import java.time.Duration;
import java.util.Objects;
-import tech.ydb.proto.table.YdbTable;
+import tech.ydb.table.description.ChangefeedDescription;
+import tech.ydb.table.impl.BaseSession;
+
/**
* @author Egor Litvinenko
*/
public class Changefeed {
public enum Mode {
- KEYS_ONLY(YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY),
- UPDATES(YdbTable.ChangefeedMode.Mode.MODE_UPDATES),
- NEW_IMAGE(YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE),
- OLD_IMAGE(YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE),
- NEW_AND_OLD_IMAGES(YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES);
-
- private final YdbTable.ChangefeedMode.Mode proto;
-
- Mode(YdbTable.ChangefeedMode.Mode proto) {
- this.proto = proto;
- }
-
- public YdbTable.ChangefeedMode.Mode toPb() {
- return proto;
+ KEYS_ONLY,
+ UPDATES,
+ NEW_IMAGE,
+ OLD_IMAGE,
+ NEW_AND_OLD_IMAGES;
+
+ @Deprecated
+ public tech.ydb.proto.table.YdbTable.ChangefeedMode.Mode toPb() {
+ return BaseSession.buildChangefeedMode(this);
}
}
public enum Format {
- JSON(YdbTable.ChangefeedFormat.Format.FORMAT_JSON);
-
- private final YdbTable.ChangefeedFormat.Format proto;
-
- Format(YdbTable.ChangefeedFormat.Format proto) {
- this.proto = proto;
- }
-
- public YdbTable.ChangefeedFormat.Format toProto() {
- return proto;
+ /** Change record in JSON format for common (row oriented) tables */
+ JSON,
+ /** Change record in JSON format for document (DynamoDB-compatible) tables */
+ DYNAMODB_STREAMS_JSON,
+ /** Debezium-like change record JSON format for common (row oriented) tables */
+ DEBEZIUM_JSON;
+
+ @Deprecated
+ public tech.ydb.proto.table.YdbTable.ChangefeedFormat.Format toProto() {
+ return BaseSession.buildChangefeedFormat(this);
}
}
@@ -82,22 +79,12 @@ public Duration getRetentionPeriod() {
}
@Deprecated
- public YdbTable.Changefeed toProto() {
- YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder()
- .setName(name)
- .setFormat(format.toProto())
- .setVirtualTimestamps(virtualTimestamps)
- .setInitialScan(initialScan)
- .setMode(mode.toPb());
-
- if (retentionPeriod != null) {
- builder.setRetentionPeriod(com.google.protobuf.Duration.newBuilder()
- .setSeconds(retentionPeriod.getSeconds())
- .setNanos(retentionPeriod.getNano())
- .build());
- }
+ public tech.ydb.proto.table.YdbTable.Changefeed toProto() {
+ return BaseSession.buildChangefeed(this);
+ }
- return builder.build();
+ public static Builder fromDescription(ChangefeedDescription description) {
+ return new Builder(description);
}
public static Builder newBuilder(String changefeedName) {
@@ -112,6 +99,13 @@ public static class Builder {
private Duration retentionPeriod = null;
private boolean initialScan = false;
+ private Builder(ChangefeedDescription description) {
+ this.name = description.getName();
+ this.mode = description.getMode();
+ this.format = description.getFormat();
+ this.virtualTimestamps = description.hasVirtualTimestamps();
+ }
+
private Builder(String name) {
this.name = Objects.requireNonNull(name);
}
diff --git a/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java b/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java
new file mode 100644
index 000000000..9a719d165
--- /dev/null
+++ b/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java
@@ -0,0 +1,196 @@
+package tech.ydb.table.integration;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.table.SessionRetryContext;
+import tech.ydb.table.TableClient;
+import tech.ydb.table.description.ChangefeedDescription;
+import tech.ydb.table.description.TableDescription;
+import tech.ydb.table.settings.AlterTableSettings;
+import tech.ydb.table.settings.Changefeed;
+import tech.ydb.table.settings.CreateTableSettings;
+import tech.ydb.table.values.PrimitiveType;
+import tech.ydb.test.junit4.GrpcTransportRule;
+
+/**
+ *
+ * @author Aleksandr Gorshenin
+ */
+public class ChangefeedTableTest {
+ @ClassRule
+ public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
+
+ private final String TABLE_NAME = "table_changefeed_test";
+
+ private final TableClient tableClient = TableClient.newClient(ydbTransport).build();
+
+ private final SessionRetryContext ctx = SessionRetryContext.create(tableClient).build();
+
+ private final String tablePath = ydbTransport.getDatabase() + "/" + TABLE_NAME;
+
+ @After
+ public void dropTable() {
+ ctx.supplyStatus(session -> session.dropTable(tablePath)).join();
+ tableClient.close();
+ }
+
+ @Test
+ public void tableChangefeedsErrorsTest() {
+ TableDescription tableDescription = TableDescription.newBuilder()
+ .addNonnullColumn("id", PrimitiveType.Uint64)
+ .addNullableColumn("code", PrimitiveType.Text)
+ .setPrimaryKey("id")
+ .build();
+
+ Status createStatus = ctx.supplyStatus(
+ session -> session.createTable(tablePath, tableDescription, new CreateTableSettings())
+ ).join();
+ Assert.assertTrue("Create table " + createStatus, createStatus.isSuccess());
+
+ Status alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .addChangefeed(Changefeed.newBuilder("change1").build())
+ .addChangefeed(Changefeed.newBuilder("change2").build())
+ )).join();
+
+ Assert.assertFalse("Alter table add multipli changefeeds", alterStatus.isSuccess());
+ Assert.assertEquals("Status{code = UNSUPPORTED(code=400180), issues = "
+ + "[Only one changefeed can be added by one operation (S_ERROR)]}", alterStatus.toString());
+
+ alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .dropChangefeed("change1")
+ .addChangefeed(Changefeed.newBuilder("test1")
+ .withInitialScan(false)
+ .withMode(Changefeed.Mode.UPDATES)
+ .withVirtualTimestamps(false)
+ .build())
+ )).join();
+ Assert.assertFalse("Alter table mixed alter", alterStatus.isSuccess());
+ Assert.assertEquals("Status{code = UNSUPPORTED(code=400180), issues = "
+ + "[Mixed alter is unsupported (S_ERROR)]}", alterStatus.toString());
+
+ alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .dropChangefeed("change1")
+ .dropChangefeed("change2")
+ )).join();
+ Assert.assertFalse("Alter table drop multipli changefeeds", alterStatus.isSuccess());
+ Assert.assertEquals("Status{code = UNSUPPORTED(code=400180), issues = "
+ + "[Only one changefeed can be removed by one operation (S_ERROR)]}", alterStatus.toString());
+ }
+
+ @Test
+ public void tableChangefeedsTest() {
+ // --------------------- create tables -----------------------------
+ TableDescription tableDescription = TableDescription.newBuilder()
+ .addNonnullColumn("id", PrimitiveType.Uint64)
+ .addNullableColumn("code", PrimitiveType.Text)
+ .addNullableColumn("size", PrimitiveType.Float)
+ .addNullableColumn("created", PrimitiveType.Timestamp)
+ .addNullableColumn("data", PrimitiveType.Text)
+ .setPrimaryKey("id")
+ .build();
+
+ Status createStatus = ctx.supplyStatus(
+ session -> session.createTable(tablePath, tableDescription, new CreateTableSettings())
+ ).join();
+ Assert.assertTrue("Create table " + createStatus, createStatus.isSuccess());
+
+ // --------------------- describe table after creating -----------------------------
+ Result describeResult = ctx.supplyResult(session ->session.describeTable(tablePath)).join();
+ Assert.assertTrue("Describe table " + describeResult.getStatus(), describeResult.isSuccess());
+
+ // No changefeeds
+ Assert.assertTrue(describeResult.getValue().getChangefeeds().isEmpty());
+
+ // --------------------- alter table with add changefeeds -----------------------------
+ // only one changefeed can be added by one operation
+
+ Status alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .addChangefeed(Changefeed.newBuilder("change1").build())
+ )).join();
+ Assert.assertTrue("Alter table changefeed 1 " + alterStatus, alterStatus.isSuccess());
+
+ alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .addChangefeed(Changefeed.newBuilder("change2")
+ .withFormat(Changefeed.Format.JSON)
+ .withInitialScan(true)
+ .withMode(Changefeed.Mode.NEW_AND_OLD_IMAGES)
+ .withVirtualTimestamps(true)
+ .withRetentionPeriod(Duration.ofDays(5))
+ .build())
+ )).join();
+ Assert.assertTrue("Alter table changefeed 2 " + alterStatus, alterStatus.isSuccess());
+
+ // --------------------- describe table after create new changefeeds -----------------------------
+ describeResult = ctx.supplyResult(session ->session.describeTable(tablePath)).join();
+ Assert.assertTrue("Describe table after altering " + describeResult.getStatus(), describeResult.isSuccess());
+
+ List changefeeds = describeResult.getValue().getChangefeeds();
+
+ Assert.assertEquals(2, changefeeds.size());
+
+ ChangefeedDescription ch1 = changefeeds.get(0);
+ Assert.assertEquals("change1", ch1.getName());
+ Assert.assertEquals(Changefeed.Mode.KEYS_ONLY, ch1.getMode());
+ Assert.assertEquals(Changefeed.Format.JSON, ch1.getFormat());
+ Assert.assertEquals(ChangefeedDescription.State.ENABLED, ch1.getState());
+ Assert.assertFalse(ch1.hasVirtualTimestamps());
+ Assert.assertEquals(
+ "Changefeed['change1']{state=ENABLED, format=JSON, mode=KEYS_ONLY, virtual timestamps=false}",
+ ch1.toString()
+ );
+
+ ChangefeedDescription ch2 = changefeeds.get(1);
+ Assert.assertEquals("change2", ch2.getName());
+ Assert.assertEquals(Changefeed.Mode.NEW_AND_OLD_IMAGES, ch2.getMode());
+ Assert.assertEquals(Changefeed.Format.JSON, ch2.getFormat());
+ Assert.assertTrue(ch2.hasVirtualTimestamps());
+
+ // State may be flaky
+ Assert.assertTrue(ch2.getState() == ChangefeedDescription.State.INITIAL_SCAN ||
+ ch2.getState() == ChangefeedDescription.State.ENABLED);
+
+ // --------------------- drop one changefeed and add another -------------------------------------
+ alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .dropChangefeed("change2")
+ )).join();
+ Assert.assertTrue("Alter table changefeed 3 " + alterStatus, alterStatus.isSuccess());
+
+ alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings()
+ .addChangefeed(Changefeed.fromDescription(ch2)
+ .withInitialScan(false)
+ .withMode(Changefeed.Mode.UPDATES)
+ .withVirtualTimestamps(false)
+ .build())
+ )).join();
+ Assert.assertTrue("Alter table changefeed 3 " + alterStatus, alterStatus.isSuccess());
+
+ describeResult = ctx.supplyResult(session ->session.describeTable(tablePath)).join();
+ Assert.assertTrue("Describe table after altering " + describeResult.getStatus(), describeResult.isSuccess());
+
+ changefeeds = describeResult.getValue().getChangefeeds();
+
+ Assert.assertEquals(2, changefeeds.size());
+
+ ch1 = changefeeds.get(0);
+ Assert.assertEquals(ch1, changefeeds.get(0));
+ Assert.assertEquals(Changefeed.Mode.KEYS_ONLY, ch1.getMode());
+ Assert.assertEquals(Changefeed.Format.JSON, ch1.getFormat());
+ Assert.assertEquals(ChangefeedDescription.State.ENABLED, ch1.getState());
+ Assert.assertFalse(ch1.hasVirtualTimestamps());
+
+ ChangefeedDescription ch3 = changefeeds.get(1);
+ Assert.assertNotEquals(ch2, ch3);
+ Assert.assertEquals("change2", ch3.getName());
+ Assert.assertEquals(Changefeed.Mode.UPDATES, ch3.getMode());
+ Assert.assertEquals(Changefeed.Format.JSON, ch3.getFormat());
+ Assert.assertFalse(ch3.hasVirtualTimestamps());
+ }
+}