diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index a58eeb8ea..684e871af 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -28,6 +28,7 @@ jobs: [ config, driver, + driver/clients, generator, trace, ] diff --git a/go.mod b/go.mod index e836f8a84..510c2a7e5 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,9 @@ require ( require ( github.com/aws/aws-lambda-go v1.47.0 github.com/stretchr/testify v1.10.0 + github.com/containerd/log v0.1.0 + github.com/google/uuid v1.6.0 + github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a go.mongodb.org/mongo-driver v1.17.1 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c @@ -36,7 +39,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-pdf/fpdf v0.9.0 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/kr/fs v0.1.0 // indirect github.com/pkg/sftp v1.13.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 4e0fb6e99..5dc389142 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A= github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY= github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -64,6 +66,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a h1:uT20mQeIhHlzRGgUznT7El03WbWfPt6J9xLPflEmx4E= +github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a/go.mod h1:e19QDifxTHn1xeHS7ZDFZzUW1EWeVmfaiqm0/jEEyUk= github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a h1:Wq/7eNz96WxQWPMEnhg3ai5sZQufCyplAUotEC+j5Kc= github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a/go.mod h1:7PjQe6bDZ5W5cWHTpNeKRobMy9NK0odj6ROXrfa/CLQ= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/config/parser.go b/pkg/config/parser.go index 060849bea..277343b7c 100644 --- a/pkg/config/parser.go +++ b/pkg/config/parser.go @@ -83,6 +83,7 @@ type LoaderConfiguration struct { EnableDAGDataset bool `json:"EnableDAGDataset"` Width int `json:"Width"` Depth int `json:"Depth"` + VSwarm bool `json:"VSwarm"` } func ReadConfigurationFile(path string) LoaderConfiguration { diff --git a/pkg/driver/clients/grpc_client.go b/pkg/driver/clients/grpc_client.go index ae983625b..936a84504 100644 --- a/pkg/driver/clients/grpc_client.go +++ b/pkg/driver/clients/grpc_client.go @@ -26,28 +26,97 @@ package clients import ( "context" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "strings" - "time" - + "github.com/google/uuid" + "github.com/sirupsen/logrus" "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/config" "github.com/vhive-serverless/loader/pkg/workload/proto" - - "github.com/sirupsen/logrus" + helloworld "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "strings" + "time" mc "github.com/vhive-serverless/loader/pkg/metric" ) +type invoker interface { + Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool +} + +type ExecutorRPC struct { +} + +func (i ExecutorRPC) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool { + grpcClient := proto.NewExecutorClient(conn) + + response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{ + Message: "nothing", + RuntimeInMilliSec: uint32(runtimeSpec.Runtime), + MemoryInMebiBytes: uint32(runtimeSpec.Memory), + }) + + if err != nil { + logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err) + + record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface + record.FunctionTimeout = true + + return false + } + + record.Instance = extractInstanceName(response.GetMessage()) + record.ActualDuration = response.DurationInMicroSec + + if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") { + record.MemoryAllocationTimeout = true + } else { + record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb) + } + + logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message, + float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb)) + + return true +} + +type SayHelloRPC struct { +} + +func (i SayHelloRPC) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification, conn *grpc.ClientConn, record *mc.ExecutionRecord, executionCxt context.Context) bool { + grpcClient := helloworld.NewGreeterClient(conn) + response, err := grpcClient.SayHello(executionCxt, &helloworld.HelloRequest{ + Name: "Invoke Relay", + VHiveMetadata: MakeVHiveMetadata( + uuid.New().String(), + uuid.New().String(), + time.Now().UTC(), + ), + }) + if err != nil { + logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err) + record.ConnectionTimeout = true + record.FunctionTimeout = true + + return false + } + record.ActualDuration = 0 + record.Instance = extractSwarmFunction(response.GetMessage()) + record.ActualMemoryUsage = common.Kib2Mib(0) //Memory usage may not be available for all vSwarm benchmarks + + return true +} + type grpcInvoker struct { - cfg *config.LoaderConfiguration + cfg *config.LoaderConfiguration + invoker invoker } -func newGRPCInvoker(cfg *config.LoaderConfiguration) *grpcInvoker { +func newGRPCInvoker(cfg *config.LoaderConfiguration, invoker invoker) *grpcInvoker { return &grpcInvoker{ - cfg: cfg, + cfg: cfg, + invoker: invoker, } } @@ -89,42 +158,12 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt defer gRPCConnectionClose(conn) record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds() - - grpcClient := proto.NewExecutorClient(conn) executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second) defer cancelExecution() - - response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{ - Message: "nothing", - RuntimeInMilliSec: uint32(runtimeSpec.Runtime), - MemoryInMebiBytes: uint32(runtimeSpec.Memory), - }) - - if err != nil { - logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err) - - record.ResponseTime = time.Since(start).Microseconds() - record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface - record.FunctionTimeout = true - - return false, record - } - - record.Instance = extractInstanceName(response.GetMessage()) + success := i.invoker.Invoke(function, runtimeSpec, conn, record, executionCxt) record.ResponseTime = time.Since(start).Microseconds() - record.ActualDuration = response.DurationInMicroSec - - if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") { - record.MemoryAllocationTimeout = true - } else { - record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb) - } - - logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message, - float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb)) logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3) - - return true, record + return success, record } func extractInstanceName(data string) string { @@ -135,6 +174,17 @@ func extractInstanceName(data string) string { return data[indexOfHyphen:] } +func extractSwarmFunction(data string) string { + index := strings.Index(data, "fn: ") + verticalBarIndex := strings.Index(data, " |") + if index == -1 { + return data + } + if verticalBarIndex == -1 { + return data[index+4:] + } + return data[index+4 : verticalBarIndex] +} func gRPCConnectionClose(conn *grpc.ClientConn) { if conn == nil { diff --git a/pkg/driver/clients/grpc_client_test.go b/pkg/driver/clients/grpc_client_test.go index c755e1ff5..370591fbf 100644 --- a/pkg/driver/clients/grpc_client_test.go +++ b/pkg/driver/clients/grpc_client_test.go @@ -25,15 +25,19 @@ package clients import ( + "context" "fmt" + "github.com/sirupsen/logrus" + "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/config" + "github.com/vhive-serverless/loader/pkg/workload/standard" + helloworld "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "net" "os" "testing" "time" - - "github.com/sirupsen/logrus" - "github.com/vhive-serverless/loader/pkg/common" - "github.com/vhive-serverless/loader/pkg/workload/standard" ) func createFakeLoaderConfiguration() *config.LoaderConfiguration { @@ -46,6 +50,17 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration { GRPCFunctionTimeoutSeconds: 15, } } +func createFakeVSwarmLoaderConfiguration() *config.LoaderConfiguration { + return &config.LoaderConfiguration{ + Platform: "Knative", + InvokeProtocol: "grpc", + OutputPathPrefix: "test", + EnableZipkinTracing: true, + GRPCConnectionTimeoutSeconds: 5, + GRPCFunctionTimeoutSeconds: 15, + VSwarm: true, + } +} var testFunction = common.Function{ Name: "test-function", @@ -56,6 +71,29 @@ var testRuntimeSpecs = common.RuntimeSpecification{ Memory: 128, } +type vSwarmServer struct { + helloworld.UnimplementedGreeterServer +} + +func (s *vSwarmServer) SayHello(_ context.Context, req *helloworld.HelloRequest) (*helloworld.HelloReply, error) { + return &helloworld.HelloReply{ + Message: "Reply message", + }, nil +} + +func startVSwarmGRPCServer(serverAddress string, serverPort int) { + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", serverAddress, serverPort)) + if err != nil { + logrus.Fatalf("failed to listen: %v", err) + } + + grpcServer := grpc.NewServer() + + reflection.Register(grpcServer) // gRPC Server Reflection is used by gRPC CLI + helloworld.RegisterGreeterServer(grpcServer, &vSwarmServer{}) + _ = grpcServer.Serve(lis) +} + func TestGRPCClientWithServerUnreachable(t *testing.T) { cfg := createFakeLoaderConfiguration() cfg.EnableZipkinTracing = true @@ -70,7 +108,24 @@ func TestGRPCClientWithServerUnreachable(t *testing.T) { success != false || record.ConnectionTimeout != true { - t.Error("Error while testing an unreachable server.") + t.Error("Error while testing an unreachable server for trace function.") + } +} + +func TestVSwarmClientUnreachable(t *testing.T) { + cfgSwarm := createFakeVSwarmLoaderConfiguration() + + vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil) + success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs) + + if record.Instance != "" || + record.RequestedDuration != uint32(testRuntimeSpecs.Runtime*1000) || + record.StartTime == 0 || + record.ResponseTime == 0 || + success != false || + record.ConnectionTimeout != true { + + t.Error("Error while testing an unreachable server for vSwarm function.") } } @@ -98,7 +153,30 @@ func TestGRPCClientWithServerReachable(t *testing.T) { record.ActualDuration == 0 || record.ActualMemoryUsage == 0 { - t.Error("Failed gRPC invocations.") + t.Error("Failed gRPC invocations for trace function.") + } +} + +func TestVSwarmClientWithServerReachable(t *testing.T) { + address, port := "localhost", 18081 + testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port) + + go startVSwarmGRPCServer(address, port) + time.Sleep(2 * time.Second) + + cfgSwarm := createFakeVSwarmLoaderConfiguration() + vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil) + + start := time.Now() + success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs) + logrus.Info("Elapsed: ", time.Since(start).Milliseconds(), " ms") + if !success || + record.MemoryAllocationTimeout != false || + record.ConnectionTimeout != false || + record.FunctionTimeout != false || + record.ResponseTime == 0 { + + t.Error("Failed gRPC invocations for vSwarm function.") } } @@ -109,7 +187,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) { t.Error(err) } - address, port := "localhost", 18081 + address, port := "localhost", 18082 testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port) go standard.StartGRPCServer(address, port, standard.TraceFunction, "") @@ -118,6 +196,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) { time.Sleep(2 * time.Second) cfg := createFakeLoaderConfiguration() + invoker := CreateInvoker(cfg, nil, nil) for i := 0; i < 50; i++ { @@ -131,7 +210,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) { record.ActualDuration == 0 || record.ActualMemoryUsage == 0 { - t.Error("Failed gRPC invocations.") + t.Error("Failed gRPC invocations for trace function.") } } } diff --git a/pkg/driver/clients/invoker.go b/pkg/driver/clients/invoker.go index 8c4718b32..51918f862 100644 --- a/pkg/driver/clients/invoker.go +++ b/pkg/driver/clients/invoker.go @@ -19,7 +19,7 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr return newAWSLambdaInvoker(announceDoneExe) case "Dirigent": if cfg.InvokeProtocol == "grpc" { - return newGRPCInvoker(cfg) + return newGRPCInvoker(cfg, ExecutorRPC{}) } else { return newHTTPInvoker(cfg) } @@ -27,7 +27,11 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr return newHTTPInvoker(cfg) case "Knative": if cfg.InvokeProtocol == "grpc" { - return newGRPCInvoker(cfg) + if !cfg.VSwarm { + return newGRPCInvoker(cfg, ExecutorRPC{}) + } else { + return newGRPCInvoker(cfg, SayHelloRPC{}) + } } else { return newHTTPInvoker(cfg) } diff --git a/pkg/driver/clients/vhivemetadata.go b/pkg/driver/clients/vhivemetadata.go new file mode 100644 index 000000000..90f527aba --- /dev/null +++ b/pkg/driver/clients/vhivemetadata.go @@ -0,0 +1,58 @@ +package clients + +import ( + "encoding/json" + "time" + + ctrdlog "github.com/containerd/log" + "github.com/sirupsen/logrus" +) + +type vHiveMetadata struct { + WorkflowId string `json:"WorkflowId"` + InvocationId string `json:"InvocationId"` + InvokedOn time.Time `json:"InvokedOn"` +} + +func GetWorkflowId(d []byte) string { + return unmarshalVHiveMetadata(d).WorkflowId +} + +func GetInvocationId(d []byte) string { + return unmarshalVHiveMetadata(d).InvocationId +} + +func GetInvokedOn(d []byte) time.Time { + return unmarshalVHiveMetadata(d).InvokedOn +} + +func unmarshalVHiveMetadata(d []byte) (vhm vHiveMetadata) { + if err := json.Unmarshal(d, &vhm); err != nil { + logrus.Fatal("failed to unmarshal vhivemetadata", err) + } + return +} + +func MakeVHiveMetadata(WorkflowId, InvocationId string, InvokedOn time.Time) []byte { + return marshalVHiveMetadata(vHiveMetadata{ + WorkflowId: WorkflowId, + InvocationId: InvocationId, + InvokedOn: InvokedOn, + }) +} + +func marshalVHiveMetadata(vhm vHiveMetadata) []byte { + d, err := json.Marshal(struct { + WorkflowId string `json:"WorkflowId"` + InvocationId string `json:"InvocationId"` + InvokedOn string `json:"InvokedOn"` + }{ + WorkflowId: vhm.WorkflowId, + InvocationId: vhm.InvocationId, + InvokedOn: vhm.InvokedOn.Format(ctrdlog.RFC3339NanoFixed), + }) + if err != nil { + logrus.Fatal("failed to marshal vHiveMetadata", err) + } + return d +} \ No newline at end of file