Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
[
config,
driver,
driver/clients,
generator,
trace,
]
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ require (
require (
github.com/aws/aws-lambda-go v1.47.0
github.com/stretchr/testify v1.10.0
github.com/containerd/log v0.1.0
github.com/google/uuid v1.6.0
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a
go.mongodb.org/mongo-driver v1.17.1
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
Expand All @@ -36,7 +39,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-pdf/fpdf v0.9.0 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/pkg/sftp v1.13.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s
github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY=
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -64,6 +66,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a h1:uT20mQeIhHlzRGgUznT7El03WbWfPt6J9xLPflEmx4E=
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a/go.mod h1:e19QDifxTHn1xeHS7ZDFZzUW1EWeVmfaiqm0/jEEyUk=
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a h1:Wq/7eNz96WxQWPMEnhg3ai5sZQufCyplAUotEC+j5Kc=
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a/go.mod h1:7PjQe6bDZ5W5cWHTpNeKRobMy9NK0odj6ROXrfa/CLQ=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
1 change: 1 addition & 0 deletions pkg/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type LoaderConfiguration struct {
EnableDAGDataset bool `json:"EnableDAGDataset"`
Width int `json:"Width"`
Depth int `json:"Depth"`
VSwarm bool `json:"VSwarm"`
}

func ReadConfigurationFile(path string) LoaderConfiguration {
Expand Down
132 changes: 91 additions & 41 deletions pkg/driver/clients/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,97 @@ package clients

import (
"context"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"strings"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/vhive-serverless/loader/pkg/common"
"github.com/vhive-serverless/loader/pkg/config"
"github.com/vhive-serverless/loader/pkg/workload/proto"

"github.com/sirupsen/logrus"
helloworld "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strings"
"time"

mc "github.com/vhive-serverless/loader/pkg/metric"
)

type invoker interface {
Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool
}

type ExecutorRPC struct {
}

func (i ExecutorRPC) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool {
grpcClient := proto.NewExecutorClient(conn)

response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
Message: "nothing",
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
})

if err != nil {
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)

record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
record.FunctionTimeout = true

return false
}

record.Instance = extractInstanceName(response.GetMessage())
record.ActualDuration = response.DurationInMicroSec

if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
record.MemoryAllocationTimeout = true
} else {
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
}

logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))

return true
}

type SayHelloRPC struct {
}

func (i SayHelloRPC) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool {
grpcClient := helloworld.NewGreeterClient(conn)
response, err := grpcClient.SayHello(executionCxt, &helloworld.HelloRequest{
Name: "Invoke Relay",
VHiveMetadata: MakeVHiveMetadata(
uuid.New().String(),
uuid.New().String(),
time.Now().UTC(),
),
})
if err != nil {
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)
record.ConnectionTimeout = true
record.FunctionTimeout = true

return false
}
record.ActualDuration = 0
record.Instance = extractSwarmFunction(response.GetMessage())
record.ActualMemoryUsage = common.Kib2Mib(0) //Memory usage may not be available for all vSwarm benchmarks

return true
}

type grpcInvoker struct {
cfg *config.LoaderConfiguration
cfg *config.LoaderConfiguration
invoker invoker
}

func newGRPCInvoker(cfg *config.LoaderConfiguration) *grpcInvoker {
func newGRPCInvoker(cfg *config.LoaderConfiguration, invoker invoker) *grpcInvoker {
return &grpcInvoker{
cfg: cfg,
cfg: cfg,
invoker: invoker,
}
}

Expand Down Expand Up @@ -89,42 +158,12 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
defer gRPCConnectionClose(conn)

record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()

grpcClient := proto.NewExecutorClient(conn)
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
defer cancelExecution()

response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
Message: "nothing",
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
})

if err != nil {
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)

record.ResponseTime = time.Since(start).Microseconds()
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
record.FunctionTimeout = true

return false, record
}

record.Instance = extractInstanceName(response.GetMessage())
success := i.invoker.Invoke(function, runtimeSpec, conn, record, executionCxt)
record.ResponseTime = time.Since(start).Microseconds()
record.ActualDuration = response.DurationInMicroSec

if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
record.MemoryAllocationTimeout = true
} else {
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
}

logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))
logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)

return true, record
return success, record
}

