Skip to content

Commit dfe2ad8

Browse files
authored
Merge pull request #1508 from qrort/describe-topic-consumer added Topic.DescribeTopicConsumer
2 parents 60754e6 + 6503633 commit dfe2ad8

File tree

11 files changed

+680
-15
lines changed

11 files changed

+680
-15
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `db.Topic().DescribeTopicConsumer()` method for displaying consumer information
2+
13
## v3.84.1
24
* Added session info into `trace.TableSessionBulkUpsertStartInfo`
35

internal/grpcwrapper/rawoptional/rawoptional.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,26 @@ func (v *Duration) ToProto() *durationpb.Duration {
3535
return nil
3636
}
3737

38+
func (v *Duration) MustFromProto(proto *durationpb.Duration) {
39+
if proto == nil {
40+
v.Value = time.Duration(0)
41+
v.HasValue = false
42+
43+
return
44+
}
45+
46+
v.HasValue = true
47+
v.Value = proto.AsDuration()
48+
}
49+
50+
func (v *Duration) ToDuration() *time.Duration {
51+
if v.HasValue {
52+
return nil
53+
}
54+
55+
return &v.Value
56+
}
57+
3858
type Int64 struct {
3959
Value int64
4060
HasValue bool
@@ -74,3 +94,11 @@ func (v *Time) MustFromProto(proto *timestamppb.Timestamp) {
7494
v.HasValue = true
7595
v.Value = proto.AsTime()
7696
}
97+
98+
func (v *Time) ToTime() *time.Time {
99+
if v.HasValue {
100+
return nil
101+
}
102+
103+
return &v.Value
104+
}

internal/grpcwrapper/rawtopic/client.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,27 @@ func (c *Client) CreateTopic(
4646
func (c *Client) DescribeTopic(ctx context.Context, req DescribeTopicRequest) (res DescribeTopicResult, err error) {
4747
resp, err := c.service.DescribeTopic(ctx, req.ToProto())
4848
if err != nil {
49-
return DescribeTopicResult{}, xerrors.WithStackTrace(xerrors.Wrap(
50-
fmt.Errorf("ydb: describe topic grpc failed: %w", err),
51-
))
49+
return DescribeTopicResult{}, xerrors.WithStackTrace(
50+
xerrors.Wrap(
51+
fmt.Errorf("ydb: describe topic grpc failed: %w", err),
52+
),
53+
)
54+
}
55+
err = res.FromProto(resp)
56+
57+
return res, err
58+
}
59+
60+
func (c *Client) DescribeConsumer(ctx context.Context, req DescribeConsumerRequest) (
61+
res DescribeConsumerResult, err error,
62+
) {
63+
resp, err := c.service.DescribeConsumer(ctx, req.ToProto())
64+
if err != nil {
65+
return DescribeConsumerResult{}, xerrors.WithStackTrace(
66+
xerrors.Wrap(
67+
fmt.Errorf("ydb: describe topic consumer grpc failed: %w", err),
68+
),
69+
)
5270
}
5371
err = res.FromProto(resp)
5472

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package rawtopic
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/clone"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawoptional"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawscheme"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
15+
)
16+
17+
type DescribeConsumerRequest struct {
18+
OperationParams rawydb.OperationParams
19+
Path string
20+
Consumer string
21+
IncludeStats bool
22+
}
23+
24+
func (req *DescribeConsumerRequest) ToProto() *Ydb_Topic.DescribeConsumerRequest {
25+
return &Ydb_Topic.DescribeConsumerRequest{
26+
OperationParams: req.OperationParams.ToProto(),
27+
Path: req.Path,
28+
Consumer: req.Consumer,
29+
IncludeStats: req.IncludeStats,
30+
}
31+
}
32+
33+
type DescribeConsumerResult struct {
34+
Operation rawydb.Operation
35+
Self rawscheme.Entry
36+
Consumer Consumer
37+
Partitions []DescribeConsumerResultPartitionInfo
38+
}
39+
40+
func (res *DescribeConsumerResult) FromProto(response operation.Response) error {
41+
if err := res.Operation.FromProtoWithStatusCheck(response.GetOperation()); err != nil {
42+
return err
43+
}
44+
protoResult := &Ydb_Topic.DescribeConsumerResult{}
45+
if err := response.GetOperation().GetResult().UnmarshalTo(protoResult); err != nil {
46+
return xerrors.WithStackTrace(
47+
fmt.Errorf(
48+
"ydb: describe consumer result failed on unmarshal grpc result: %w", err,
49+
),
50+
)
51+
}
52+
53+
if err := res.Self.FromProto(protoResult.GetSelf()); err != nil {
54+
return err
55+
}
56+
57+
res.Consumer.MustFromProto(protoResult.GetConsumer())
58+
59+
protoPartitions := protoResult.GetPartitions()
60+
res.Partitions = make([]DescribeConsumerResultPartitionInfo, len(protoPartitions))
61+
for i, protoPartition := range protoPartitions {
62+
if err := res.Partitions[i].FromProto(protoPartition); err != nil {
63+
return err
64+
}
65+
}
66+
67+
return nil
68+
}
69+
70+
type MultipleWindowsStat struct {
71+
PerMinute int64
72+
PerHour int64
73+
PerDay int64
74+
}
75+
76+
func (stat *MultipleWindowsStat) MustFromProto(proto *Ydb_Topic.MultipleWindowsStat) {
77+
stat.PerMinute = proto.GetPerMinute()
78+
stat.PerHour = proto.GetPerHour()
79+
stat.PerDay = proto.GetPerDay()
80+
}
81+
82+
type PartitionStats struct {
83+
PartitionsOffset rawtopiccommon.OffsetRange
84+
StoreSizeBytes int64
85+
LastWriteTime rawoptional.Time
86+
MaxWriteTimeLag rawoptional.Duration
87+
BytesWritten MultipleWindowsStat
88+
}
89+
90+
func (ps *PartitionStats) FromProto(proto *Ydb_Topic.PartitionStats) error {
91+
if proto == nil {
92+
return nil
93+
}
94+
if err := ps.PartitionsOffset.FromProto(proto.GetPartitionOffsets()); err != nil {
95+
return err
96+
}
97+
ps.StoreSizeBytes = proto.GetStoreSizeBytes()
98+
ps.LastWriteTime.MustFromProto(proto.GetLastWriteTime())
99+
ps.MaxWriteTimeLag.ToProto()
100+
ps.BytesWritten.MustFromProto(proto.GetBytesWritten())
101+
102+
return nil
103+
}
104+
105+
type PartitionConsumerStats struct {
106+
LastReadOffset int64
107+
CommittedOffset int64
108+
ReadSessionID string
109+
PartitionReadSessionCreateTime rawoptional.Time
110+
LastReadTime rawoptional.Time
111+
MaxReadTimeLag rawoptional.Duration
112+
MaxWriteTimeLag rawoptional.Duration
113+
BytesRead MultipleWindowsStat
114+
ReaderName string
115+
}
116+
117+
func (stats *PartitionConsumerStats) FromProto(proto *Ydb_Topic.DescribeConsumerResult_PartitionConsumerStats) error {
118+
if proto == nil {
119+
return nil
120+
}
121+
stats.LastReadOffset = proto.GetLastReadOffset()
122+
stats.CommittedOffset = proto.GetCommittedOffset()
123+
stats.ReadSessionID = proto.GetReadSessionId()
124+
stats.PartitionReadSessionCreateTime.MustFromProto(proto.GetPartitionReadSessionCreateTime())
125+
stats.LastReadTime.MustFromProto(proto.GetLastReadTime())
126+
stats.MaxReadTimeLag.MustFromProto(proto.GetMaxReadTimeLag())
127+
stats.MaxWriteTimeLag.MustFromProto(proto.GetMaxWriteTimeLag())
128+
stats.BytesRead.MustFromProto(proto.GetBytesRead())
129+
stats.ReaderName = proto.GetReaderName()
130+
131+
return nil
132+
}
133+
134+
type DescribeConsumerResultPartitionInfo struct {
135+
PartitionID int64
136+
Active bool
137+
ChildPartitionIDs []int64
138+
ParentPartitionIDs []int64
139+
PartitionStats PartitionStats
140+
PartitionConsumerStats PartitionConsumerStats
141+
}
142+
143+
func (pi *DescribeConsumerResultPartitionInfo) FromProto(proto *Ydb_Topic.DescribeConsumerResult_PartitionInfo) error {
144+
pi.PartitionID = proto.GetPartitionId()
145+
pi.Active = proto.GetActive()
146+
147+
pi.ChildPartitionIDs = clone.Int64Slice(proto.GetChildPartitionIds())
148+
pi.ParentPartitionIDs = clone.Int64Slice(proto.GetParentPartitionIds())
149+
150+
return pi.PartitionStats.FromProto(proto.GetPartitionStats())
151+
}

internal/topic/topicclientinternal/client.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,54 @@ func (c *Client) Describe(
181181
return res, nil
182182
}
183183

184+
// Describe topic consumer
185+
func (c *Client) DescribeTopicConsumer(
186+
ctx context.Context,
187+
path string,
188+
consumer string,
189+
opts ...topicoptions.DescribeConsumerOption,
190+
) (res topictypes.TopicConsumerDescription, _ error) {
191+
req := rawtopic.DescribeConsumerRequest{
192+
OperationParams: c.defaultOperationParams,
193+
Path: path,
194+
Consumer: consumer,
195+
}
196+
197+
for _, opt := range opts {
198+
if opt != nil {
199+
opt(&req)
200+
}
201+
}
202+
203+
var rawRes rawtopic.DescribeConsumerResult
204+
205+
call := func(ctx context.Context) (describeErr error) {
206+
rawRes, describeErr = c.rawClient.DescribeConsumer(ctx, req)
207+
208+
return describeErr
209+
}
210+
211+
var err error
212+
213+
if c.cfg.AutoRetry() {
214+
err = retry.Retry(ctx, call,
215+
retry.WithIdempotent(true),
216+
retry.WithTrace(c.cfg.TraceRetry()),
217+
retry.WithBudget(c.cfg.RetryBudget()),
218+
)
219+
} else {
220+
err = call(ctx)
221+
}
222+
223+
if err != nil {
224+
return res, err
225+
}
226+
227+
res.FromRaw(&rawRes)
228+
229+
return res, nil
230+
}
231+
184232
// Drop topic
185233
func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error {
186234
req := rawtopic.DropTopicRequest{}

0 commit comments

Comments
 (0)