Skip to content

Commit 21f84b0

Browse files
committed
add public api
1 parent f697fff commit 21f84b0

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

alterclientquotas.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/alterclientquotas"
10+
)
11+
12+
// AlterClientQuotasRequest represents a request sent to a kafka broker to add
13+
// alter client quotas.
14+
type AlterClientQuotasRequest struct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// List of client quotas entries to alter.
19+
Entries []AlterClientQuotaEntry
20+
21+
// Whether the alteration should be validated, but not performed.
22+
ValidateOnly bool
23+
}
24+
25+
type AlterClientQuotaEntry struct {
26+
Entities []AlterClientQuotaEntity
27+
Ops []AlterClientQuotaOps
28+
}
29+
30+
type AlterClientQuotaEntity struct {
31+
EntityType string
32+
EntityName string
33+
}
34+
35+
type AlterClientQuotaOps struct {
36+
Key string
37+
Value float64
38+
Remove bool
39+
}
40+
41+
type AlterClientQuotaResponseQuotas struct {
42+
ErrorCode int16
43+
ErrorMessage string
44+
Entities []AlterClientQuotaEntity
45+
}
46+
47+
// AlterClientQuotasResponse represents a response from a kafka broker to an alter client
48+
// quotas request.
49+
type AlterClientQuotasResponse struct {
50+
// The amount of time that the broker throttled the request.
51+
Throttle time.Duration
52+
53+
// List of altered client quotas responses.
54+
Entries []AlterClientQuotaResponseQuotas
55+
}
56+
57+
// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
58+
// the response.
59+
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
60+
entries := make([]alterclientquotas.Entry, len(req.Entries))
61+
62+
for entryIdx, entry := range req.Entries {
63+
entities := make([]alterclientquotas.Entity, len(entry.Entities))
64+
for entityIdx, entity := range entry.Entities {
65+
entities[entityIdx] = alterclientquotas.Entity{
66+
EntityType: entity.EntityType,
67+
EntityName: entity.EntityName,
68+
}
69+
}
70+
71+
ops := make([]alterclientquotas.Ops, len(entry.Ops))
72+
for opsIdx, op := range entry.Ops {
73+
ops[opsIdx] = alterclientquotas.Ops{
74+
Key: op.Key,
75+
Value: op.Value,
76+
Remove: op.Remove,
77+
}
78+
}
79+
80+
entries[entryIdx] = alterclientquotas.Entry{
81+
Entities: entities,
82+
Ops: ops,
83+
}
84+
}
85+
86+
m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
87+
Entries: entries,
88+
ValidateOnly: req.ValidateOnly,
89+
})
90+
if err != nil {
91+
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
92+
}
93+
94+
res := m.(*alterclientquotas.Response)
95+
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))
96+
97+
for responseEntryIdx, responseEntry := range res.Results {
98+
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
99+
for responseEntityIdx, responseEntity := range responseEntry.Entities {
100+
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
101+
EntityType: responseEntity.EntityType,
102+
EntityName: responseEntity.EntityName,
103+
}
104+
}
105+
106+
responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
107+
ErrorCode: responseEntry.ErrorCode,
108+
ErrorMessage: responseEntry.ErrorMessage,
109+
Entities: responseEntities,
110+
}
111+
}
112+
ret := &AlterClientQuotasResponse{
113+
Throttle: makeDuration(res.ThrottleTimeMs),
114+
Entries: responseEntries,
115+
}
116+
117+
return ret, nil
118+
}

alterclientquotas_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
)
9+
10+
func TestClientAlterClientQuotas(t *testing.T) {
11+
// Added in Version 2.6.0 https://issues.apache.org/jira/browse/KAFKA-7740
12+
if !ktesting.KafkaIsAtLeast("2.6.0") {
13+
return
14+
}
15+
16+
const (
17+
entityType = "client-id"
18+
entityName = "my-client-id"
19+
key = "producer_byte_rate"
20+
value = 500000.0
21+
)
22+
23+
client, shutdown := newLocalClient()
24+
defer shutdown()
25+
26+
resp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{
27+
Entries: []AlterClientQuotaEntry{
28+
AlterClientQuotaEntry{
29+
Entities: []AlterClientQuotaEntity{
30+
AlterClientQuotaEntity{
31+
EntityType: entityType,
32+
EntityName: entityName,
33+
},
34+
},
35+
Ops: []AlterClientQuotaOps{
36+
AlterClientQuotaOps{
37+
Key: key,
38+
Value: value,
39+
Remove: false,
40+
},
41+
},
42+
},
43+
},
44+
})
45+
46+
if err != nil {
47+
t.Fatal(err)
48+
}
49+
}

0 commit comments

Comments
 (0)