Skip to content

Commit f3b274d

Browse files
aryans1204aryans
authored andcommitted
Tweaked invoker and added tests
Signed-off-by: aryans1204 <arshar1204@gmail.com> Add VHIVE_REPO and LOADER_REPO to setup script config Signed-off-by: Mohsen Ghasemi <mohsenghasemi8156@gmail.com> Added tests and created new vSwarm invoker Fixed linting Fixed VSwarm Invoker for Knative mode Committer: aryans1204 arshar1204@gmail.com Fixed VSwarm Invoker for Knative mode Added new invoker for vSwarm functions and added tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed formatting under grpc_client.go Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed driver tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed linting and slimmed VSwarm tests Signed-off-by: aryans1204 <arshar1204@gmail.com> Fixed linting errors Signed-off-by: aryans1204 <arshar1204@gmail.com>
1 parent 06e0651 commit f3b274d

File tree

10 files changed

+282
-21
lines changed

10 files changed

+282
-21
lines changed

.github/workflows/unit-tests.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
[
2929
config,
3030
driver,
31+
driver/clients,
3132
generator,
3233
trace,
3334
]

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ require (
1818
require (
1919
github.com/aws/aws-lambda-go v1.47.0
2020
github.com/stretchr/testify v1.10.0
21+
github.com/containerd/log v0.1.0
22+
github.com/google/uuid v1.6.0
23+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a
2124
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a
2225
go.mongodb.org/mongo-driver v1.17.1
2326
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
@@ -36,7 +39,6 @@ require (
3639
github.com/go-logr/stdr v1.2.2 // indirect
3740
github.com/go-pdf/fpdf v0.9.0 // indirect
3841
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
39-
github.com/google/uuid v1.6.0 // indirect
4042
github.com/kr/fs v0.1.0 // indirect
4143
github.com/pkg/sftp v1.13.4 // indirect
4244
github.com/pmezard/go-difflib v1.0.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s
1111
github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
1212
github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY=
1313
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
14+
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
15+
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
1416
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1517
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1618
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -64,6 +66,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
6466
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6567
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
6668
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
69+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a h1:uT20mQeIhHlzRGgUznT7El03WbWfPt6J9xLPflEmx4E=
70+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a/go.mod h1:e19QDifxTHn1xeHS7ZDFZzUW1EWeVmfaiqm0/jEEyUk=
6771
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a h1:Wq/7eNz96WxQWPMEnhg3ai5sZQufCyplAUotEC+j5Kc=
6872
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a/go.mod h1:7PjQe6bDZ5W5cWHTpNeKRobMy9NK0odj6ROXrfa/CLQ=
6973
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

pkg/config/parser.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type LoaderConfiguration struct {
8383
EnableDAGDataset bool `json:"EnableDAGDataset"`
8484
Width int `json:"Width"`
8585
Depth int `json:"Depth"`
86+
VSwarm bool `json:"VSwarm"`
8687
}
8788

8889
func ReadConfigurationFile(path string) LoaderConfiguration {

pkg/driver/clients/grpc_client.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,15 @@ package clients
2626

2727
import (
2828
"context"
29-
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
30-
"strings"
31-
"time"
32-
29+
"github.com/sirupsen/logrus"
3330
"github.com/vhive-serverless/loader/pkg/common"
3431
"github.com/vhive-serverless/loader/pkg/config"
35-
"github.com/vhive-serverless/loader/pkg/workload/proto"
36-
37-
"github.com/sirupsen/logrus"
32+
protoExec "github.com/vhive-serverless/loader/pkg/workload/proto"
33+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3834
"google.golang.org/grpc"
3935
"google.golang.org/grpc/credentials/insecure"
36+
"strings"
37+
"time"
4038

4139
mc "github.com/vhive-serverless/loader/pkg/metric"
4240
)
@@ -68,9 +66,11 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
6866

6967
var dialOptions []grpc.DialOption
7068
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
69+
7170
if strings.Contains(strings.ToLower(i.cfg.Platform), "dirigent") {
7271
dialOptions = append(dialOptions, grpc.WithAuthority(function.Name)) // Dirigent specific
7372
}
73+
7474
if i.cfg.EnableZipkinTracing {
7575
dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
7676
}
@@ -90,26 +90,23 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
9090

9191
record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()
9292

93-
grpcClient := proto.NewExecutorClient(conn)
9493
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
95-
defer cancelExecution()
9694

97-
response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
95+
defer cancelExecution()
96+
grpcClient := protoExec.NewExecutorClient(conn)
97+
response, err := grpcClient.Execute(executionCxt, &protoExec.FaasRequest{
9898
Message: "nothing",
9999
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
100100
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
101101
})
102-
103102
if err != nil {
104103
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
105104

106105
record.ResponseTime = time.Since(start).Microseconds()
107-
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
108106
record.FunctionTimeout = true
109107

110108
return false, record
111109
}
112-
113110
record.Instance = extractInstanceName(response.GetMessage())
114111
record.ResponseTime = time.Since(start).Microseconds()
115112
record.ActualDuration = response.DurationInMicroSec
@@ -119,9 +116,9 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
119116
} else {
120117
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
121118
}
122-
123119
logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
124120
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
121+
125122
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)
126123

127124
return true, record

pkg/driver/clients/grpc_client_test.go

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration {
4646
GRPCFunctionTimeoutSeconds: 15,
4747
}
4848
}
49+
func createFakeVSwarmLoaderConfiguration() *config.LoaderConfiguration {
50+
return &config.LoaderConfiguration{
51+
Platform: "Knative",
52+
InvokeProtocol: "grpc",
53+
OutputPathPrefix: "test",
54+
EnableZipkinTracing: true,
55+
GRPCConnectionTimeoutSeconds: 5,
56+
GRPCFunctionTimeoutSeconds: 15,
57+
VSwarm: true,
58+
}
59+
}
4960

5061
var testFunction = common.Function{
5162
Name: "test-function",
@@ -68,9 +79,26 @@ func TestGRPCClientWithServerUnreachable(t *testing.T) {
6879
record.StartTime == 0 ||
6980
record.ResponseTime == 0 ||
7081
success != false ||
71-
record.ConnectionTimeout != true {
82+
record.FunctionTimeout != true {
83+
84+
t.Error("Error while testing an unreachable server for trace function.")
85+
}
86+
87+
}
88+
func TestVSwarmClientUnreachable(t *testing.T) {
89+
cfgSwarm := createFakeVSwarmLoaderConfiguration()
90+
91+
vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)
92+
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)
93+
94+
if record.Instance != "" ||
95+
record.RequestedDuration != uint32(testRuntimeSpecs.Runtime*1000) ||
96+
record.StartTime == 0 ||
97+
record.ResponseTime == 0 ||
98+
success != false ||
99+
record.FunctionTimeout != true {
72100

73-
t.Error("Error while testing an unreachable server.")
101+
t.Error("Error while testing an unreachable server for vSwarm function.")
74102
}
75103
}
76104

@@ -98,7 +126,32 @@ func TestGRPCClientWithServerReachable(t *testing.T) {
98126
record.ActualDuration == 0 ||
99127
record.ActualMemoryUsage == 0 {
100128

101-
t.Error("Failed gRPC invocations.")
129+
t.Error("Failed gRPC invocations for trace function.")
130+
}
131+
132+
}
133+
func TestVSwarmClientWithServerReachable(t *testing.T) {
134+
address, port := "localhost", 18081
135+
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)
136+
137+
go standard.StartVSwarmGRPCServer(address, port)
138+
time.Sleep(2 * time.Second)
139+
140+
cfgSwarm := createFakeVSwarmLoaderConfiguration()
141+
vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)
142+
143+
start := time.Now()
144+
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)
145+
logrus.Info("Elapsed: ", time.Since(start).Milliseconds(), " ms")
146+
147+
if !success ||
148+
record.MemoryAllocationTimeout != false ||
149+
record.ConnectionTimeout != false ||
150+
record.FunctionTimeout != false ||
151+
record.ResponseTime == 0 ||
152+
record.ActualDuration == 0 {
153+
154+
t.Error("Failed gRPC invocations for vSwarm function.")
102155
}
103156
}
104157

