Skip to content

Commit 4733475

Browse files
committed
Tweaked invoker and added tests
Signed-off-by: aryans1204 <arshar1204@gmail.com> Add VHIVE_REPO and LOADER_REPO to setup script config Signed-off-by: Mohsen Ghasemi <mohsenghasemi8156@gmail.com> Added tests and created new vSwarm invoker Fixed linting Fixed VSwarm Invoker for Knative mode Committer: aryans1204 arshar1204@gmail.com Fixed VSwarm Invoker for Knative mode Added new invoker for vSwarm functions and added tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed formatting under grpc_client.go Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed driver tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed linting and slimmed VSwarm tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed linting errors Signed-off-by: aryans1204 <arshar1204@gmail.com> Removed redundancies from Workload Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed vSwarm tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Integrated vSwarm invoker into interface Signed-off-by: aryans1204 <arshar1204@gmail.com>
1 parent 11fc411 commit 4733475

File tree

8 files changed

+251
-52
lines changed

8 files changed

+251
-52
lines changed

.github/workflows/unit-tests.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
[
2929
config,
3030
driver,
31+
driver/clients,
3132
generator,
3233
trace,
3334
]

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ require (
1818
require (
1919
github.com/aws/aws-lambda-go v1.47.0
2020
github.com/stretchr/testify v1.10.0
21+
github.com/containerd/log v0.1.0
22+
github.com/google/uuid v1.6.0
23+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a
2124
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a
2225
go.mongodb.org/mongo-driver v1.17.1
2326
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
@@ -36,7 +39,6 @@ require (
3639
github.com/go-logr/stdr v1.2.2 // indirect
3740
github.com/go-pdf/fpdf v0.9.0 // indirect
3841
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
39-
github.com/google/uuid v1.6.0 // indirect
4042
github.com/kr/fs v0.1.0 // indirect
4143
github.com/pkg/sftp v1.13.4 // indirect
4244
github.com/pmezard/go-difflib v1.0.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s
1111
github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
1212
github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY=
1313
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
14+
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
15+
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
1416
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1517
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1618
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -64,6 +66,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
6466
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6567
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
6668
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
69+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a h1:uT20mQeIhHlzRGgUznT7El03WbWfPt6J9xLPflEmx4E=
70+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a/go.mod h1:e19QDifxTHn1xeHS7ZDFZzUW1EWeVmfaiqm0/jEEyUk=
6771
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a h1:Wq/7eNz96WxQWPMEnhg3ai5sZQufCyplAUotEC+j5Kc=
6872
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a/go.mod h1:7PjQe6bDZ5W5cWHTpNeKRobMy9NK0odj6ROXrfa/CLQ=
6973
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

pkg/config/parser.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type LoaderConfiguration struct {
8383
EnableDAGDataset bool `json:"EnableDAGDataset"`
8484
Width int `json:"Width"`
8585
Depth int `json:"Depth"`
86+
VSwarm bool `json:"VSwarm"`
8687
}
8788

8889
func ReadConfigurationFile(path string) LoaderConfiguration {

pkg/driver/clients/grpc_client.go

Lines changed: 91 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,97 @@ package clients
2626

2727
import (
2828
"context"
29-
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
30-
"strings"
31-
"time"
32-
29+
"github.com/google/uuid"
30+
"github.com/sirupsen/logrus"
3331
"github.com/vhive-serverless/loader/pkg/common"
3432
"github.com/vhive-serverless/loader/pkg/config"
3533
"github.com/vhive-serverless/loader/pkg/workload/proto"
36-
37-
"github.com/sirupsen/logrus"
34+
helloworld "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld"
35+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3836
"google.golang.org/grpc"
3937
"google.golang.org/grpc/credentials/insecure"
38+
"strings"
39+
"time"
4040

4141
mc "github.com/vhive-serverless/loader/pkg/metric"
4242
)
4343

44+
type invoker interface {
45+
Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool
46+
}
47+
48+
type ExecutorRPC struct {
49+
}
50+
51+
func (i ExecutorRPC) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool {
52+
grpcClient := proto.NewExecutorClient(conn)
53+
54+
response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
55+
Message: "nothing",
56+
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
57+
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
58+
})
59+
60+
if err != nil {
61+
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
62+
63+
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
64+
record.FunctionTimeout = true
65+
66+
return false
67+
}
68+
69+
record.Instance = extractInstanceName(response.GetMessage())
70+
record.ActualDuration = response.DurationInMicroSec
71+
72+
if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
73+
record.MemoryAllocationTimeout = true
74+
} else {
75+
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
76+
}
77+
78+
logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
79+
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
80+
81+
return true
82+
}
83+
84+
type SayHelloRPC struct {
85+
}
86+
87+
func (i SayHelloRPC) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool {
88+
grpcClient := helloworld.NewGreeterClient(conn)
89+
response, err := grpcClient.SayHello(executionCxt, &helloworld.HelloRequest{
90+
Name: "Invoke Relay",
91+
VHiveMetadata: MakeVHiveMetadata(
92+
uuid.New().String(),
93+
uuid.New().String(),
94+
time.Now().UTC(),
95+
),
96+
})
97+
if err != nil {
98+
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
99+
record.ConnectionTimeout = true
100+
record.FunctionTimeout = true
101+
102+
return false
103+
}
104+
record.ActualDuration = 0
105+
record.Instance = extractSwarmFunction(response.GetMessage())
106+
record.ActualMemoryUsage = common.Kib2Mib(0) //Memory usage may not be available for all vSwarm benchmarks
107+
108+
return true
109+
}
110+
44111
type grpcInvoker struct {
45-
cfg *config.LoaderConfiguration
112+
cfg *config.LoaderConfiguration
113+
invoker invoker
46114
}
47115

48-
func newGRPCInvoker(cfg *config.LoaderConfiguration) *grpcInvoker {
116+
func newGRPCInvoker(cfg *config.LoaderConfiguration, invoker invoker) *grpcInvoker {
49117
return &grpcInvoker{
50-
cfg: cfg,
118+
cfg: cfg,
119+
invoker: invoker,
51120
}
52121
}
53122

@@ -89,42 +158,12 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
89158
defer gRPCConnectionClose(conn)
90159

91160
record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()
92-
93-
grpcClient := proto.NewExecutorClient(conn)
94161
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
95162
defer cancelExecution()
96-
97-
response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
98-
Message: "nothing",
99-
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
100-
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
101-
})
102-
103-
if err != nil {
104-
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
105-
106-
record.ResponseTime = time.Since(start).Microseconds()
107-
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
108-
record.FunctionTimeout = true
109-
110-
return false, record
111-
}
112-
113-
record.Instance = extractInstanceName(response.GetMessage())
163+
success := i.invoker.Invoke(function, runtimeSpec, conn, record, executionCxt)
114164
record.ResponseTime = time.Since(start).Microseconds()
115-
record.ActualDuration = response.DurationInMicroSec
116-
117-
if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
118-
record.MemoryAllocationTimeout = true
119-
} else {
120-
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
121-
}
122-
123-
logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
124-
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
125165
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)
126-
127-
return true, record
166+
return success, record
128167
}
129168

