Skip to content

Commit 4c372fb

Browse files
committed
Tweaked invoker and added tests
Signed-off-by: aryans1204 <arshar1204@gmail.com> Add VHIVE_REPO and LOADER_REPO to setup script config Signed-off-by: Mohsen Ghasemi <mohsenghasemi8156@gmail.com> Added tests and created new vSwarm invoker Fixed linting Fixed VSwarm Invoker for Knative mode Committer: aryans1204 arshar1204@gmail.com Fixed VSwarm Invoker for Knative mode
1 parent cca2d40 commit 4c372fb

File tree

12 files changed

+412
-137
lines changed

12 files changed

+412
-137
lines changed

.github/workflows/unit-tests.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
[
2929
config,
3030
driver,
31+
driver/clients,
3132
generator,
3233
trace,
3334
]

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ require (
3939
github.com/kr/fs v0.1.0 // indirect
4040
github.com/pkg/sftp v1.13.4 // indirect
4141
github.com/pmezard/go-difflib v1.0.0 // indirect
42+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a // indirect
4243
go.opentelemetry.io/otel/exporters/zipkin v1.28.0 // indirect
4344
go.opentelemetry.io/otel/metric v1.28.0 // indirect
4445
golang.org/x/crypto v0.29.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
6464
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6565
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
6666
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
67+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a h1:uT20mQeIhHlzRGgUznT7El03WbWfPt6J9xLPflEmx4E=
68+
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a/go.mod h1:e19QDifxTHn1xeHS7ZDFZzUW1EWeVmfaiqm0/jEEyUk=
6769
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a h1:Wq/7eNz96WxQWPMEnhg3ai5sZQufCyplAUotEC+j5Kc=
6870
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a/go.mod h1:7PjQe6bDZ5W5cWHTpNeKRobMy9NK0odj6ROXrfa/CLQ=
6971
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

pkg/config/parser.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type LoaderConfiguration struct {
8080
GRPCConnectionTimeoutSeconds int `json:"GRPCConnectionTimeoutSeconds"`
8181
GRPCFunctionTimeoutSeconds int `json:"GRPCFunctionTimeoutSeconds"`
8282
DAGMode bool `json:"DAGMode"`
83+
VSwarm bool `json:VSwarm`
8384
}
8485

8586
func ReadConfigurationFile(path string) LoaderConfiguration {

pkg/driver/clients/grpc_client.go

Lines changed: 115 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -22,126 +22,118 @@
2222
* SOFTWARE.
2323
*/
2424

25-
package clients
26-
27-
import (
28-
"context"
29-
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
30-
"strings"
31-
"time"
32-
33-
"github.com/vhive-serverless/loader/pkg/common"
34-
"github.com/vhive-serverless/loader/pkg/config"
35-
"github.com/vhive-serverless/loader/pkg/workload/proto"
36-
37-
"github.com/sirupsen/logrus"
38-
"google.golang.org/grpc"
39-
"google.golang.org/grpc/credentials/insecure"
40-
41-
mc "github.com/vhive-serverless/loader/pkg/metric"
42-
)
43-
44-
type grpcInvoker struct {
45-
cfg *config.LoaderConfiguration
46-
}
47-
48-
func newGRPCInvoker(cfg *config.LoaderConfiguration) *grpcInvoker {
49-
return &grpcInvoker{
50-
cfg: cfg,
51-
}
52-
}
53-
54-
func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
55-
logrus.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)
56-
57-
record := &mc.ExecutionRecord{
58-
ExecutionRecordBase: mc.ExecutionRecordBase{
59-
RequestedDuration: uint32(runtimeSpec.Runtime * 1e3),
60-
},
61-
}
62-
63-
////////////////////////////////////
64-
// INVOKE FUNCTION
65-
////////////////////////////////////
66-
start := time.Now()
67-
record.StartTime = start.UnixMicro()
68-
69-
var dialOptions []grpc.DialOption
70-
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
71-
if strings.Contains(strings.ToLower(i.cfg.Platform), "dirigent") {
72-
dialOptions = append(dialOptions, grpc.WithAuthority(function.Name)) // Dirigent specific
73-
}
74-
if i.cfg.EnableZipkinTracing {
75-
dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
76-
}
77-
78-
grpcStart := time.Now()
79-
80-
conn, err := grpc.NewClient(function.Endpoint, dialOptions...)
81-
if err != nil {
82-
logrus.Debugf("Failed to establish a gRPC connection - %v\n", err)
83-
84-
record.ResponseTime = time.Since(start).Microseconds()
85-
record.ConnectionTimeout = true
86-
87-
return false, record
88-
}
89-
defer gRPCConnectionClose(conn)
90-
91-
record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()
92-
93-
grpcClient := proto.NewExecutorClient(conn)
94-
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
95-
defer cancelExecution()
96-
97-
response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
98-
Message: "nothing",
99-
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
100-
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
101-
})
102-
103-
if err != nil {
104-
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
105-
106-
record.ResponseTime = time.Since(start).Microseconds()
107-
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
108-
record.FunctionTimeout = true
109-
110-
return false, record
111-
}
112-
113-
record.Instance = extractInstanceName(response.GetMessage())
114-
record.ResponseTime = time.Since(start).Microseconds()
115-
record.ActualDuration = response.DurationInMicroSec
116-
117-
if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
118-
record.MemoryAllocationTimeout = true
119-
} else {
120-
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
121-
}
122-
123-
logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
124-
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
125-
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)
126-
127-
return true, record
128-
}
129-
130-
func extractInstanceName(data string) string {
131-
indexOfHyphen := strings.LastIndex(data, common.FunctionNamePrefix)
132-
if indexOfHyphen == -1 {
133-
return data
134-
}
135-
136-
return data[indexOfHyphen:]
137-
}
138-
139-
func gRPCConnectionClose(conn *grpc.ClientConn) {
140-
if conn == nil {
141-
return
142-
}
143-
144-
if err := conn.Close(); err != nil {
145-
logrus.Warnf("Error while closing gRPC connection - %s\n", err)
146-
}
147-
}
25+
package clients
26+
27+
import (
28+
"context"
29+
"github.com/sirupsen/logrus"
30+
"github.com/vhive-serverless/loader/pkg/common"
31+
"github.com/vhive-serverless/loader/pkg/config"
32+
protoExec "github.com/vhive-serverless/loader/pkg/workload/proto"
33+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
34+
"google.golang.org/grpc"
35+
"google.golang.org/grpc/credentials/insecure"
36+
"strings"
37+
"time"
38+
39+
mc "github.com/vhive-serverless/loader/pkg/metric"
40+
)
41+
42+
type grpcInvoker struct {
43+
cfg *config.LoaderConfiguration
44+
}
45+
46+
func newGRPCInvoker(cfg *config.LoaderConfiguration) *grpcInvoker {
47+
return &grpcInvoker{
48+
cfg: cfg,
49+
}
50+
}
51+
52+
func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
53+
logrus.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)
54+
55+
record := &mc.ExecutionRecord{
56+
ExecutionRecordBase: mc.ExecutionRecordBase{
57+
RequestedDuration: uint32(runtimeSpec.Runtime * 1e3),
58+
},
59+
}
60+
61+
////////////////////////////////////
62+
// INVOKE FUNCTION
63+
////////////////////////////////////
64+
start := time.Now()
65+
record.StartTime = start.UnixMicro()
66+
67+
var dialOptions []grpc.DialOption
68+
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
69+
if i.cfg.EnableZipkinTracing {
70+
dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
71+
}
72+
73+
grpcStart := time.Now()
74+
75+
conn, err := grpc.NewClient(function.Endpoint, dialOptions...)
76+
if err != nil {
77+
logrus.Debugf("Failed to establish a gRPC connection - %v\n", err)
78+
79+
record.ResponseTime = time.Since(start).Microseconds()
80+
record.ConnectionTimeout = true
81+
82+
return false, record
83+
}
84+
defer gRPCConnectionClose(conn)
85+
86+
record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()
87+
88+
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
89+
90+
defer cancelExecution()
91+
grpcClient := protoExec.NewExecutorClient(conn)
92+
response, err := grpcClient.Execute(executionCxt, &protoExec.FaasRequest{
93+
Message: "nothing",
94+
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
95+
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
96+
})
97+
if err != nil {
98+
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
99+
100+
record.ResponseTime = time.Since(start).Microseconds()
101+
record.FunctionTimeout = true
102+
103+
return false, record
104+
}
105+
record.Instance = extractInstanceName(response.GetMessage())
106+
record.ResponseTime = time.Since(start).Microseconds()
107+
record.ActualDuration = response.DurationInMicroSec
108+
109+
if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
110+
record.MemoryAllocationTimeout = true
111+
} else {
112+
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
113+
}
114+
logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
115+
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
116+
117+
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)
118+
119+
return true, record
120+
}
121+
122+
func extractInstanceName(data string) string {
123+
indexOfHyphen := strings.LastIndex(data, common.FunctionNamePrefix)
124+
if indexOfHyphen == -1 {
125+
return data
126+
}
127+
128+
return data[indexOfHyphen:]
129+
}
130+
131+
func gRPCConnectionClose(conn *grpc.ClientConn) {
132+
if conn == nil {
133+
return
134+
}
135+
136+
if err := conn.Close(); err != nil {
137+
logrus.Warnf("Error while closing gRPC connection - %s\n", err)
138+
}
139+
}

0 commit comments

Comments
 (0)