@@ -109,7 +162,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
109162
t.Error(err)
110163
}
111164

112-
address, port := "localhost", 18081
165+
address, port := "localhost", 18082
113166
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)
114167

115168
go standard.StartGRPCServer(address, port, standard.TraceFunction, "")
@@ -118,6 +171,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
118171
time.Sleep(2 * time.Second)
119172

120173
cfg := createFakeLoaderConfiguration()
174+
121175
invoker := CreateInvoker(cfg, nil, nil)
122176

123177
for i := 0; i < 50; i++ {
@@ -131,7 +185,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
131185
record.ActualDuration == 0 ||
132186
record.ActualMemoryUsage == 0 {
133187

134-
t.Error("Failed gRPC invocations.")
188+
t.Error("Failed gRPC invocations for trace function.")
135189
}
136190
}
137191
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package clients
2+
3+
import (
4+
"github.com/vhive-serverless/loader/pkg/config"
5+
"github.com/vhive-serverless/loader/pkg/common"
6+
proto "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld"
7+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
8+
mc "github.com/vhive-serverless/loader/pkg/metric"
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/credentials/insecure"
11+
"github.com/sirupsen/logrus"
12+
"strings"
13+
"time"
14+
"github.com/google/uuid"
15+
"context"
16+
)
17+
18+
type grpcVSwarmInvoker struct {
19+
cfg *config.LoaderConfiguration
20+
}
21+
22+
func newGRPCVSwarmInvoker(cfg *config.LoaderConfiguration) *grpcVSwarmInvoker {
23+
return &grpcVSwarmInvoker{
24+
cfg: cfg,
25+
}
26+
}
27+
28+
func (i *grpcVSwarmInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
29+
logrus.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)
30+
31+
record := &mc.ExecutionRecord{
32+
ExecutionRecordBase: mc.ExecutionRecordBase{
33+
RequestedDuration: uint32(runtimeSpec.Runtime * 1e3),
34+
},
35+
}
36+
37+
////////////////////////////////////
38+
// INVOKE FUNCTION
39+
////////////////////////////////////
40+
start := time.Now()
41+
record.StartTime = start.UnixMicro()
42+
43+
var dialOptions []grpc.DialOption
44+
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
45+
if i.cfg.EnableZipkinTracing {
46+
dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
47+
}
48+
49+
grpcStart := time.Now()
50+
51+
conn, err := grpc.NewClient(function.Endpoint, dialOptions...)
52+
if err != nil {
53+
logrus.Debugf("Failed to establish a gRPC connection - %v\n", err)
54+
55+
record.ResponseTime = time.Since(start).Microseconds()
56+
record.ConnectionTimeout = true
57+
58+
return false, record
59+
}
60+
defer gRPCConnectionClose(conn)
61+
62+
record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()
63+
64+
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
65+
66+
defer cancelExecution()
67+
68+
grpcClient := proto.NewGreeterClient(conn)
69+
response, err := grpcClient.SayHello(executionCxt, &proto.HelloRequest{
70+
Name: "Invoke Relay",
71+
VHiveMetadata: MakeVHiveMetadata(
72+
uuid.New().String(),
73+
uuid.New().String(),
74+
time.Now().UTC(),
75+
),
76+
})
77+
if err != nil {
78+
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
79+
80+
record.ResponseTime = time.Since(start).Microseconds()
81+
record.FunctionTimeout = true
82+
83+
return false, record
84+
}
85+
record.ResponseTime = time.Since(start).Microseconds()
86+
record.ActualDuration = uint32(record.ResponseTime)
87+
record.Instance = extractInstanceName(response.GetMessage())
88+
if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
89+
record.MemoryAllocationTimeout = true
90+
} else {
91+
record.ActualMemoryUsage = common.Kib2Mib(0)
92+
}
93+
94+
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)
95+
96+
return true, record
97+
}

pkg/driver/clients/invoker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr
2626
return newHTTPInvoker(cfg)
2727
case "Knative", "Knative-RPS":
2828
if cfg.InvokeProtocol == "grpc" {
29-
return newGRPCInvoker(cfg)
29+
if !cfg.VSwarm {
30+
return newGRPCInvoker(cfg)
31+
} else {
32+
return newGRPCVSwarmInvoker(cfg)
33+
}
3034
} else {
3135
return newHTTPInvoker(cfg)
3236
}

0 commit comments

Comments
 (0)