130169
func extractInstanceName(data string) string {
@@ -135,6 +174,17 @@ func extractInstanceName(data string) string {
135174

136175
return data[indexOfHyphen:]
137176
}
177+
func extractSwarmFunction(data string) string {
178+
index := strings.Index(data, "fn: ")
179+
verticalBarIndex := strings.Index(data, " |")
180+
if index == -1 {
181+
return data
182+
}
183+
if verticalBarIndex == -1 {
184+
return data[index+4:]
185+
}
186+
return data[index+4 : verticalBarIndex]
187+
}
138188

139189
func gRPCConnectionClose(conn *grpc.ClientConn) {
140190
if conn == nil {

pkg/driver/clients/grpc_client_test.go

Lines changed: 87 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,19 @@
2525
package clients
2626

2727
import (
28+
"context"
2829
"fmt"
30+
"github.com/sirupsen/logrus"
31+
"github.com/vhive-serverless/loader/pkg/common"
2932
"github.com/vhive-serverless/loader/pkg/config"
33+
"github.com/vhive-serverless/loader/pkg/workload/standard"
34+
helloworld "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld"
35+
"google.golang.org/grpc"
36+
"google.golang.org/grpc/reflection"
37+
"net"
3038
"os"
3139
"testing"
3240
"time"
33-
34-
"github.com/sirupsen/logrus"
35-
"github.com/vhive-serverless/loader/pkg/common"
36-
"github.com/vhive-serverless/loader/pkg/workload/standard"
3741
)
3842

3943
func createFakeLoaderConfiguration() *config.LoaderConfiguration {
@@ -46,6 +50,17 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration {
4650
GRPCFunctionTimeoutSeconds: 15,
4751
}
4852
}
53+
func createFakeVSwarmLoaderConfiguration() *config.LoaderConfiguration {
54+
return &config.LoaderConfiguration{
55+
Platform: "Knative",
56+
InvokeProtocol: "grpc",
57+
OutputPathPrefix: "test",
58+
EnableZipkinTracing: true,
59+
GRPCConnectionTimeoutSeconds: 5,
60+
GRPCFunctionTimeoutSeconds: 15,
61+
VSwarm: true,
62+
}
63+
}
4964

5065
var testFunction = common.Function{
5166
Name: "test-function",
@@ -56,6 +71,29 @@ var testRuntimeSpecs = common.RuntimeSpecification{
5671
Memory: 128,
5772
}
5873

74+
type vSwarmServer struct {
75+
helloworld.UnimplementedGreeterServer
76+
}
77+
78+
func (s *vSwarmServer) SayHello(_ context.Context, req *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
79+
return &helloworld.HelloReply{
80+
Message: "Reply message",
81+
}, nil
82+
}
83+
84+
func startVSwarmGRPCServer(serverAddress string, serverPort int) {
85+
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", serverAddress, serverPort))
86+
if err != nil {
87+
logrus.Fatalf("failed to listen: %v", err)
88+
}
89+
90+
grpcServer := grpc.NewServer()
91+
92+
reflection.Register(grpcServer) // gRPC Server Reflection is used by gRPC CLI
93+
helloworld.RegisterGreeterServer(grpcServer, &vSwarmServer{})
94+
_ = grpcServer.Serve(lis)
95+
}
96+
5997
func TestGRPCClientWithServerUnreachable(t *testing.T) {
6098
cfg := createFakeLoaderConfiguration()
6199
cfg.EnableZipkinTracing = true
@@ -70,7 +108,24 @@ func TestGRPCClientWithServerUnreachable(t *testing.T) {
70108
success != false ||
71109
record.ConnectionTimeout != true {
72110

73-
t.Error("Error while testing an unreachable server.")
111+
t.Error("Error while testing an unreachable server for trace function.")
112+
}
113+
}
114+
115+
func TestVSwarmClientUnreachable(t *testing.T) {
116+
cfgSwarm := createFakeVSwarmLoaderConfiguration()
117+
118+
vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)
119+
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)
120+
121+
if record.Instance != "" ||
122+
record.RequestedDuration != uint32(testRuntimeSpecs.Runtime*1000) ||
123+
record.StartTime == 0 ||
124+
record.ResponseTime == 0 ||
125+
success != false ||
126+
record.ConnectionTimeout != true {
127+
128+
t.Error("Error while testing an unreachable server for vSwarm function.")
74129
}
75130
}
76131

@@ -98,7 +153,30 @@ func TestGRPCClientWithServerReachable(t *testing.T) {
98153
record.ActualDuration == 0 ||
99154
record.ActualMemoryUsage == 0 {
100155

101-
t.Error("Failed gRPC invocations.")
156+
t.Error("Failed gRPC invocations for trace function.")
157+
}
158+
}
159+
160+
func TestVSwarmClientWithServerReachable(t *testing.T) {
161+
address, port := "localhost", 18081
162+
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)
163+
164+
go startVSwarmGRPCServer(address, port)
165+
time.Sleep(2 * time.Second)
166+
167+
cfgSwarm := createFakeVSwarmLoaderConfiguration()
168+
vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)
169+
170+
start := time.Now()
171+
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)
172+
logrus.Info("Elapsed: ", time.Since(start).Milliseconds(), " ms")
173+
if !success ||
174+
record.MemoryAllocationTimeout != false ||
175+
record.ConnectionTimeout != false ||
176+
record.FunctionTimeout != false ||
177+
record.ResponseTime == 0 {
178+
179+
t.Error("Failed gRPC invocations for vSwarm function.")
102180
}
103181
}
104182

