From 46c58b68edeaddf7886b6854bc0b04c41d160363 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 10 Feb 2025 11:36:08 +0000 Subject: [PATCH] Added changefeeds to result of describeTable method --- .../description/ChangefeedDescription.java | 109 ++++++++++ .../table/description/TableDescription.java | 24 +++ .../java/tech/ydb/table/impl/BaseSession.java | 169 +++++++++++---- .../tech/ydb/table/settings/Changefeed.java | 74 +++---- .../integration/ChangefeedTableTest.java | 196 ++++++++++++++++++ 5 files changed, 488 insertions(+), 84 deletions(-) create mode 100644 table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java create mode 100644 table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java diff --git a/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java b/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java new file mode 100644 index 000000000..2039820eb --- /dev/null +++ b/table/src/main/java/tech/ydb/table/description/ChangefeedDescription.java @@ -0,0 +1,109 @@ +package tech.ydb.table.description; + +import java.util.Objects; + +import tech.ydb.table.settings.Changefeed; + +/** + * + * @author Aleksandr Gorshenin + */ +public class ChangefeedDescription { + public enum State { + /** + * Normal state, from this state changefeed can be disabled + */ + ENABLED, + /** + * No new change records are generated, but the old ones remain available
+ * From this state changefeed cannot be switched to any other state + */ + DISABLED, + /** + * An initial scan is being performed.
After its completion changefeed will switch to the normal state + */ + INITIAL_SCAN; + } + + private final String name; + private final Changefeed.Mode mode; + private final Changefeed.Format format; + private final State state; + private final boolean virtualTimestamps; + + public ChangefeedDescription(String name, Changefeed.Mode mode, Changefeed.Format format, State state, + boolean virtualTimestamps) { + this.name = name; + this.mode = mode; + this.format = format; + this.state = state; + this.virtualTimestamps = virtualTimestamps; + } + + /** + * @return Name of the feed + */ + public String getName() { + return name; + } + + /** + * @return Mode specifies the information that will be written to the feed + */ + public Changefeed.Mode getMode() { + return mode; + } + + /** + * @return Format of the data + */ + public Changefeed.Format getFormat() { + return format; + } + + /** + * @return State of the feed + */ + public State getState() { + return state; + } + + /** + * @return State of emitting of virtual timestamps along with data + */ + public boolean hasVirtualTimestamps() { + return virtualTimestamps; + } + + @Override + public int hashCode() { + return Objects.hash(name, mode, format, state, virtualTimestamps); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ChangefeedDescription cd = (ChangefeedDescription) o; + return Objects.equals(name, cd.name) + && mode == cd.mode + && format == cd.format + && state == cd.state + && virtualTimestamps == cd.virtualTimestamps; + } + + @Override + public String toString() { + return new StringBuilder("Changefeed['").append(name) + .append("']{state=").append(state) + .append(", format=").append(format) + .append(", mode=").append(mode) + .append(", virtual timestamps=").append(virtualTimestamps) + .append("}").toString(); + } +} diff --git a/table/src/main/java/tech/ydb/table/description/TableDescription.java b/table/src/main/java/tech/ydb/table/description/TableDescription.java index a08366d6b..de1c091cb 100644 --- a/table/src/main/java/tech/ydb/table/description/TableDescription.java +++ b/table/src/main/java/tech/ydb/table/description/TableDescription.java @@ -13,7 +13,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import tech.ydb.table.Session; import tech.ydb.table.description.TableTtl.TtlMode; +import tech.ydb.table.settings.AlterTableSettings; import tech.ydb.table.settings.PartitioningSettings; import tech.ydb.table.values.OptionalType; import tech.ydb.table.values.Type; @@ -28,6 +30,7 @@ public class TableDescription { private final List indexes; private final List columnFamilies; private final List keyRanges; + private final List changefeeds; @Nullable private final TableStats tableStats; @@ -49,6 +52,7 @@ private TableDescription(Builder builder) { this.partitioningSettings = builder.partitioningSettings; this.partitionStats = ImmutableList.copyOf(builder.partitionStats); this.tableTtl = builder.ttlSettings; + this.changefeeds = builder.changefeeds; } public static Builder newBuilder() { @@ -93,6 +97,10 @@ public TableTtl getTableTtl() { return tableTtl; } + public List getChangefeeds() { + return changefeeds; + } + /** * BUILDER */ @@ -108,6 +116,7 @@ public static class Builder { private PartitioningSettings partitioningSettings = null; private final List partitionStats = new ArrayList<>(); private TableTtl ttlSettings = TableTtl.notSet(); + private final List changefeeds = new ArrayList<>(); public Builder addNonnullColumn(String name, Type type) { return addNonnullColumn(name, type, null); @@ -222,6 +231,21 @@ public Builder addPartitionStat(long rows, long size) { return this; } + /** + * Adds changefeed information to table description
+ * NOTICE: Changefeed description cannot be used for changefeed creating.
+ * It is impossible to create changefeed in one time with table creating. For adding a new changefeed use method + * {@link Session#alterTable(java.lang.String, tech.ydb.table.settings.AlterTableSettings)} and + * {@link AlterTableSettings#addChangefeed(tech.ydb.table.settings.Changefeed)} + * + * @param changefeed changefeed description + * @return table description builder + */ + public Builder addChangefeed(ChangefeedDescription changefeed) { + this.changefeeds.add(changefeed); + return this; + } + @Deprecated public Builder setTtlSettings(int ttlModeCase, String columnName, int expireAfterSeconds) { this.ttlSettings = new TableTtl(TtlMode.forCase(ttlModeCase), columnName, expireAfterSeconds); diff --git a/table/src/main/java/tech/ydb/table/impl/BaseSession.java b/table/src/main/java/tech/ydb/table/impl/BaseSession.java index 98bbd8939..3465f2f8f 100644 --- a/table/src/main/java/tech/ydb/table/impl/BaseSession.java +++ b/table/src/main/java/tech/ydb/table/impl/BaseSession.java @@ -39,6 +39,7 @@ import tech.ydb.proto.common.CommonProtos; import tech.ydb.proto.table.YdbTable; import tech.ydb.table.Session; +import tech.ydb.table.description.ChangefeedDescription; import tech.ydb.table.description.ColumnFamily; import tech.ydb.table.description.KeyBound; import tech.ydb.table.description.KeyRange; @@ -210,13 +211,43 @@ private static YdbTable.ColumnMeta buildColumnMeta(TableColumn column) { return builder.build(); } - private static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) { + public static YdbTable.ChangefeedFormat.Format buildChangefeedFormat(Changefeed.Format format) { + switch (format) { + case JSON: + return YdbTable.ChangefeedFormat.Format.FORMAT_JSON; + case DYNAMODB_STREAMS_JSON: + return YdbTable.ChangefeedFormat.Format.FORMAT_DYNAMODB_STREAMS_JSON; + case DEBEZIUM_JSON: + return YdbTable.ChangefeedFormat.Format.FORMAT_DEBEZIUM_JSON; + default: + return YdbTable.ChangefeedFormat.Format.FORMAT_UNSPECIFIED; + } + } + + public static YdbTable.ChangefeedMode.Mode buildChangefeedMode(Changefeed.Mode mode) { + switch (mode) { + case KEYS_ONLY: + return YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY; + case UPDATES: + return YdbTable.ChangefeedMode.Mode.MODE_UPDATES; + case NEW_IMAGE: + return YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE; + case OLD_IMAGE: + return YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE; + case NEW_AND_OLD_IMAGES: + return YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES; + default: + return YdbTable.ChangefeedMode.Mode.MODE_UNSPECIFIED; + } + } + + public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) { YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder() .setName(changefeed.getName()) - .setFormat(changefeed.getFormat().toProto()) + .setFormat(buildChangefeedFormat(changefeed.getFormat())) + .setMode(buildChangefeedMode(changefeed.getMode())) .setVirtualTimestamps(changefeed.hasVirtualTimestamps()) - .setInitialScan(changefeed.hasInitialScan()) - .setMode(changefeed.getMode().toPb()); + .setInitialScan(changefeed.hasInitialScan()); Duration retentionPeriod = changefeed.getRetentionPeriod(); if (retentionPeriod != null) { @@ -609,6 +640,87 @@ public CompletableFuture> describeTable(String path, De .thenApply(result -> result.map(desc -> mapDescribeTable(desc, settings))); } + private static TableTtl mapTtlSettings(YdbTable.TtlSettings ttl) { + switch (ttl.getModeCase()) { + case DATE_TYPE_COLUMN: + YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn(); + return TableTtl + .dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds()) + .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); + case VALUE_SINCE_UNIX_EPOCH: + YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch(); + TableTtl.TtlUnit unit; + switch (vs.getColumnUnit()) { + case UNIT_SECONDS: + unit = TableTtl.TtlUnit.SECONDS; + break; + case UNIT_MILLISECONDS: + unit = TableTtl.TtlUnit.MILLISECONDS; + break; + case UNIT_MICROSECONDS: + unit = TableTtl.TtlUnit.MICROSECONDS; + break; + case UNIT_NANOSECONDS: + unit = TableTtl.TtlUnit.NANOSECONDS; + break; + case UNIT_UNSPECIFIED: + case UNRECOGNIZED: + default: + unit = TableTtl.TtlUnit.UNSPECIFIED; + break; + } + return TableTtl + .valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds()) + .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); + case MODE_NOT_SET: + default: + return TableTtl.notSet(); + } + } + + private static Changefeed.Format mapChangefeedFormat(YdbTable.ChangefeedFormat.Format pb) { + switch (pb) { + case FORMAT_JSON: + return Changefeed.Format.JSON; + case FORMAT_DYNAMODB_STREAMS_JSON: + return Changefeed.Format.DYNAMODB_STREAMS_JSON; + case FORMAT_DEBEZIUM_JSON: + return Changefeed.Format.DEBEZIUM_JSON; + default: + return null; + } + } + + private static Changefeed.Mode mapChangefeedMode(YdbTable.ChangefeedMode.Mode pb) { + switch (pb) { + case MODE_KEYS_ONLY: + return Changefeed.Mode.KEYS_ONLY; + case MODE_NEW_IMAGE: + return Changefeed.Mode.NEW_IMAGE; + case MODE_OLD_IMAGE: + return Changefeed.Mode.OLD_IMAGE; + case MODE_NEW_AND_OLD_IMAGES: + return Changefeed.Mode.NEW_AND_OLD_IMAGES; + case MODE_UPDATES: + return Changefeed.Mode.UPDATES; + default: + return null; + } + } + + private static ChangefeedDescription.State mapChangefeedState(YdbTable.ChangefeedDescription.State pb) { + switch (pb) { + case STATE_ENABLED: + return ChangefeedDescription.State.ENABLED; + case STATE_DISABLED: + return ChangefeedDescription.State.DISABLED; + case STATE_INITIAL_SCAN: + return ChangefeedDescription.State.INITIAL_SCAN; + default: + return null; + } + } + private static TableDescription mapDescribeTable( YdbTable.DescribeTableResult result, DescribeTableSettings describeTableSettings @@ -702,47 +814,16 @@ private static TableDescription mapDescribeTable( } } - YdbTable.TtlSettings ttl = result.getTtlSettings(); - TableTtl tableTtl; - switch (ttl.getModeCase()) { - case DATE_TYPE_COLUMN: - YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn(); - tableTtl = TableTtl - .dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds()) - .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); - break; - case VALUE_SINCE_UNIX_EPOCH: - YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch(); - TableTtl.TtlUnit unit; - switch (vs.getColumnUnit()) { - case UNIT_SECONDS: - unit = TableTtl.TtlUnit.SECONDS; - break; - case UNIT_MILLISECONDS: - unit = TableTtl.TtlUnit.MILLISECONDS; - break; - case UNIT_MICROSECONDS: - unit = TableTtl.TtlUnit.MICROSECONDS; - break; - case UNIT_NANOSECONDS: - unit = TableTtl.TtlUnit.NANOSECONDS; - break; - case UNIT_UNSPECIFIED: - case UNRECOGNIZED: - default: - unit = TableTtl.TtlUnit.UNSPECIFIED; - break; - } - tableTtl = TableTtl - .valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds()) - .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); - break; - case MODE_NOT_SET: - default: - tableTtl = TableTtl.notSet(); - break; + description.setTtlSettings(mapTtlSettings(result.getTtlSettings())); + for (YdbTable.ChangefeedDescription pb: result.getChangefeedsList()) { + description.addChangefeed(new ChangefeedDescription( + pb.getName(), + mapChangefeedMode(pb.getMode()), + mapChangefeedFormat(pb.getFormat()), + mapChangefeedState(pb.getState()), + pb.getVirtualTimestamps() + )); } - description.setTtlSettings(tableTtl); return description.build(); } diff --git a/table/src/main/java/tech/ydb/table/settings/Changefeed.java b/table/src/main/java/tech/ydb/table/settings/Changefeed.java index eb7335bd5..b1b7172ae 100644 --- a/table/src/main/java/tech/ydb/table/settings/Changefeed.java +++ b/table/src/main/java/tech/ydb/table/settings/Changefeed.java @@ -3,41 +3,38 @@ import java.time.Duration; import java.util.Objects; -import tech.ydb.proto.table.YdbTable; +import tech.ydb.table.description.ChangefeedDescription; +import tech.ydb.table.impl.BaseSession; + /** * @author Egor Litvinenko */ public class Changefeed { public enum Mode { - KEYS_ONLY(YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY), - UPDATES(YdbTable.ChangefeedMode.Mode.MODE_UPDATES), - NEW_IMAGE(YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE), - OLD_IMAGE(YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE), - NEW_AND_OLD_IMAGES(YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES); - - private final YdbTable.ChangefeedMode.Mode proto; - - Mode(YdbTable.ChangefeedMode.Mode proto) { - this.proto = proto; - } - - public YdbTable.ChangefeedMode.Mode toPb() { - return proto; + KEYS_ONLY, + UPDATES, + NEW_IMAGE, + OLD_IMAGE, + NEW_AND_OLD_IMAGES; + + @Deprecated + public tech.ydb.proto.table.YdbTable.ChangefeedMode.Mode toPb() { + return BaseSession.buildChangefeedMode(this); } } public enum Format { - JSON(YdbTable.ChangefeedFormat.Format.FORMAT_JSON); - - private final YdbTable.ChangefeedFormat.Format proto; - - Format(YdbTable.ChangefeedFormat.Format proto) { - this.proto = proto; - } - - public YdbTable.ChangefeedFormat.Format toProto() { - return proto; + /** Change record in JSON format for common (row oriented) tables */ + JSON, + /** Change record in JSON format for document (DynamoDB-compatible) tables */ + DYNAMODB_STREAMS_JSON, + /** Debezium-like change record JSON format for common (row oriented) tables */ + DEBEZIUM_JSON; + + @Deprecated + public tech.ydb.proto.table.YdbTable.ChangefeedFormat.Format toProto() { + return BaseSession.buildChangefeedFormat(this); } } @@ -82,22 +79,12 @@ public Duration getRetentionPeriod() { } @Deprecated - public YdbTable.Changefeed toProto() { - YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder() - .setName(name) - .setFormat(format.toProto()) - .setVirtualTimestamps(virtualTimestamps) - .setInitialScan(initialScan) - .setMode(mode.toPb()); - - if (retentionPeriod != null) { - builder.setRetentionPeriod(com.google.protobuf.Duration.newBuilder() - .setSeconds(retentionPeriod.getSeconds()) - .setNanos(retentionPeriod.getNano()) - .build()); - } + public tech.ydb.proto.table.YdbTable.Changefeed toProto() { + return BaseSession.buildChangefeed(this); + } - return builder.build(); + public static Builder fromDescription(ChangefeedDescription description) { + return new Builder(description); } public static Builder newBuilder(String changefeedName) { @@ -112,6 +99,13 @@ public static class Builder { private Duration retentionPeriod = null; private boolean initialScan = false; + private Builder(ChangefeedDescription description) { + this.name = description.getName(); + this.mode = description.getMode(); + this.format = description.getFormat(); + this.virtualTimestamps = description.hasVirtualTimestamps(); + } + private Builder(String name) { this.name = Objects.requireNonNull(name); } diff --git a/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java b/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java new file mode 100644 index 000000000..9a719d165 --- /dev/null +++ b/table/src/test/java/tech/ydb/table/integration/ChangefeedTableTest.java @@ -0,0 +1,196 @@ +package tech.ydb.table.integration; + +import java.time.Duration; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.table.SessionRetryContext; +import tech.ydb.table.TableClient; +import tech.ydb.table.description.ChangefeedDescription; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.settings.AlterTableSettings; +import tech.ydb.table.settings.Changefeed; +import tech.ydb.table.settings.CreateTableSettings; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.test.junit4.GrpcTransportRule; + +/** + * + * @author Aleksandr Gorshenin + */ +public class ChangefeedTableTest { + @ClassRule + public final static GrpcTransportRule ydbTransport = new GrpcTransportRule(); + + private final String TABLE_NAME = "table_changefeed_test"; + + private final TableClient tableClient = TableClient.newClient(ydbTransport).build(); + + private final SessionRetryContext ctx = SessionRetryContext.create(tableClient).build(); + + private final String tablePath = ydbTransport.getDatabase() + "/" + TABLE_NAME; + + @After + public void dropTable() { + ctx.supplyStatus(session -> session.dropTable(tablePath)).join(); + tableClient.close(); + } + + @Test + public void tableChangefeedsErrorsTest() { + TableDescription tableDescription = TableDescription.newBuilder() + .addNonnullColumn("id", PrimitiveType.Uint64) + .addNullableColumn("code", PrimitiveType.Text) + .setPrimaryKey("id") + .build(); + + Status createStatus = ctx.supplyStatus( + session -> session.createTable(tablePath, tableDescription, new CreateTableSettings()) + ).join(); + Assert.assertTrue("Create table " + createStatus, createStatus.isSuccess()); + + Status alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .addChangefeed(Changefeed.newBuilder("change1").build()) + .addChangefeed(Changefeed.newBuilder("change2").build()) + )).join(); + + Assert.assertFalse("Alter table add multipli changefeeds", alterStatus.isSuccess()); + Assert.assertEquals("Status{code = UNSUPPORTED(code=400180), issues = " + + "[Only one changefeed can be added by one operation (S_ERROR)]}", alterStatus.toString()); + + alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .dropChangefeed("change1") + .addChangefeed(Changefeed.newBuilder("test1") + .withInitialScan(false) + .withMode(Changefeed.Mode.UPDATES) + .withVirtualTimestamps(false) + .build()) + )).join(); + Assert.assertFalse("Alter table mixed alter", alterStatus.isSuccess()); + Assert.assertEquals("Status{code = UNSUPPORTED(code=400180), issues = " + + "[Mixed alter is unsupported (S_ERROR)]}", alterStatus.toString()); + + alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .dropChangefeed("change1") + .dropChangefeed("change2") + )).join(); + Assert.assertFalse("Alter table drop multipli changefeeds", alterStatus.isSuccess()); + Assert.assertEquals("Status{code = UNSUPPORTED(code=400180), issues = " + + "[Only one changefeed can be removed by one operation (S_ERROR)]}", alterStatus.toString()); + } + + @Test + public void tableChangefeedsTest() { + // --------------------- create tables ----------------------------- + TableDescription tableDescription = TableDescription.newBuilder() + .addNonnullColumn("id", PrimitiveType.Uint64) + .addNullableColumn("code", PrimitiveType.Text) + .addNullableColumn("size", PrimitiveType.Float) + .addNullableColumn("created", PrimitiveType.Timestamp) + .addNullableColumn("data", PrimitiveType.Text) + .setPrimaryKey("id") + .build(); + + Status createStatus = ctx.supplyStatus( + session -> session.createTable(tablePath, tableDescription, new CreateTableSettings()) + ).join(); + Assert.assertTrue("Create table " + createStatus, createStatus.isSuccess()); + + // --------------------- describe table after creating ----------------------------- + Result describeResult = ctx.supplyResult(session ->session.describeTable(tablePath)).join(); + Assert.assertTrue("Describe table " + describeResult.getStatus(), describeResult.isSuccess()); + + // No changefeeds + Assert.assertTrue(describeResult.getValue().getChangefeeds().isEmpty()); + + // --------------------- alter table with add changefeeds ----------------------------- + // only one changefeed can be added by one operation + + Status alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .addChangefeed(Changefeed.newBuilder("change1").build()) + )).join(); + Assert.assertTrue("Alter table changefeed 1 " + alterStatus, alterStatus.isSuccess()); + + alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .addChangefeed(Changefeed.newBuilder("change2") + .withFormat(Changefeed.Format.JSON) + .withInitialScan(true) + .withMode(Changefeed.Mode.NEW_AND_OLD_IMAGES) + .withVirtualTimestamps(true) + .withRetentionPeriod(Duration.ofDays(5)) + .build()) + )).join(); + Assert.assertTrue("Alter table changefeed 2 " + alterStatus, alterStatus.isSuccess()); + + // --------------------- describe table after create new changefeeds ----------------------------- + describeResult = ctx.supplyResult(session ->session.describeTable(tablePath)).join(); + Assert.assertTrue("Describe table after altering " + describeResult.getStatus(), describeResult.isSuccess()); + + List changefeeds = describeResult.getValue().getChangefeeds(); + + Assert.assertEquals(2, changefeeds.size()); + + ChangefeedDescription ch1 = changefeeds.get(0); + Assert.assertEquals("change1", ch1.getName()); + Assert.assertEquals(Changefeed.Mode.KEYS_ONLY, ch1.getMode()); + Assert.assertEquals(Changefeed.Format.JSON, ch1.getFormat()); + Assert.assertEquals(ChangefeedDescription.State.ENABLED, ch1.getState()); + Assert.assertFalse(ch1.hasVirtualTimestamps()); + Assert.assertEquals( + "Changefeed['change1']{state=ENABLED, format=JSON, mode=KEYS_ONLY, virtual timestamps=false}", + ch1.toString() + ); + + ChangefeedDescription ch2 = changefeeds.get(1); + Assert.assertEquals("change2", ch2.getName()); + Assert.assertEquals(Changefeed.Mode.NEW_AND_OLD_IMAGES, ch2.getMode()); + Assert.assertEquals(Changefeed.Format.JSON, ch2.getFormat()); + Assert.assertTrue(ch2.hasVirtualTimestamps()); + + // State may be flaky + Assert.assertTrue(ch2.getState() == ChangefeedDescription.State.INITIAL_SCAN || + ch2.getState() == ChangefeedDescription.State.ENABLED); + + // --------------------- drop one changefeed and add another ------------------------------------- + alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .dropChangefeed("change2") + )).join(); + Assert.assertTrue("Alter table changefeed 3 " + alterStatus, alterStatus.isSuccess()); + + alterStatus = ctx.supplyStatus(session -> session.alterTable(tablePath, new AlterTableSettings() + .addChangefeed(Changefeed.fromDescription(ch2) + .withInitialScan(false) + .withMode(Changefeed.Mode.UPDATES) + .withVirtualTimestamps(false) + .build()) + )).join(); + Assert.assertTrue("Alter table changefeed 3 " + alterStatus, alterStatus.isSuccess()); + + describeResult = ctx.supplyResult(session ->session.describeTable(tablePath)).join(); + Assert.assertTrue("Describe table after altering " + describeResult.getStatus(), describeResult.isSuccess()); + + changefeeds = describeResult.getValue().getChangefeeds(); + + Assert.assertEquals(2, changefeeds.size()); + + ch1 = changefeeds.get(0); + Assert.assertEquals(ch1, changefeeds.get(0)); + Assert.assertEquals(Changefeed.Mode.KEYS_ONLY, ch1.getMode()); + Assert.assertEquals(Changefeed.Format.JSON, ch1.getFormat()); + Assert.assertEquals(ChangefeedDescription.State.ENABLED, ch1.getState()); + Assert.assertFalse(ch1.hasVirtualTimestamps()); + + ChangefeedDescription ch3 = changefeeds.get(1); + Assert.assertNotEquals(ch2, ch3); + Assert.assertEquals("change2", ch3.getName()); + Assert.assertEquals(Changefeed.Mode.UPDATES, ch3.getMode()); + Assert.assertEquals(Changefeed.Format.JSON, ch3.getFormat()); + Assert.assertFalse(ch3.hasVirtualTimestamps()); + } +}