Skip to content

Commit 7c607f5

Browse files
committed
Auto partitioning settings to public api
1 parent 213253f commit 7c607f5

File tree

4 files changed

+216
-1
lines changed

4 files changed

+216
-1
lines changed

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -967,30 +967,184 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A
967967
class PartitioningSettings(IToProto, IFromProto):
968968
min_active_partitions: int
969969
partition_count_limit: int
970+
max_active_partitions: int
971+
auto_partitioning_settings: AutoPartitioningSettings
970972

971973
@staticmethod
972974
def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings":
973975
return PartitioningSettings(
974976
min_active_partitions=msg.min_active_partitions,
975977
partition_count_limit=msg.partition_count_limit,
978+
max_active_partitions=msg.max_active_partitions,
979+
auto_partitioning_settings=AutoPartitioningSettings.from_proto(
980+
msg.auto_partitioning_settings
981+
),
976982
)
977983

978984
def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
979985
return ydb_topic_pb2.PartitioningSettings(
980986
min_active_partitions=self.min_active_partitions,
981987
partition_count_limit=self.partition_count_limit,
988+
max_active_partitions=self.max_active_partitions,
989+
auto_partitioning_settings=self.auto_partitioning_settings.to_proto()
990+
)
991+
992+
993+
class AutoPartitioningStrategy(int, IFromProto, IFromPublic, IToPublic):
994+
UNSPECIFIED = 0
995+
DISABLED = 1
996+
SCALE_UP = 2
997+
SCALE_UP_AND_DOWN = 3
998+
PAUSED = 4
999+
1000+
@staticmethod
1001+
def from_public(
1002+
strategy: Optional[ydb_topic_public_types.PublicAutoPartitioningStrategy],
1003+
) -> Optional["AutoPartitioningStrategy"]:
1004+
if strategy is None:
1005+
return None
1006+
1007+
return AutoPartitioningStrategy(strategy)
1008+
1009+
@staticmethod
1010+
def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]:
1011+
if code is None:
1012+
return None
1013+
1014+
return AutoPartitioningStrategy(code)
1015+
1016+
def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy:
1017+
try:
1018+
ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
1019+
except KeyError:
1020+
return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED
1021+
1022+
1023+
@dataclass
1024+
class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic):
1025+
strategy: AutoPartitioningStrategy
1026+
partition_write_speed: AutoPartitioningWriteSpeedStrategy
1027+
1028+
@staticmethod
1029+
def from_public(
1030+
settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings]
1031+
) -> Optional[AutoPartitioningSettings]:
1032+
if not settings:
1033+
return None
1034+
1035+
return AutoPartitioningSettings(
1036+
strategy=settings.strategy,
1037+
partition_write_speed=AutoPartitioningWriteSpeedStrategy(
1038+
stabilization_window=settings.stabilization_window,
1039+
up_utilization_percent=settings.up_utilization_percent,
1040+
down_utilization_percent=settings.down_utilization_percent,
1041+
)
1042+
)
1043+
1044+
@staticmethod
1045+
def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings:
1046+
return AutoPartitioningSettings(
1047+
strategy=AutoPartitioningStrategy.from_proto(msg.strategy),
1048+
partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(
1049+
msg.partition_write_speed
1050+
),
1051+
)
1052+
1053+
def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings:
1054+
return ydb_topic_pb2.AutoPartitioningSettings(
1055+
strategy=self.strategy,
1056+
partition_write_speed=self.partition_write_speed.to_proto()
1057+
)
1058+
1059+
def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings:
1060+
return ydb_topic_public_types.PublicAutoPartitioningSettings(
1061+
strategy=self.strategy.to_public(),
1062+
stabilization_window=self.partition_write_speed.stabilization_window,
1063+
up_utilization_percent=self.partition_write_speed.up_utilization_percent,
1064+
down_utilization_percent=self.partition_write_speed.down_utilization_percent,
1065+
)
1066+
1067+
1068+
@dataclass
1069+
class AutoPartitioningWriteSpeedStrategy(IToProto, IFromProto):
1070+
stabilization_window: Optional[datetime.timedelta]
1071+
up_utilization_percent: Optional[int]
1072+
down_utilization_percent: Optional[int]
1073+
1074+
def to_proto(self):
1075+
return ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy(
1076+
stabilization_window=proto_duration_from_timedelta(self.stabilization_window),
1077+
up_utilization_percent=self.up_utilization_percent,
1078+
down_utilization_percent=self.down_utilization_percent,
1079+
)
1080+
1081+
@staticmethod
1082+
def from_proto(msg: ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy) -> AutoPartitioningWriteSpeedStrategy:
1083+
return AutoPartitioningWriteSpeedStrategy(
1084+
stabilization_window=timedelta_from_proto_duration(msg.stabilization_window),
1085+
up_utilization_percent=msg.up_utilization_percent,
1086+
down_utilization_percent=msg.down_utilization_percent,
9821087
)
9831088

