Skip to content

Commit 941002e

Browse files
authored
Merge pull request #1119 from segmentio/quotas-api
KIP-546: Add Client Quota APIs
2 parents f057b1d + b5875a6 commit 941002e

12 files changed

+772
-0
lines changed

alterclientquotas.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/alterclientquotas"
10+
)
11+
12+
// AlterClientQuotasRequest represents a request sent to a kafka broker to
13+
// alter client quotas.
14+
type AlterClientQuotasRequest struct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// List of client quotas entries to alter.
19+
Entries []AlterClientQuotaEntry
20+
21+
// Whether the alteration should be validated, but not performed.
22+
ValidateOnly bool
23+
}
24+
25+
type AlterClientQuotaEntry struct {
26+
// The quota entities to alter.
27+
Entities []AlterClientQuotaEntity
28+
29+
// An individual quota configuration entry to alter.
30+
Ops []AlterClientQuotaOps
31+
}
32+
33+
type AlterClientQuotaEntity struct {
34+
// The quota entity type.
35+
EntityType string
36+
37+
// The name of the quota entity, or null if the default.
38+
EntityName string
39+
}
40+
41+
type AlterClientQuotaOps struct {
42+
// The quota configuration key.
43+
Key string
44+
45+
// The quota configuration value to set, otherwise ignored if the value is to be removed.
46+
Value float64
47+
48+
// Whether the quota configuration value should be removed, otherwise set.
49+
Remove bool
50+
}
51+
52+
type AlterClientQuotaResponseQuotas struct {
53+
// Error is set to a non-nil value including the code and message if a top-level
54+
// error was encountered when doing the update.
55+
Error error
56+
57+
// The altered quota entities.
58+
Entities []AlterClientQuotaEntity
59+
}
60+
61+
// AlterClientQuotasResponse represents a response from a kafka broker to an alter client
62+
// quotas request.
63+
type AlterClientQuotasResponse struct {
64+
// The amount of time that the broker throttled the request.
65+
Throttle time.Duration
66+
67+
// List of altered client quotas responses.
68+
Entries []AlterClientQuotaResponseQuotas
69+
}
70+
71+
// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
72+
// the response.
73+
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
74+
entries := make([]alterclientquotas.Entry, len(req.Entries))
75+
76+
for entryIdx, entry := range req.Entries {
77+
entities := make([]alterclientquotas.Entity, len(entry.Entities))
78+
for entityIdx, entity := range entry.Entities {
79+
entities[entityIdx] = alterclientquotas.Entity{
80+
EntityType: entity.EntityType,
81+
EntityName: entity.EntityName,
82+
}
83+
}
84+
85+
ops := make([]alterclientquotas.Ops, len(entry.Ops))
86+
for opsIdx, op := range entry.Ops {
87+
ops[opsIdx] = alterclientquotas.Ops{
88+
Key: op.Key,
89+
Value: op.Value,
90+
Remove: op.Remove,
91+
}
92+
}
93+
94+
entries[entryIdx] = alterclientquotas.Entry{
95+
Entities: entities,
96+
Ops: ops,
97+
}
98+
}
99+
100+
m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
101+
Entries: entries,
102+
ValidateOnly: req.ValidateOnly,
103+
})
104+
if err != nil {
105+
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
106+
}
107+
108+
res := m.(*alterclientquotas.Response)
109+
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))
110+
111+
for responseEntryIdx, responseEntry := range res.Results {
112+
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
113+
for responseEntityIdx, responseEntity := range responseEntry.Entities {
114+
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
115+
EntityType: responseEntity.EntityType,
116+
EntityName: responseEntity.EntityName,
117+
}
118+
}
119+
120+
responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
121+
Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage),
122+
Entities: responseEntities,
123+
}
124+
}
125+
ret := &AlterClientQuotasResponse{
126+
Throttle: makeDuration(res.ThrottleTimeMs),
127+
Entries: responseEntries,
128+
}
129+
130+
return ret, nil
131+
}

alterclientquotas_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestClientAlterClientQuotas(t *testing.T) {
12+
// Added in Version 2.6.0 https://issues.apache.org/jira/browse/KAFKA-7740
13+
if !ktesting.KafkaIsAtLeast("2.6.0") {
14+
return
15+
}
16+
17+
const (
18+
entityType = "client-id"
19+
entityName = "my-client-id"
20+
key = "producer_byte_rate"
21+
value = 500000.0
22+
)
23+
24+
client, shutdown := newLocalClient()
25+
defer shutdown()
26+
27+
alterResp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{
28+
Entries: []AlterClientQuotaEntry{
29+
{
30+
Entities: []AlterClientQuotaEntity{
31+
{
32+
EntityType: entityType,
33+
EntityName: entityName,
34+
},
35+
},
36+
Ops: []AlterClientQuotaOps{
37+
{
38+
Key: key,
39+
Value: value,
40+
Remove: false,
41+
},
42+
},
43+
},
44+
},
45+
})
46+
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
51+
expectedAlterResp := AlterClientQuotasResponse{
52+
Throttle: 0,
53+
Entries: []AlterClientQuotaResponseQuotas{
54+
{
55+
Error: makeError(0, ""),
56+
Entities: []AlterClientQuotaEntity{
57+
{
58+
EntityName: entityName,
59+
EntityType: entityType,
60+
},
61+
},
62+
},
63+
},
64+
}
65+
66+
assert.Equal(t, expectedAlterResp, *alterResp)
67+
68+
describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
69+
Components: []DescribeClientQuotasRequestComponent{
70+
{
71+
EntityType: entityType,
72+
MatchType: 0,
73+
Match: entityName,
74+
},
75+
},
76+
})
77+
78+
if err != nil {
79+
t.Fatal(err)
80+
}
81+
82+
expectedDescribeResp := DescribeClientQuotasResponse{
83+
Throttle: 0,
84+
Error: makeError(0, ""),
85+
Entries: []DescribeClientQuotasResponseQuotas{
86+
{
87+
Entities: []DescribeClientQuotasEntity{
88+
{
89+
EntityType: entityType,
90+
EntityName: entityName,
91+
},
92+
},
93+
Values: []DescribeClientQuotasValue{
94+
{
95+
Key: key,
96+
Value: value,
97+
},
98+
},
99+
},
100+
},
101+
}
102+
103+
assert.Equal(t, expectedDescribeResp, *describeResp)
104+
}

