Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package tech.ydb.table.description;

import java.util.Objects;

import tech.ydb.table.settings.Changefeed;

/**
*
* @author Aleksandr Gorshenin
*/
public class ChangefeedDescription {
public enum State {
/**
* Normal state, from this state changefeed can be disabled
*/
ENABLED,
/**
* No new change records are generated, but the old ones remain available<br>
* From this state changefeed cannot be switched to any other state
*/
DISABLED,
/**
* An initial scan is being performed. <br> After its completion changefeed will switch to the normal state
*/
INITIAL_SCAN;
}

private final String name;
private final Changefeed.Mode mode;
private final Changefeed.Format format;
private final State state;
private final boolean virtualTimestamps;

public ChangefeedDescription(String name, Changefeed.Mode mode, Changefeed.Format format, State state,
boolean virtualTimestamps) {
this.name = name;
this.mode = mode;
this.format = format;
this.state = state;
this.virtualTimestamps = virtualTimestamps;
}

/**
* @return Name of the feed
*/
public String getName() {
return name;
}

/**
* @return Mode specifies the information that will be written to the feed
*/
public Changefeed.Mode getMode() {
return mode;
}

/**
* @return Format of the data
*/
public Changefeed.Format getFormat() {
return format;
}

/**
* @return State of the feed
*/
public State getState() {
return state;
}

/**
* @return State of emitting of virtual timestamps along with data
*/
public boolean hasVirtualTimestamps() {
return virtualTimestamps;
}

@Override
public int hashCode() {
return Objects.hash(name, mode, format, state, virtualTimestamps);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ChangefeedDescription cd = (ChangefeedDescription) o;
return Objects.equals(name, cd.name)
&& mode == cd.mode
&& format == cd.format
&& state == cd.state
&& virtualTimestamps == cd.virtualTimestamps;
}

@Override
public String toString() {
return new StringBuilder("Changefeed['").append(name)
.append("']{state=").append(state)
.append(", format=").append(format)
.append(", mode=").append(mode)
.append(", virtual timestamps=").append(virtualTimestamps)
.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;

import tech.ydb.table.Session;
import tech.ydb.table.description.TableTtl.TtlMode;
import tech.ydb.table.settings.AlterTableSettings;
import tech.ydb.table.settings.PartitioningSettings;
import tech.ydb.table.values.OptionalType;
import tech.ydb.table.values.Type;
Expand All @@ -28,6 +30,7 @@ public class TableDescription {
private final List<TableIndex> indexes;
private final List<ColumnFamily> columnFamilies;
private final List<KeyRange> keyRanges;
private final List<ChangefeedDescription> changefeeds;

@Nullable
private final TableStats tableStats;
Expand All @@ -49,6 +52,7 @@ private TableDescription(Builder builder) {
this.partitioningSettings = builder.partitioningSettings;
this.partitionStats = ImmutableList.copyOf(builder.partitionStats);
this.tableTtl = builder.ttlSettings;
this.changefeeds = builder.changefeeds;
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -93,6 +97,10 @@ public TableTtl getTableTtl() {
return tableTtl;
}

public List<ChangefeedDescription> getChangefeeds() {
return changefeeds;
}

/**
* BUILDER
*/
Expand All @@ -108,6 +116,7 @@ public static class Builder {
private PartitioningSettings partitioningSettings = null;
private final List<PartitionStats> partitionStats = new ArrayList<>();
private TableTtl ttlSettings = TableTtl.notSet();
private final List<ChangefeedDescription> changefeeds = new ArrayList<>();

public Builder addNonnullColumn(String name, Type type) {
return addNonnullColumn(name, type, null);
Expand Down Expand Up @@ -222,6 +231,21 @@ public Builder addPartitionStat(long rows, long size) {
return this;
}

/**
* Adds changefeed information to table description<br>
* <b>NOTICE:</b> Changefeed description cannot be used for changefeed creating.<br>
* It is impossible to create changefeed in one time with table creating. For adding a new changefeed use method
* {@link Session#alterTable(java.lang.String, tech.ydb.table.settings.AlterTableSettings)} and
* {@link AlterTableSettings#addChangefeed(tech.ydb.table.settings.Changefeed)}
*
* @param changefeed changefeed description
* @return table description builder
*/
public Builder addChangefeed(ChangefeedDescription changefeed) {
this.changefeeds.add(changefeed);
return this;
}

@Deprecated
public Builder setTtlSettings(int ttlModeCase, String columnName, int expireAfterSeconds) {
this.ttlSettings = new TableTtl(TtlMode.forCase(ttlModeCase), columnName, expireAfterSeconds);
Expand Down
169 changes: 125 additions & 44 deletions table/src/main/java/tech/ydb/table/impl/BaseSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import tech.ydb.proto.common.CommonProtos;
import tech.ydb.proto.table.YdbTable;
import tech.ydb.table.Session;
import tech.ydb.table.description.ChangefeedDescription;
import tech.ydb.table.description.ColumnFamily;
import tech.ydb.table.description.KeyBound;
import tech.ydb.table.description.KeyRange;
Expand Down Expand Up @@ -210,13 +211,43 @@ private static YdbTable.ColumnMeta buildColumnMeta(TableColumn column) {
return builder.build();
}

private static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
public static YdbTable.ChangefeedFormat.Format buildChangefeedFormat(Changefeed.Format format) {
switch (format) {
case JSON:
return YdbTable.ChangefeedFormat.Format.FORMAT_JSON;
case DYNAMODB_STREAMS_JSON:
return YdbTable.ChangefeedFormat.Format.FORMAT_DYNAMODB_STREAMS_JSON;
case DEBEZIUM_JSON:
return YdbTable.ChangefeedFormat.Format.FORMAT_DEBEZIUM_JSON;
default:
return YdbTable.ChangefeedFormat.Format.FORMAT_UNSPECIFIED;
}
}

public static YdbTable.ChangefeedMode.Mode buildChangefeedMode(Changefeed.Mode mode) {
switch (mode) {
case KEYS_ONLY:
return YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY;
case UPDATES:
return YdbTable.ChangefeedMode.Mode.MODE_UPDATES;
case NEW_IMAGE:
return YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE;
case OLD_IMAGE:
return YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE;
case NEW_AND_OLD_IMAGES:
return YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES;
default:
return YdbTable.ChangefeedMode.Mode.MODE_UNSPECIFIED;
}
}

public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) {
YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder()
.setName(changefeed.getName())
.setFormat(changefeed.getFormat().toProto())
.setFormat(buildChangefeedFormat(changefeed.getFormat()))
.setMode(buildChangefeedMode(changefeed.getMode()))
.setVirtualTimestamps(changefeed.hasVirtualTimestamps())
.setInitialScan(changefeed.hasInitialScan())
.setMode(changefeed.getMode().toPb());
.setInitialScan(changefeed.hasInitialScan());

Duration retentionPeriod = changefeed.getRetentionPeriod();
if (retentionPeriod != null) {
Expand Down Expand Up @@ -609,6 +640,87 @@ public CompletableFuture<Result<TableDescription>> describeTable(String path, De
.thenApply(result -> result.map(desc -> mapDescribeTable(desc, settings)));
}

private static TableTtl mapTtlSettings(YdbTable.TtlSettings ttl) {
switch (ttl.getModeCase()) {
case DATE_TYPE_COLUMN:
YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn();
return TableTtl
.dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds())
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
case VALUE_SINCE_UNIX_EPOCH:
YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch();
TableTtl.TtlUnit unit;
switch (vs.getColumnUnit()) {
case UNIT_SECONDS:
unit = TableTtl.TtlUnit.SECONDS;
break;
case UNIT_MILLISECONDS:
unit = TableTtl.TtlUnit.MILLISECONDS;
break;
case UNIT_MICROSECONDS:
unit = TableTtl.TtlUnit.MICROSECONDS;
break;
case UNIT_NANOSECONDS:
unit = TableTtl.TtlUnit.NANOSECONDS;
break;
case UNIT_UNSPECIFIED:
case UNRECOGNIZED:
default:
unit = TableTtl.TtlUnit.UNSPECIFIED;
break;
}
return TableTtl
.valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds())
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
case MODE_NOT_SET:
default:
return TableTtl.notSet();
}
}

private static Changefeed.Format mapChangefeedFormat(YdbTable.ChangefeedFormat.Format pb) {
switch (pb) {
case FORMAT_JSON:
return Changefeed.Format.JSON;
case FORMAT_DYNAMODB_STREAMS_JSON:
return Changefeed.Format.DYNAMODB_STREAMS_JSON;
case FORMAT_DEBEZIUM_JSON:
return Changefeed.Format.DEBEZIUM_JSON;
default:
return null;
}
}

private static Changefeed.Mode mapChangefeedMode(YdbTable.ChangefeedMode.Mode pb) {
switch (pb) {
case MODE_KEYS_ONLY:
return Changefeed.Mode.KEYS_ONLY;
case MODE_NEW_IMAGE:
return Changefeed.Mode.NEW_IMAGE;
case MODE_OLD_IMAGE:
return Changefeed.Mode.OLD_IMAGE;
case MODE_NEW_AND_OLD_IMAGES:
return Changefeed.Mode.NEW_AND_OLD_IMAGES;
case MODE_UPDATES:
return Changefeed.Mode.UPDATES;
default:
return null;
}
}

private static ChangefeedDescription.State mapChangefeedState(YdbTable.ChangefeedDescription.State pb) {
switch (pb) {
case STATE_ENABLED:
return ChangefeedDescription.State.ENABLED;
case STATE_DISABLED:
return ChangefeedDescription.State.DISABLED;
case STATE_INITIAL_SCAN:
return ChangefeedDescription.State.INITIAL_SCAN;
default:
return null;
}
}

private static TableDescription mapDescribeTable(
YdbTable.DescribeTableResult result,
DescribeTableSettings describeTableSettings
Expand Down Expand Up @@ -702,47 +814,16 @@ private static TableDescription mapDescribeTable(
}
}

YdbTable.TtlSettings ttl = result.getTtlSettings();
TableTtl tableTtl;
switch (ttl.getModeCase()) {
case DATE_TYPE_COLUMN:
YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn();
tableTtl = TableTtl
.dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds())
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
break;
case VALUE_SINCE_UNIX_EPOCH:
YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch();
TableTtl.TtlUnit unit;
switch (vs.getColumnUnit()) {
case UNIT_SECONDS:
unit = TableTtl.TtlUnit.SECONDS;
break;
case UNIT_MILLISECONDS:
unit = TableTtl.TtlUnit.MILLISECONDS;
break;
case UNIT_MICROSECONDS:
unit = TableTtl.TtlUnit.MICROSECONDS;
break;
case UNIT_NANOSECONDS:
unit = TableTtl.TtlUnit.NANOSECONDS;
break;
case UNIT_UNSPECIFIED:
case UNRECOGNIZED:
default:
unit = TableTtl.TtlUnit.UNSPECIFIED;
break;
}
tableTtl = TableTtl
.valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds())
.withRunIntervalSeconds(ttl.getRunIntervalSeconds());
break;
case MODE_NOT_SET:
default:
tableTtl = TableTtl.notSet();
break;
description.setTtlSettings(mapTtlSettings(result.getTtlSettings()));
for (YdbTable.ChangefeedDescription pb: result.getChangefeedsList()) {
description.addChangefeed(new ChangefeedDescription(
pb.getName(),
mapChangefeedMode(pb.getMode()),
mapChangefeedFormat(pb.getFormat()),
mapChangefeedState(pb.getState()),
pb.getVirtualTimestamps()
));
}
description.setTtlSettings(tableTtl);

return description.build();
}
Expand Down
Loading