|
39 | 39 | import tech.ydb.proto.common.CommonProtos; |
40 | 40 | import tech.ydb.proto.table.YdbTable; |
41 | 41 | import tech.ydb.table.Session; |
| 42 | +import tech.ydb.table.description.ChangefeedDescription; |
42 | 43 | import tech.ydb.table.description.ColumnFamily; |
43 | 44 | import tech.ydb.table.description.KeyBound; |
44 | 45 | import tech.ydb.table.description.KeyRange; |
@@ -210,13 +211,43 @@ private static YdbTable.ColumnMeta buildColumnMeta(TableColumn column) { |
210 | 211 | return builder.build(); |
211 | 212 | } |
212 | 213 |
|
213 | | - private static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) { |
| 214 | + public static YdbTable.ChangefeedFormat.Format buildChangefeedFormat(Changefeed.Format format) { |
| 215 | + switch (format) { |
| 216 | + case JSON: |
| 217 | + return YdbTable.ChangefeedFormat.Format.FORMAT_JSON; |
| 218 | + case DYNAMODB_STREAMS_JSON: |
| 219 | + return YdbTable.ChangefeedFormat.Format.FORMAT_DYNAMODB_STREAMS_JSON; |
| 220 | + case DEBEZIUM_JSON: |
| 221 | + return YdbTable.ChangefeedFormat.Format.FORMAT_DEBEZIUM_JSON; |
| 222 | + default: |
| 223 | + return YdbTable.ChangefeedFormat.Format.FORMAT_UNSPECIFIED; |
| 224 | + } |
| 225 | + } |
| 226 | + |
| 227 | + public static YdbTable.ChangefeedMode.Mode buildChangefeedMode(Changefeed.Mode mode) { |
| 228 | + switch (mode) { |
| 229 | + case KEYS_ONLY: |
| 230 | + return YdbTable.ChangefeedMode.Mode.MODE_KEYS_ONLY; |
| 231 | + case UPDATES: |
| 232 | + return YdbTable.ChangefeedMode.Mode.MODE_UPDATES; |
| 233 | + case NEW_IMAGE: |
| 234 | + return YdbTable.ChangefeedMode.Mode.MODE_NEW_IMAGE; |
| 235 | + case OLD_IMAGE: |
| 236 | + return YdbTable.ChangefeedMode.Mode.MODE_OLD_IMAGE; |
| 237 | + case NEW_AND_OLD_IMAGES: |
| 238 | + return YdbTable.ChangefeedMode.Mode.MODE_NEW_AND_OLD_IMAGES; |
| 239 | + default: |
| 240 | + return YdbTable.ChangefeedMode.Mode.MODE_UNSPECIFIED; |
| 241 | + } |
| 242 | + } |
| 243 | + |
| 244 | + public static YdbTable.Changefeed buildChangefeed(Changefeed changefeed) { |
214 | 245 | YdbTable.Changefeed.Builder builder = YdbTable.Changefeed.newBuilder() |
215 | 246 | .setName(changefeed.getName()) |
216 | | - .setFormat(changefeed.getFormat().toProto()) |
| 247 | + .setFormat(buildChangefeedFormat(changefeed.getFormat())) |
| 248 | + .setMode(buildChangefeedMode(changefeed.getMode())) |
217 | 249 | .setVirtualTimestamps(changefeed.hasVirtualTimestamps()) |
218 | | - .setInitialScan(changefeed.hasInitialScan()) |
219 | | - .setMode(changefeed.getMode().toPb()); |
| 250 | + .setInitialScan(changefeed.hasInitialScan()); |
220 | 251 |
|
221 | 252 | Duration retentionPeriod = changefeed.getRetentionPeriod(); |
222 | 253 | if (retentionPeriod != null) { |
@@ -609,6 +640,87 @@ public CompletableFuture<Result<TableDescription>> describeTable(String path, De |
609 | 640 | .thenApply(result -> result.map(desc -> mapDescribeTable(desc, settings))); |
610 | 641 | } |
611 | 642 |
|
| 643 | + private static TableTtl mapTtlSettings(YdbTable.TtlSettings ttl) { |
| 644 | + switch (ttl.getModeCase()) { |
| 645 | + case DATE_TYPE_COLUMN: |
| 646 | + YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn(); |
| 647 | + return TableTtl |
| 648 | + .dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds()) |
| 649 | + .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); |
| 650 | + case VALUE_SINCE_UNIX_EPOCH: |
| 651 | + YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch(); |
| 652 | + TableTtl.TtlUnit unit; |
| 653 | + switch (vs.getColumnUnit()) { |
| 654 | + case UNIT_SECONDS: |
| 655 | + unit = TableTtl.TtlUnit.SECONDS; |
| 656 | + break; |
| 657 | + case UNIT_MILLISECONDS: |
| 658 | + unit = TableTtl.TtlUnit.MILLISECONDS; |
| 659 | + break; |
| 660 | + case UNIT_MICROSECONDS: |
| 661 | + unit = TableTtl.TtlUnit.MICROSECONDS; |
| 662 | + break; |
| 663 | + case UNIT_NANOSECONDS: |
| 664 | + unit = TableTtl.TtlUnit.NANOSECONDS; |
| 665 | + break; |
| 666 | + case UNIT_UNSPECIFIED: |
| 667 | + case UNRECOGNIZED: |
| 668 | + default: |
| 669 | + unit = TableTtl.TtlUnit.UNSPECIFIED; |
| 670 | + break; |
| 671 | + } |
| 672 | + return TableTtl |
| 673 | + .valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds()) |
| 674 | + .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); |
| 675 | + case MODE_NOT_SET: |
| 676 | + default: |
| 677 | + return TableTtl.notSet(); |
| 678 | + } |
| 679 | + } |
| 680 | + |
| 681 | + private static Changefeed.Format mapChangefeedFormat(YdbTable.ChangefeedFormat.Format pb) { |
| 682 | + switch (pb) { |
| 683 | + case FORMAT_JSON: |
| 684 | + return Changefeed.Format.JSON; |
| 685 | + case FORMAT_DYNAMODB_STREAMS_JSON: |
| 686 | + return Changefeed.Format.DYNAMODB_STREAMS_JSON; |
| 687 | + case FORMAT_DEBEZIUM_JSON: |
| 688 | + return Changefeed.Format.DEBEZIUM_JSON; |
| 689 | + default: |
| 690 | + return null; |
| 691 | + } |
| 692 | + } |
| 693 | + |
| 694 | + private static Changefeed.Mode mapChangefeedMode(YdbTable.ChangefeedMode.Mode pb) { |
| 695 | + switch (pb) { |
| 696 | + case MODE_KEYS_ONLY: |
| 697 | + return Changefeed.Mode.KEYS_ONLY; |
| 698 | + case MODE_NEW_IMAGE: |
| 699 | + return Changefeed.Mode.NEW_IMAGE; |
| 700 | + case MODE_OLD_IMAGE: |
| 701 | + return Changefeed.Mode.OLD_IMAGE; |
| 702 | + case MODE_NEW_AND_OLD_IMAGES: |
| 703 | + return Changefeed.Mode.NEW_AND_OLD_IMAGES; |
| 704 | + case MODE_UPDATES: |
| 705 | + return Changefeed.Mode.UPDATES; |
| 706 | + default: |
| 707 | + return null; |
| 708 | + } |
| 709 | + } |
| 710 | + |
| 711 | + private static ChangefeedDescription.State mapChangefeedState(YdbTable.ChangefeedDescription.State pb) { |
| 712 | + switch (pb) { |
| 713 | + case STATE_ENABLED: |
| 714 | + return ChangefeedDescription.State.ENABLED; |
| 715 | + case STATE_DISABLED: |
| 716 | + return ChangefeedDescription.State.DISABLED; |
| 717 | + case STATE_INITIAL_SCAN: |
| 718 | + return ChangefeedDescription.State.INITIAL_SCAN; |
| 719 | + default: |
| 720 | + return null; |
| 721 | + } |
| 722 | + } |
| 723 | + |
612 | 724 | private static TableDescription mapDescribeTable( |
613 | 725 | YdbTable.DescribeTableResult result, |
614 | 726 | DescribeTableSettings describeTableSettings |
@@ -702,47 +814,16 @@ private static TableDescription mapDescribeTable( |
702 | 814 | } |
703 | 815 | } |
704 | 816 |
|
705 | | - YdbTable.TtlSettings ttl = result.getTtlSettings(); |
706 | | - TableTtl tableTtl; |
707 | | - switch (ttl.getModeCase()) { |
708 | | - case DATE_TYPE_COLUMN: |
709 | | - YdbTable.DateTypeColumnModeSettings dc = ttl.getDateTypeColumn(); |
710 | | - tableTtl = TableTtl |
711 | | - .dateTimeColumn(dc.getColumnName(), dc.getExpireAfterSeconds()) |
712 | | - .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); |
713 | | - break; |
714 | | - case VALUE_SINCE_UNIX_EPOCH: |
715 | | - YdbTable.ValueSinceUnixEpochModeSettings vs = ttl.getValueSinceUnixEpoch(); |
716 | | - TableTtl.TtlUnit unit; |
717 | | - switch (vs.getColumnUnit()) { |
718 | | - case UNIT_SECONDS: |
719 | | - unit = TableTtl.TtlUnit.SECONDS; |
720 | | - break; |
721 | | - case UNIT_MILLISECONDS: |
722 | | - unit = TableTtl.TtlUnit.MILLISECONDS; |
723 | | - break; |
724 | | - case UNIT_MICROSECONDS: |
725 | | - unit = TableTtl.TtlUnit.MICROSECONDS; |
726 | | - break; |
727 | | - case UNIT_NANOSECONDS: |
728 | | - unit = TableTtl.TtlUnit.NANOSECONDS; |
729 | | - break; |
730 | | - case UNIT_UNSPECIFIED: |
731 | | - case UNRECOGNIZED: |
732 | | - default: |
733 | | - unit = TableTtl.TtlUnit.UNSPECIFIED; |
734 | | - break; |
735 | | - } |
736 | | - tableTtl = TableTtl |
737 | | - .valueSinceUnixEpoch(vs.getColumnName(), unit, vs.getExpireAfterSeconds()) |
738 | | - .withRunIntervalSeconds(ttl.getRunIntervalSeconds()); |
739 | | - break; |
740 | | - case MODE_NOT_SET: |
741 | | - default: |
742 | | - tableTtl = TableTtl.notSet(); |
743 | | - break; |
| 817 | + description.setTtlSettings(mapTtlSettings(result.getTtlSettings())); |
| 818 | + for (YdbTable.ChangefeedDescription pb: result.getChangefeedsList()) { |
| 819 | + description.addChangefeed(new ChangefeedDescription( |
| 820 | + pb.getName(), |
| 821 | + mapChangefeedMode(pb.getMode()), |
| 822 | + mapChangefeedFormat(pb.getFormat()), |
| 823 | + mapChangefeedState(pb.getState()), |
| 824 | + pb.getVirtualTimestamps() |
| 825 | + )); |
744 | 826 | } |
745 | | - description.setTtlSettings(tableTtl); |
746 | 827 |
|
747 | 828 | return description.build(); |
748 | 829 | } |
|
0 commit comments