9841089

9851090
@dataclass
9861091
class AlterPartitioningSettings(IToProto):
9871092
set_min_active_partitions: Optional[int]
9881093
set_partition_count_limit: Optional[int]
1094+
set_max_active_partitions: Optional[int]
1095+
alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings]
9891096

9901097
def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
9911098
return ydb_topic_pb2.AlterPartitioningSettings(
9921099
set_min_active_partitions=self.set_min_active_partitions,
9931100
set_partition_count_limit=self.set_partition_count_limit,
1101+
set_max_active_partitions=self.set_max_active_partitions,
1102+
)
1103+
1104+
1105+
@dataclass
1106+
class AlterAutoPartitioningSettings(IToProto, IFromPublic):
1107+
set_strategy: Optional[AutoPartitioningStrategy]
1108+
set_partition_write_speed: Optional[AlterAutoPartitioningWriteSpeedStrategy]
1109+
1110+
@staticmethod
1111+
def from_public(
1112+
settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings]
1113+
) -> Optional[AlterAutoPartitioningSettings]:
1114+
if not settings:
1115+
return None
1116+
1117+
return AutoPartitioningSettings(
1118+
strategy=settings.set_strategy,
1119+
partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy(
1120+
stabilization_window=settings.set_stabilization_window,
1121+
up_utilization_percent=settings.set_up_utilization_percent,
1122+
down_utilization_percent=settings.set_down_utilization_percent,
1123+
)
1124+
)
1125+
1126+
def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings:
1127+
set_partition_write_speed = None
1128+
if self.set_partition_write_speed:
1129+
set_partition_write_speed = self.set_partition_write_speed.to_proto()
1130+
1131+
return ydb_topic_pb2.AlterAutoPartitioningSettings(
1132+
set_strategy=self.set_strategy,
1133+
set_partition_write_speed=set_partition_write_speed,
1134+
)
1135+
1136+
1137+
@dataclass
1138+
class AlterAutoPartitioningWriteSpeedStrategy(IToProto):
1139+
set_stabilization_window: Optional[datetime.timedelta]
1140+
set_up_utilization_percent: Optional[int]
1141+
set_down_utilization_percent: Optional[int]
1142+
1143+
def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy:
1144+
return ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy(
1145+
set_stabilization_window=proto_duration_from_timedelta(self.set_stabilization_window),
1146+
set_up_utilization_percent=self.set_up_utilization_percent,
1147+
set_down_utilization_percent=self.set_down_utilization_percent,
9941148
)
9951149

9961150

@@ -1063,11 +1217,17 @@ def from_public(req: ydb_topic_public_types.CreateTopicRequestParams):
10631217
consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
10641218
consumers.append(Consumer.from_public(consumer))
10651219