func extractInstanceName(data string) string {
Expand All @@ -135,6 +174,17 @@ func extractInstanceName(data string) string {

return data[indexOfHyphen:]
}
func extractSwarmFunction(data string) string {
index := strings.Index(data, "fn: ")
verticalBarIndex := strings.Index(data, " |")
if index == -1 {
return data
}
if verticalBarIndex == -1 {
return data[index+4:]
}
return data[index+4 : verticalBarIndex]
}

func gRPCConnectionClose(conn *grpc.ClientConn) {
if conn == nil {
Expand Down
95 changes: 87 additions & 8 deletions pkg/driver/clients/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
package clients

import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/vhive-serverless/loader/pkg/common"
"github.com/vhive-serverless/loader/pkg/config"
"github.com/vhive-serverless/loader/pkg/workload/standard"
helloworld "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"net"
"os"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/vhive-serverless/loader/pkg/common"
"github.com/vhive-serverless/loader/pkg/workload/standard"
)

func createFakeLoaderConfiguration() *config.LoaderConfiguration {
Expand All @@ -46,6 +50,17 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration {
GRPCFunctionTimeoutSeconds: 15,
}
}
func createFakeVSwarmLoaderConfiguration() *config.LoaderConfiguration {
return &config.LoaderConfiguration{
Platform: "Knative",
InvokeProtocol: "grpc",
OutputPathPrefix: "test",
EnableZipkinTracing: true,
GRPCConnectionTimeoutSeconds: 5,
GRPCFunctionTimeoutSeconds: 15,
VSwarm: true,
}
}

var testFunction = common.Function{
Name: "test-function",
Expand All @@ -56,6 +71,29 @@ var testRuntimeSpecs = common.RuntimeSpecification{
Memory: 128,
}

type vSwarmServer struct {
helloworld.UnimplementedGreeterServer
}

func (s *vSwarmServer) SayHello(_ context.Context, req *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
return &helloworld.HelloReply{
Message: "Reply message",
}, nil
}

func startVSwarmGRPCServer(serverAddress string, serverPort int) {
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", serverAddress, serverPort))
if err != nil {
logrus.Fatalf("failed to listen: %v", err)
}

grpcServer := grpc.NewServer()

reflection.Register(grpcServer) // gRPC Server Reflection is used by gRPC CLI
helloworld.RegisterGreeterServer(grpcServer, &vSwarmServer{})
_ = grpcServer.Serve(lis)
}

func TestGRPCClientWithServerUnreachable(t *testing.T) {
cfg := createFakeLoaderConfiguration()
cfg.EnableZipkinTracing = true
Expand All @@ -70,7 +108,24 @@ func TestGRPCClientWithServerUnreachable(t *testing.T) {
success != false ||
record.ConnectionTimeout != true {

t.Error("Error while testing an unreachable server.")
t.Error("Error while testing an unreachable server for trace function.")
}
}

func TestVSwarmClientUnreachable(t *testing.T) {
cfgSwarm := createFakeVSwarmLoaderConfiguration()

vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)

if record.Instance != "" ||
record.RequestedDuration != uint32(testRuntimeSpecs.Runtime*1000) ||
record.StartTime == 0 ||
record.ResponseTime == 0 ||
success != false ||
record.ConnectionTimeout != true {

t.Error("Error while testing an unreachable server for vSwarm function.")
}
}

Expand Down Expand Up @@ -98,7 +153,30 @@ func TestGRPCClientWithServerReachable(t *testing.T) {
record.ActualDuration == 0 ||
record.ActualMemoryUsage == 0 {

t.Error("Failed gRPC invocations.")
t.Error("Failed gRPC invocations for trace function.")
}
}

func TestVSwarmClientWithServerReachable(t *testing.T) {
address, port := "localhost", 18081
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)

go startVSwarmGRPCServer(address, port)
time.Sleep(2 * time.Second)

cfgSwarm := createFakeVSwarmLoaderConfiguration()
vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)

start := time.Now()
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)
logrus.Info("Elapsed: ", time.Since(start).Milliseconds(), " ms")
if !success ||
record.MemoryAllocationTimeout != false ||
record.ConnectionTimeout != false ||
record.FunctionTimeout != false ||
record.ResponseTime == 0 {

t.Error("Failed gRPC invocations for vSwarm function.")
}
}

Expand All @@ -109,7 +187,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
t.Error(err)
}

address, port := "localhost", 18081
address, port := "localhost", 18082
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)

go standard.StartGRPCServer(address, port, standard.TraceFunction, "")
Expand All @@ -118,6 +196,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
time.Sleep(2 * time.Second)

cfg := createFakeLoaderConfiguration()

invoker := CreateInvoker(cfg, nil, nil)

for i := 0; i < 50; i++ {
Expand All @@ -131,7 +210,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
record.ActualDuration == 0 ||
record.ActualMemoryUsage == 0 {

t.Error("Failed gRPC invocations.")
t.Error("Failed gRPC invocations for trace function.")
}
}
}
8 changes: 6 additions & 2 deletions pkg/driver/clients/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr
return newAWSLambdaInvoker(announceDoneExe)
case "Dirigent":
if cfg.InvokeProtocol == "grpc" {
return newGRPCInvoker(cfg)
return newGRPCInvoker(cfg, ExecutorRPC{})
} else {
return newHTTPInvoker(cfg)
}
case "Dirigent-Dandelion":
return newHTTPInvoker(cfg)
case "Knative":
if cfg.InvokeProtocol == "grpc" {
return newGRPCInvoker(cfg)
if !cfg.VSwarm {
return newGRPCInvoker(cfg, ExecutorRPC{})
} else {
return newGRPCInvoker(cfg, SayHelloRPC{})
}
} else {
return newHTTPInvoker(cfg)
}
Expand Down
Loading
Loading