Skip to content

Commit af7dc5d

Browse files
committed
fix describeclientquotas and testing
1 parent 7450047 commit af7dc5d

File tree

3 files changed

+185
-2
lines changed

3 files changed

+185
-2
lines changed

alterclientquotas.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type AlterClientQuotaResponseQuotas struct {
5656
// The error message, or `nil` if the quota alteration succeeded.
5757
ErrorMessage string
5858

59-
// The altered quota entities
59+
// The altered quota entities.
6060
Entities []AlterClientQuotaEntity
6161
}
6262

alterclientquotas_test.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
ktesting "github.com/segmentio/kafka-go/testing"
8+
"github.com/stretchr/testify/assert"
89
)
910

1011
func TestClientAlterClientQuotas(t *testing.T) {
@@ -23,7 +24,22 @@ func TestClientAlterClientQuotas(t *testing.T) {
2324
client, shutdown := newLocalClient()
2425
defer shutdown()
2526

26-
_, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{
27+
expectedAlterResp := AlterClientQuotasResponse{
28+
Throttle: 0,
29+
Entries: []AlterClientQuotaResponseQuotas{
30+
AlterClientQuotaResponseQuotas{
31+
ErrorCode: 0,
32+
Entities: []AlterClientQuotaEntity{
33+
AlterClientQuotaEntity{
34+
EntityName: entityName,
35+
EntityType: entityType,
36+
},
37+
},
38+
},
39+
},
40+
}
41+
42+
alterResp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{
2743
Entries: []AlterClientQuotaEntry{
2844
AlterClientQuotaEntry{
2945
Entities: []AlterClientQuotaEntity{
@@ -46,4 +62,43 @@ func TestClientAlterClientQuotas(t *testing.T) {
4662
if err != nil {
4763
t.Fatal(err)
4864
}
65+
66+
assert.Equal(t, expectedAlterResp, *alterResp)
67+
68+
expectedDescribeResp := DescribeClientQuotasResponse{
69+
Throttle: 0,
70+
ErrorCode: 0,
71+
Entries: []DescribeClientQuotasResponseQuotas{
72+
DescribeClientQuotasResponseQuotas{
73+
Entities: []DescribeClientQuotasEntity{
74+
DescribeClientQuotasEntity{
75+
EntityType: entityType,
76+
EntityName: entityName,
77+
},
78+
},
79+
Values: []DescribeClientQuotasValue{
80+
DescribeClientQuotasValue{
81+
Key: key,
82+
Value: value,
83+
},
84+
},
85+
},
86+
},
87+
}
88+
89+
describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
90+
Components: []DescribeClientQuotasRequestComponent{
91+
DescribeClientQuotasRequestComponent{
92+
EntityType: entityType,
93+
MatchType: 0,
94+
Match: entityName,
95+
},
96+
},
97+
})
98+
99+
if err != nil {
100+
t.Fatal(err)
101+
}
102+
103+
assert.Equal(t, expectedDescribeResp, *describeResp)
49104
}

describeclientquotas.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/describeclientquotas"
10+
)
11+
12+
// DescribeClientQuotasRequest represents a request sent to a kafka broker to
13+
// describe client quotas.
14+
type DescribeClientQuotasRequest struct {
15+
// Address of the kafka broker to send the request to
16+
Addr net.Addr
17+
18+
// List of quota components to describe.
19+
Components []DescribeClientQuotasRequestComponent `kafka:"min=v0,max=v1"`
20+
21+
// Whether the match is strict, i.e. should exclude entities with
22+
// unspecified entity types.
23+
Strict bool `kafka:"min=v0,max=v1"`
24+
}
25+
26+
type DescribeClientQuotasRequestComponent struct {
27+
// The entity type that the filter component applies to.
28+
EntityType string `kafka:"min=v0,max=v1"`
29+
30+
// How to match the entity (0 = exact name, 1 = default name,
31+
// 2 = any specified name).
32+
MatchType int8 `kafka:"min=v0,max=v1"`
33+
34+
// The string to match against, or null if unused for the match type.
35+
Match string `kafka:"min=v0,max=v1,nullable"`
36+
}
37+
38+
// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request.
39+
type DescribeClientQuotasResponse struct {
40+
// The amount of time that the broker throttled the request.
41+
Throttle time.Duration `kafka:"min=v0,max=v1"`
42+
43+
// The error code, or `0` if the quota description succeeded.
44+
ErrorCode int16 `kafka:"min=v0,max=v1"`
45+
46+
// The error message, or `null` if the quota description succeeded.
47+
ErrorMessage string `kafka:"min=v0,max=v1,nullable"`
48+
49+
// List of describe client quota responses.
50+
Entries []DescribeClientQuotasResponseQuotas `kafka:"min=v0,max=v1"`
51+
}
52+
53+
type DescribeClientQuotasEntity struct {
54+
// The quota entity type.
55+
EntityType string `kafka:"min=v0,max=v1"`
56+
57+
// The name of the quota entity, or null if the default.
58+
EntityName string `kafka:"min=v0,max=v1,nullable"`
59+
}
60+
61+
type DescribeClientQuotasValue struct {
62+
// The quota configuration key.
63+
Key string `kafka:"min=v0,max=v1"`
64+
65+
// The quota configuration value.
66+
Value float64 `kafka:"min=v0,max=v1"`
67+
}
68+
69+
type DescribeClientQuotasResponseQuotas struct {
70+
// List of client quota entities and their descriptions.
71+
Entities []DescribeClientQuotasEntity `kafka:"min=v0,max=v1"`
72+
73+
// The client quota configuration values.
74+
Values []DescribeClientQuotasValue `kafka:"min=v0,max=v1"`
75+
}
76+
77+
// DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns
78+
// the response.
79+
func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
80+
components := make([]describeclientquotas.Component, len(req.Components))
81+
82+
for componentIdx, component := range req.Components {
83+
components[componentIdx] = describeclientquotas.Component{
84+
EntityType: component.EntityType,
85+
MatchType: component.MatchType,
86+
Match: component.Match,
87+
}
88+
}
89+
90+
m, err := c.roundTrip(ctx, req.Addr, &describeclientquotas.Request{
91+
Components: components,
92+
Strict: req.Strict,
93+
})
94+
if err != nil {
95+
return nil, fmt.Errorf("kafka.(*Client).DescribeClientQuotas: %w", err)
96+
}
97+
98+
res := m.(*describeclientquotas.Response)
99+
responseEntries := make([]DescribeClientQuotasResponseQuotas, len(res.Entries))
100+
101+
for responseEntryIdx, responseEntry := range res.Entries {
102+
responseEntities := make([]DescribeClientQuotasEntity, len(responseEntry.Entities))
103+
for responseEntityIdx, responseEntity := range responseEntry.Entities {
104+
responseEntities[responseEntityIdx] = DescribeClientQuotasEntity{
105+
EntityType: responseEntity.EntityType,
106+
EntityName: responseEntity.EntityName,
107+
}
108+
}
109+
110+
responseValues := make([]DescribeClientQuotasValue, len(responseEntry.Values))
111+
for responseValueIdx, responseValue := range responseEntry.Values {
112+
responseValues[responseValueIdx] = DescribeClientQuotasValue{
113+
Key: responseValue.Key,
114+
Value: responseValue.Value,
115+
}
116+
}
117+
responseEntries[responseEntryIdx] = DescribeClientQuotasResponseQuotas{
118+
Entities: responseEntities,
119+
Values: responseValues,
120+
}
121+
}
122+
ret := &DescribeClientQuotasResponse{
123+
Throttle: time.Duration(res.ThrottleTimeMs),
124+
Entries: responseEntries,
125+
}
126+
127+
return ret, nil
128+
}

0 commit comments

Comments
 (0)