@@ -109,7 +187,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
109187
t.Error(err)
110188
}
111189

112-
address, port := "localhost", 18081
190+
address, port := "localhost", 18082
113191
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)
114192

115193
go standard.StartGRPCServer(address, port, standard.TraceFunction, "")
@@ -118,6 +196,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
118196
time.Sleep(2 * time.Second)
119197

120198
cfg := createFakeLoaderConfiguration()
199+
121200
invoker := CreateInvoker(cfg, nil, nil)
122201

123202
for i := 0; i < 50; i++ {
@@ -131,7 +210,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
131210
record.ActualDuration == 0 ||
132211
record.ActualMemoryUsage == 0 {
133212

134-
t.Error("Failed gRPC invocations.")
213+
t.Error("Failed gRPC invocations for trace function.")
135214
}
136215
}
137216
}

pkg/driver/clients/invoker.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr
1818
return newAWSLambdaInvoker(announceDoneExe)
1919
case "Dirigent", "Dirigent-RPS":
2020
if cfg.InvokeProtocol == "grpc" {
21-
return newGRPCInvoker(cfg)
21+
return newGRPCInvoker(cfg, ExecutorRPC{})
2222
} else {
2323
return newHTTPInvoker(cfg)
2424
}
2525
case "Dirigent-Dandelion", "Dirigent-Dandelion-RPS":
2626
return newHTTPInvoker(cfg)
2727
case "Knative", "Knative-RPS":
2828
if cfg.InvokeProtocol == "grpc" {
29-
return newGRPCInvoker(cfg)
29+
if !cfg.VSwarm {
30+
return newGRPCInvoker(cfg, ExecutorRPC{})
31+
} else {
32+
return newGRPCInvoker(cfg, SayHelloRPC{})
33+
}
3034
} else {
3135
return newHTTPInvoker(cfg)
3236
}

0 commit comments

Comments
 (0)