describeclientquotas.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/describeclientquotas"
10+
)
11+
12+
// DescribeClientQuotasRequest represents a request sent to a kafka broker to
13+
// describe client quotas.
14+
type DescribeClientQuotasRequest struct {
15+
// Address of the kafka broker to send the request to
16+
Addr net.Addr
17+
18+
// List of quota components to describe.
19+
Components []DescribeClientQuotasRequestComponent
20+
21+
// Whether the match is strict, i.e. should exclude entities with
22+
// unspecified entity types.
23+
Strict bool
24+
}
25+
26+
type DescribeClientQuotasRequestComponent struct {
27+
// The entity type that the filter component applies to.
28+
EntityType string
29+
30+
// How to match the entity (0 = exact name, 1 = default name,
31+
// 2 = any specified name).
32+
MatchType int8
33+
34+
// The string to match against, or null if unused for the match type.
35+
Match string
36+
}
37+
38+
// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request.
39+
type DescribeClientQuotasResponse struct {
40+
// The amount of time that the broker throttled the request.
41+
Throttle time.Duration
42+
43+
// Error is set to a non-nil value including the code and message if a top-level
44+
// error was encountered when doing the update.
45+
Error error
46+
47+
// List of describe client quota responses.
48+
Entries []DescribeClientQuotasResponseQuotas
49+
}
50+
51+
type DescribeClientQuotasEntity struct {
52+
// The quota entity type.
53+
EntityType string
54+
55+
// The name of the quota entity, or null if the default.
56+
EntityName string
57+
}
58+
59+
type DescribeClientQuotasValue struct {
60+
// The quota configuration key.
61+
Key string
62+
63+
// The quota configuration value.
64+
Value float64
65+
}
66+
67+
type DescribeClientQuotasResponseQuotas struct {
68+
// List of client quota entities and their descriptions.
69+
Entities []DescribeClientQuotasEntity
70+
71+
// The client quota configuration values.
72+
Values []DescribeClientQuotasValue
73+
}
74+
75+
// DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns
76+
// the response.
77+
func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
78+
components := make([]describeclientquotas.Component, len(req.Components))
79+
80+
for componentIdx, component := range req.Components {
81+
components[componentIdx] = describeclientquotas.Component{
82+
EntityType: component.EntityType,
83+
MatchType: component.MatchType,
84+
Match: component.Match,
85+
}
86+
}
87+
88+
m, err := c.roundTrip(ctx, req.Addr, &describeclientquotas.Request{
89+
Components: components,
90+
Strict: req.Strict,
91+
})
92+
if err != nil {
93+
return nil, fmt.Errorf("kafka.(*Client).DescribeClientQuotas: %w", err)
94+
}
95+
96+
res := m.(*describeclientquotas.Response)
97+
responseEntries := make([]DescribeClientQuotasResponseQuotas, len(res.Entries))
98+
99+
for responseEntryIdx, responseEntry := range res.Entries {
100+
responseEntities := make([]DescribeClientQuotasEntity, len(responseEntry.Entities))
101+
for responseEntityIdx, responseEntity := range responseEntry.Entities {
102+
responseEntities[responseEntityIdx] = DescribeClientQuotasEntity{
103+
EntityType: responseEntity.EntityType,
104+
EntityName: responseEntity.EntityName,
105+
}
106+
}
107+
108+
responseValues := make([]DescribeClientQuotasValue, len(responseEntry.Values))
109+
for responseValueIdx, responseValue := range responseEntry.Values {
110+
responseValues[responseValueIdx] = DescribeClientQuotasValue{
111+
Key: responseValue.Key,
112+
Value: responseValue.Value,
113+
}
114+
}
115+
responseEntries[responseEntryIdx] = DescribeClientQuotasResponseQuotas{
116+
Entities: responseEntities,
117+
Values: responseValues,
118+
}
119+
}
120+
ret := &DescribeClientQuotasResponse{
121+
Throttle: time.Duration(res.ThrottleTimeMs),
122+
Entries: responseEntries,
123+
}
124+
125+
return ret, nil
126+
}

0 commit comments

Comments
 (0)