-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver_resp_test.go
More file actions
192 lines (172 loc) · 4.69 KB
/
server_resp_test.go
File metadata and controls
192 lines (172 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// server_concurrency_test.go
package rajomon
import (
"context"
"log"
"net"
"sync"
"testing"
"time"
pb "github.com/Jiali-Xing/protobuf" // replace with your actual import path
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
func busyLoop(c chan<- int, quit chan bool) {
for {
if <-quit {
return
}
}
}
func computation(duration int) {
// Jiali: the following block implements the fake computation
quit := make(chan bool)
busyChan := make(chan int)
go busyLoop(busyChan, quit)
select {
case busyResult := <-busyChan:
log.Println(busyResult)
case <-time.After(time.Duration(duration) * time.Millisecond):
// log.Println("timed out")
}
quit <- true
return
}
// greetingServer is the minimal server with Rajomon AQM applied.
type greetingServer struct {
pb.UnimplementedGreetingServiceServer
pt *PriceTable
}
func (s *greetingServer) Greeting(ctx context.Context, req *pb.GreetingRequest) (*pb.GreetingResponse, error) {
// simulate ~5ms of work per request
computation(5000)
// grab the incoming greeting
incoming := req.GetGreeting()
// create your new one
serverGreet := &pb.Greeting{
Service: "server", // or s.name, whatever
// fill in any additional fields here
}
// combine them into the response slice
resp := &pb.GreetingResponse{
Greeting: []*pb.Greeting{
incoming,
serverGreet,
},
}
return resp, nil
}
var (
Opts = map[string]interface{}{
"priceUpdateRate": 5000 * time.Microsecond,
"tokenUpdateRate": 100000 * time.Microsecond,
"latencyThreshold": 500 * time.Microsecond,
"priceStep": int64(180),
"priceStrategy": "expdecay",
"lazyResponse": false,
"rateLimiting": true,
"loadShedding": true,
"pinpointQueuing": true,
// "debug": true,
}
)
func TestHighConcurrencyPriceIncrease(t *testing.T) {
// 1) Prepare Rajomon
callMap := map[string][]string{"Greeting": {}}
pt := NewRajomon("node-1", callMap, Opts)
// 2) Start gRPC server on :50051
lis, err := net.Listen("tcp", ":50051")
if err != nil {
t.Fatalf("listen failed: %v", err)
}
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(pt.UnaryInterceptor),
)
pb.RegisterGreetingServiceServer(grpcServer, &greetingServer{pt: pt})
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("grpc serve error: %v", err)
}
}()
defer grpcServer.Stop()
// let server warm up
time.Sleep(1 * time.Second)
// 4) Fire N clients that loop until stop
const concurrency = 1000
const testDuration = 10 * time.Second
stop := make(chan struct{})
// 3) Dial clients and fire 100 concurrent RPCs
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
t.Errorf("dial error: %v", err)
return
}
defer conn.Close()
client := pb.NewGreetingServiceClient(conn)
// attach metadata so interceptor sees method name
ctx := metadataAppend(context.Background(), "method", "Greeting", "tokens", "1000")
// each client sends req indefinitely
// until the end of the test
for {
select {
case <-stop:
return
default:
resp, err := client.Greeting(ctx, &pb.GreetingRequest{Greeting: &pb.Greeting{Service: "client"}})
if err != nil {
// we allow both success or ResourceExhausted here
if status.Code(err) != codes.ResourceExhausted {
t.Errorf("unexpected error: %v", err)
}
} else {
// resp should not be nil
if resp == nil {
t.Errorf("Greeting response: nil")
}
}
}
}
}()
}
time.Sleep(testDuration / 2)
// print the price table
// pt.priceTableMap.Range(func(key, value interface{}) bool {
// if k, ok := key.(string); ok {
// if v, valid := value.(int64); valid {
// t.Logf("PriceTable: %s = %d", k, v)
// } else {
// t.Errorf("PriceTable: %s = %v (not int64)", k, value)
// }
// } else {
// t.Errorf("PriceTable: %v (not string)", key)
// }
// return true
// })
// 5) Check that price > 0
priceStr, err := pt.RetrieveTotalPrice(context.Background(), "Greeting")
if err != nil {
t.Fatalf("RetrieveTotalPrice error: %v", err)
}
if priceStr == "0" {
t.Errorf("price did not increase under load, still %s", priceStr)
} else {
t.Logf("observed price after load: %s", priceStr)
}
// 4) Give Rajomon a moment to update price
time.Sleep(testDuration / 2)
// stop the clients
close(stop)
wg.Wait()
}
// metadataAppend is a small helper to add k/v pairs into context metadata
func metadataAppend(ctx context.Context, kv ...string) context.Context {
md := metadata.Pairs(kv...)
return metadata.NewOutgoingContext(ctx, md)
}