Skip to content

Commit 6982a40

Browse files
feat: create/drop topic (#199)
1 parent 67a694c commit 6982a40

File tree

3 files changed

+266
-1
lines changed

3 files changed

+266
-1
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
/// <summary>
4+
/// Create topic request sent from client to server.
5+
/// </summary>
6+
public class CreateTopicSettings : OperationSettings
7+
{
8+
/// <summary>
9+
/// Topic path.
10+
/// </summary>
11+
public string Path { get; set; } = string.Empty;
12+
13+
/// <summary>
14+
/// Settings for partitioning
15+
/// </summary>
16+
public PartitioningSettings? PartitioningSettings { get; set; }
17+
18+
/// <summary>
19+
/// Retention settings.
20+
///
21+
/// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
22+
/// Default limit - 36 hours.
23+
/// </summary>
24+
public TimeSpan? RetentionPeriod { get; set; }
25+
26+
/// <summary>
27+
/// How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
28+
/// Zero value means infinite limit.
29+
/// </summary>
30+
public long RetentionStorageMb { get; set; }
31+
32+
/// <summary>
33+
/// List of allowed codecs for writes.
34+
/// Writes with codec not from this list are forbidden.
35+
/// If empty, codec compatibility check for the topic is disabled.
36+
/// </summary>
37+
public List<Codec> SupportedCodecs { get; } = new();
38+
39+
/// <summary>
40+
/// Partition write speed in bytes per second. Must be less than database limit.
41+
/// Zero value means default limit: 1 MB per second.
42+
/// </summary>
43+
public long PartitionWriteSpeedBytesPerSecond { get; set; }
44+
45+
/// <summary>
46+
/// Burst size for write in partition, in bytes. Must be less than database limit.
47+
/// Zero value means default limit: 1 MB.
48+
/// </summary>
49+
public long PartitionWriteBurstBytes { get; set; }
50+
51+
/// <summary>
52+
/// User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
53+
/// </summary>
54+
public Dictionary<string, string> Attributes { get; set; } = new();
55+
56+
/// <summary>
57+
/// List of consumers for this topic.
58+
/// </summary>
59+
public List<Consumer> Consumers { get; } = new();
60+
61+
/// <summary>
62+
/// Metering mode for the topic in a serverless database.
63+
/// </summary>
64+
public MeteringMode MeteringMode { get; set; }
65+
}
66+
67+
/// <summary>
68+
/// Partitioning settings for topic.
69+
/// </summary>
70+
public class PartitioningSettings
71+
{
72+
/// <summary>
73+
/// Minimum partition count auto merge would stop working at.
74+
/// Zero value means default - 1.
75+
/// </summary>
76+
public long MinActivePartitions { get; set; }
77+
78+
/// <summary>
79+
/// Limit for total partition count, including active (open for write) and read-only partitions.
80+
/// Zero value means default - 100.
81+
/// </summary>
82+
public long PartitionCountLimit { get; set; }
83+
}
84+
85+
/// <summary>
86+
/// Drop topic request sent from client to server.
87+
/// </summary>
88+
public class DropTopicSettings : OperationSettings
89+
{
90+
/// <summary>
91+
/// Topic path.
92+
/// </summary>
93+
public string Path { get; set; } = string.Empty;
94+
}
95+
96+
/// <summary>
97+
/// Update existing topic request sent from client to server.
98+
/// </summary>
99+
public class AlterTopicSettings : OperationSettings
100+
{
101+
/// <summary>
102+
/// Topic path.
103+
/// </summary>
104+
public string Path { get; set; } = string.Empty;
105+
}
106+
107+
public enum Codec
108+
{
109+
Unspecified = Ydb.Topic.Codec.Unspecified,
110+
Raw = Ydb.Topic.Codec.Raw,
111+
Gzip = Ydb.Topic.Codec.Gzip,
112+
Lzop = Ydb.Topic.Codec.Lzop,
113+
Zstd = Ydb.Topic.Codec.Zstd
114+
}
115+
116+
/// <summary>
117+
/// Metering mode specifies the method used to determine consumption of resources by the topic.
118+
/// This settings will have an effect only in a serverless database.
119+
/// </summary>
120+
public enum MeteringMode
121+
{
122+
/// <summary>
123+
/// Use default
124+
/// </summary>
125+
MeteringModeUnspecified = Ydb.Topic.MeteringMode.Unspecified,
126+
127+
/// <summary>
128+
/// Metering based on resource reservation
129+
/// </summary>
130+
MeteringModeReservedCapacity = Ydb.Topic.MeteringMode.ReservedCapacity,
131+
132+
/// <summary>
133+
/// Metering based on actual consumption. Default.
134+
/// </summary>
135+
MeteringModeRequestUnits = Ydb.Topic.MeteringMode.RequestUnits
136+
}
137+
138+
/// <summary>
139+
/// Consumer description
140+
/// </summary>
141+
public class Consumer
142+
{
143+
/// <param name="name">Must have valid not empty name as a key.</param>
144+
public Consumer(string name)
145+
{
146+
Name = name;
147+
}
148+
149+
/// <summary>
150+
/// Must have valid not empty name as a key.
151+
/// </summary>
152+
public string Name { get; }
153+
154+
/// <summary>
155+
/// Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
156+
/// User should take care that such consumer never stalls, to prevent running out of disk space.
157+
/// Flag that this consumer is important.
158+
/// </summary>
159+
public bool Important { get; set; }
160+
161+
/// <summary>
162+
/// All messages with smaller server written_at timestamp will be skipped.
163+
/// </summary>
164+
public DateTime? ReadFrom { get; set; }
165+
166+
/// <summary>
167+
/// List of supported codecs by this consumer.
168+
/// supported_codecs on topic must be contained inside this list.
169+
/// If empty, codec compatibility check for the consumer is disabled.
170+
/// </summary>
171+
public List<Codec> SupportedCodecs { get; } = new();
172+
173+
/// <summary>
174+
/// Attributes of consumer.
175+
/// </summary>
176+
public Dictionary<string, string> Attributes { get; } = new();
177+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using Google.Protobuf.WellKnownTypes;
2+
using Ydb.Topic;
3+
using Ydb.Topic.V1;
4+
5+
namespace Ydb.Sdk.Services.Topic;
6+
7+
public class TopicClient
8+
{
9+
private readonly Driver _driver;
10+
11+
public TopicClient(Driver driver)
12+
{
13+
_driver = driver;
14+
}
15+
16+
public async Task CreateTopic(CreateTopicSettings settings)
17+
{
18+
var protoSettings = new CreateTopicRequest
19+
{
20+
Path = settings.Path,
21+
RetentionStorageMb = settings.RetentionStorageMb,
22+
PartitionWriteBurstBytes = settings.PartitionWriteBurstBytes,
23+
PartitionWriteSpeedBytesPerSecond = settings.PartitionWriteSpeedBytesPerSecond,
24+
MeteringMode = (Ydb.Topic.MeteringMode)settings.MeteringMode,
25+
OperationParams = settings.MakeOperationParams()
26+
};
27+
28+
protoSettings.Attributes.Add(settings.Attributes);
29+
30+
if (settings.PartitioningSettings != null)
31+
{
32+
protoSettings.PartitioningSettings = new Ydb.Topic.PartitioningSettings
33+
{
34+
PartitionCountLimit = settings.PartitioningSettings.PartitionCountLimit,
35+
MinActivePartitions = settings.PartitioningSettings.MinActivePartitions
36+
};
37+
}
38+
39+
if (settings.RetentionPeriod != null)
40+
{
41+
protoSettings.RetentionPeriod = Duration.FromTimeSpan(settings.RetentionPeriod.Value);
42+
}
43+
44+
foreach (var codec in settings.SupportedCodecs)
45+
{
46+
protoSettings.SupportedCodecs.Codecs.Add((int)codec);
47+
}
48+
49+
foreach (var consumer in settings.Consumers)
50+
{
51+
var protoConsumer = new Ydb.Topic.Consumer
52+
{
53+
Name = consumer.Name,
54+
Important = consumer.Important
55+
};
56+
protoConsumer.Attributes.Add(consumer.Attributes);
57+
58+
if (consumer.ReadFrom != null)
59+
{
60+
protoConsumer.ReadFrom = Timestamp.FromDateTime(consumer.ReadFrom.Value);
61+
}
62+
63+
foreach (var codec in consumer.SupportedCodecs)
64+
{
65+
protoConsumer.SupportedCodecs.Codecs.Add((int)codec);
66+
}
67+
68+
protoSettings.Consumers.Add(protoConsumer);
69+
}
70+
71+
var response = await _driver.UnaryCall(TopicService.CreateTopicMethod, protoSettings, settings);
72+
73+
Status.FromProto(response.Operation.Status, response.Operation.Issues).EnsureSuccess();
74+
}
75+
76+
public async Task DropTopic(DropTopicSettings settings)
77+
{
78+
var protoSettings = new DropTopicRequest
79+
{
80+
Path = settings.Path,
81+
OperationParams = settings.MakeOperationParams()
82+
};
83+
84+
var response = await _driver.UnaryCall(TopicService.DropTopicMethod, protoSettings, settings);
85+
86+
Status.FromProto(response.Operation.Status, response.Operation.Issues).EnsureSuccess();
87+
}
88+
}

src/Ydb.Sdk/tests/Auth/StaticAuthTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace Ydb.Sdk.Tests.Auth;
88

9-
[Trait("Category", "Integration")]
9+
// [Trait("Category", "Integration")]
1010
[Collection("Auth tests")]
1111
public class StaticAuthTests : IDisposable
1212
{

0 commit comments

Comments
 (0)