1220+
auto_partitioning_settings = None
1221+
if req.auto_partitioning_settings is not None:
1222+
auto_partitioning_settings = AutoPartitioningSettings.from_public(req.auto_partitioning_settings)
1223+
10661224
return CreateTopicRequest(
10671225
path=req.path,
10681226
partitioning_settings=PartitioningSettings(
10691227
min_active_partitions=req.min_active_partitions,
10701228
partition_count_limit=req.partition_count_limit,
1229+
max_active_partitions=req.max_active_partitions,
1230+
auto_partitioning_settings=auto_partitioning_settings
10711231
),
10721232
retention_period=req.retention_period,
10731233
retention_storage_mb=req.retention_storage_mb,
@@ -1138,13 +1298,22 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
11381298
consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
11391299
alter_consumers.append(AlterConsumer.from_public(consumer))
11401300

1301+
alter_auto_partitioning_settings = None
1302+
if req.alter_auto_partitioning_settings is not None:
1303+
alter_auto_partitioning_settings = AutoPartitioningSettings.from_public(
1304+
req.alter_auto_partitioning_settings
1305+
)
1306+
1307+
11411308
drop_consumers = req.drop_consumers if req.drop_consumers else []
11421309

11431310
return AlterTopicRequest(
11441311
path=req.path,
11451312
alter_partitioning_settings=AlterPartitioningSettings(
11461313
set_min_active_partitions=req.set_min_active_partitions,
11471314
set_partition_count_limit=req.set_partition_count_limit,
1315+
set_max_active_partitions=req.set_max_active_partitions,
1316+
alter_auto_partitioning_settings=alter_auto_partitioning_settings,
11481317
),
11491318
add_consumers=add_consumers,
11501319
set_retention_period=req.set_retention_period,
@@ -1205,6 +1374,8 @@ def to_public(self) -> ydb_topic_public_types.PublicDescribeTopicResult:
12051374
return ydb_topic_public_types.PublicDescribeTopicResult(
12061375
self=scheme._wrap_scheme_entry(self.self_proto),
12071376
min_active_partitions=self.partitioning_settings.min_active_partitions,
1377+
max_active_partitions=self.partitioning_settings.max_active_partitions,
1378+
auto_partitioning_settings=self.partitioning_settings.auto_partitioning_settings.to_public(),
12081379
partition_count_limit=self.partitioning_settings.partition_count_limit,
12091380
partitions=list(map(DescribeTopicResult.PartitionInfo.to_public, self.partitions)),
12101381
retention_period=self.retention_period,

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
class CreateTopicRequestParams:
2020
path: str
2121
min_active_partitions: Optional[int]
22+
max_active_partitions: Optional[int]
2223
partition_count_limit: Optional[int]
2324
retention_period: Optional[datetime.timedelta]
2425
retention_storage_mb: Optional[int]
@@ -28,12 +29,14 @@ class CreateTopicRequestParams:
2829
attributes: Optional[Dict[str, str]]
2930
consumers: Optional[List[Union["PublicConsumer", str]]]
3031
metering_mode: Optional["PublicMeteringMode"]
32+
auto_partitioning_settings: Optional["PublicAutoPartitioningSettings"]
3133

3234

3335
@dataclass
3436
class AlterTopicRequestParams:
3537
path: str
3638
set_min_active_partitions: Optional[int]
39+
set_max_active_partitions: Optional[int]
3740
set_partition_count_limit: Optional[int]
3841
add_consumers: Optional[List[Union["PublicConsumer", str]]]
3942
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
@@ -45,6 +48,7 @@ class AlterTopicRequestParams:
4548
set_retention_period: Optional[datetime.timedelta]
4649
set_retention_storage_mb: Optional[int]
4750
set_supported_codecs: Optional[List[Union["PublicCodec", int]]]
51+
alter_auto_partitioning_settings: Optional["PublicAlterAutoPartitioningSettings"]
4852

4953

5054
class PublicCodec(int):
@@ -68,6 +72,30 @@ class PublicMeteringMode(IntEnum):
6872
REQUEST_UNITS = 2
6973

7074

75+
class PublicAutoPartitioningStrategy(IntEnum):
76+
UNSPECIFIED = 0
77+
DISABLED = 1
78+
SCALE_UP = 2
79+
SCALE_UP_AND_DOWN = 3
80+
PAUSED = 4
81+
82+
83+
@dataclass
84+
class PublicAutoPartitioningSettings:
85+
strategy: Optional["PublicAutoPartitioningStrategy"]
86+
stabilization_window: Optional[datetime.timedelta]
87+
up_utilization_percent: Optional[int]
88+
down_utilization_percent: Optional[int]
89+
90+
91+
@dataclass
92+
class PublicAlterAutoPartitioningSettings:
93+
set_strategy: Optional["PublicAutoPartitioningStrategy"]
94+
set_stabilization_window: Optional[datetime.timedelta]
95+
set_up_utilization_percent: Optional[int]
96+
set_down_utilization_percent: Optional[int]
97+
98+
7199
@dataclass
72100
class PublicConsumer:
73101
name: str
@@ -138,6 +166,9 @@ class PublicDescribeTopicResult:
138166
min_active_partitions: int
139167
"Minimum partition count auto merge would stop working at"
140168

169+
max_active_partitions: int
170+
"Minimum partition count auto split would stop working at"
171+
141172
partition_count_limit: int
142173
"Limit for total partition count, including active (open for write) and read-only partitions"
143174

@@ -171,6 +202,8 @@ class PublicDescribeTopicResult:
171202
topic_stats: "PublicDescribeTopicResult.TopicStats"
172203
"Statistics of topic"
173204

205+
auto_partitioning_settings: "PublicAutoPartitioningSettings"
206+
174207
@dataclass
175208
class PartitionInfo:
176209
partition_id: int

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes
582582
)
583583

584584
def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
585-
logger.debug(f"End partition session with id: {message.partition_session_id}")
585+
logger.info(f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}")
586586

587587
def _on_read_response(self, message: StreamReadMessage.ReadResponse):
588588
self._buffer_consume_bytes(message.bytes_size)

ydb/topic.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@
8080
PublicConsumer as TopicConsumer,
8181
PublicAlterConsumer as TopicAlterConsumer,
8282
PublicMeteringMode as TopicMeteringMode,
83+
PublicAutoPartitioningStrategy as TopicAutoPartitioningStrategy,
84+
PublicAutoPartitioningSettings as TopicAutoPartitioningSettings,
85+
PublicAlterAutoPartitioningSettings as TopicAlterAutoPartitioningSettings,
8386
)
8487

8588

@@ -117,6 +120,7 @@ async def create_topic(
117120
attributes: Optional[Dict[str, str]] = None,
118121
consumers: Optional[List[Union[TopicConsumer, str]]] = None,
119122
metering_mode: Optional[TopicMeteringMode] = None,
123+
auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None,
120124
):
121125
"""
122126
create topic command
@@ -151,6 +155,7 @@ async def alter_topic(
151155
self,
152156
path: str,
153157
set_min_active_partitions: Optional[int] = None,
158+
set_max_active_partitions: Optional[int] = None,
154159
set_partition_count_limit: Optional[int] = None,
155160
add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
156161
alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
@@ -162,6 +167,7 @@ async def alter_topic(
162167
set_retention_period: Optional[datetime.timedelta] = None,
163168
set_retention_storage_mb: Optional[int] = None,
164169
set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
170+
alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None,
165171
):
166172
"""
167173
alter topic command
@@ -226,6 +232,7 @@ def reader(
226232
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
227233
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
228234
decoder_executor: Optional[concurrent.futures.Executor] = None,
235+
auto_partitioning_support: Optional[bool] = False, # Auto partitioning feature flag. Default - False.
229236
) -> TopicReaderAsyncIO:
230237

231238
if not decoder_executor:
@@ -314,6 +321,7 @@ def create_topic(
314321
attributes: Optional[Dict[str, str]] = None,
315322
consumers: Optional[List[Union[TopicConsumer, str]]] = None,
316323
metering_mode: Optional[TopicMeteringMode] = None,
324+
auto_partitioning_settings: Optional[TopicAutoPartitioningSettings] = None,
317325
):
318326
"""
319327
create topic command
@@ -350,6 +358,7 @@ def alter_topic(
350358
self,
351359
path: str,
352360
set_min_active_partitions: Optional[int] = None,
361+
set_max_active_partitions: Optional[int] = None,
353362
set_partition_count_limit: Optional[int] = None,
354363
add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
355364
alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
@@ -361,6 +370,7 @@ def alter_topic(
361370
set_retention_period: Optional[datetime.timedelta] = None,
362371
set_retention_storage_mb: Optional[int] = None,
363372
set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
373+
alter_auto_partitioning_settings: Optional[TopicAlterAutoPartitioningSettings] = None,
364374
):
365375
"""
366376
alter topic command
@@ -431,6 +441,7 @@ def reader(
431441
# custom decoder executor for call builtin and custom decoders. If None - use shared executor pool.
432442
# if max_worker in the executor is 1 - then decoders will be called from the thread without parallel
433443
decoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
444+
auto_partitioning_support: Optional[bool] = False, # Auto partitioning feature flag. Default - False.
434445
) -> TopicReader:
435446
if not decoder_executor:
436447
decoder_executor = self._executor

0 commit comments

Comments
 (0)