Skip to content

Commit aa3d6dd

Browse files
committed
Added RPS support for dirigent-dandelion-workflows
Signed-off-by: Tobias Stocker <tstocker@student.ethz.ch>
1 parent 197810b commit aa3d6dd

File tree

11 files changed

+426
-48
lines changed

11 files changed

+426
-48
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{
2+
"Seed": 42,
3+
4+
"Platform": "Dirigent-Dandelion-Workflow",
5+
"InvokeProtocol" : "http1",
6+
"EndpointPort": 80,
7+
8+
"DirigentControlPlaneIP": "10.0.1.253:9091",
9+
"BusyLoopOnSandboxStartup": false,
10+
11+
"AsyncMode": false,
12+
"AsyncResponseURL": "10.0.1.253:8082",
13+
"AsyncWaitToCollectMin": 1,
14+
15+
"RpsTarget": 1,
16+
"RpsColdStartRatioPercentage": 0,
17+
"RpsCooldownSeconds": 10,
18+
"RpsImage": "workloads/dandelion_workflows/basic_chain.txt",
19+
"RpsRuntimeMs": 10,
20+
"RpsMemoryMB": 2048,
21+
"RpsIterationMultiplier": 80,
22+
23+
"TracePath": "RPS",
24+
"Granularity": "minute",
25+
"OutputPathPrefix": "data/out/experiment",
26+
"IATDistribution": "equidistant",
27+
"CPULimit": "1vCPU",
28+
"ExperimentDuration": 1,
29+
"WarmupDuration": 0,
30+
"PrepullMode": "",
31+
32+
"IsPartiallyPanic": false,
33+
"EnableZipkinTracing": false,
34+
"EnableMetricsScrapping": false,
35+
"MetricScrapingPeriodSeconds": 15,
36+
"AutoscalingMetric": "concurrency",
37+
38+
"GRPCConnectionTimeoutSeconds": 5,
39+
"GRPCFunctionTimeoutSeconds": 900,
40+
41+
"WorkflowConfigPath": "workloads/dandelion_workflows/config_basic_chain.json"
42+
}

cmd/loader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func main() {
9696
"AWSLambda",
9797
"Dirigent",
9898
"Dirigent-Dandelion",
99+
"Dirigent-Dandelion-Workflow",
99100
}
100101

101102
if !slices.Contains(supportedPlatforms, cfg.Platform) {
@@ -151,7 +152,8 @@ func parseYAMLSpecification(cfg *config.LoaderConfiguration) string {
151152
case "firecracker":
152153
return "workloads/firecracker/trace_func_go.yaml"
153154
default:
154-
if cfg.Platform != "Dirigent" && cfg.Platform != "Dirigent-Dandelion" {
155+
if cfg.Platform != "Dirigent" && cfg.Platform != "Dirigent-Dandelion" &&
156+
cfg.Platform != "Dirigent-Dandelion-Workflow" {
155157
log.Fatal("Invalid 'YAMLSelector' parameter.")
156158
}
157159
}

pkg/common/trace_types.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ type DirigentMetadata struct {
8383
IOPercentage int `json:"IOPercentage"`
8484
EnvVars []string `json:"EnvVars"`
8585
ProgramArgs []string `json:"ProgramArgs"`
86+
87+
// dandelion only
88+
NumArgs int `json:"NumArgs"`
89+
NumRets int `json:"NumRets"`
90+
}
91+
92+
type WorkflowMetadata struct {
93+
InvocationRequest string
8694
}
8795

8896
type Function struct {
@@ -104,6 +112,9 @@ type Function struct {
104112
CPULimitsMilli int
105113

106114
Specification *FunctionSpecification
115+
116+
// used only for dirigent workflows
117+
WorkflowMetadata *WorkflowMetadata
107118
}
108119

109120
type Node struct {

pkg/config/parser.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,25 @@ type LoaderConfiguration struct {
8484
Width int `json:"Width"`
8585
Depth int `json:"Depth"`
8686
VSwarm bool `json:"VSwarm"`
87+
88+
// used only for dirigent-dandelion workflows
89+
WorkflowConfigPath string `json:"WorkflowConfigPath"`
90+
}
91+
92+
type WorkflowFunction struct {
93+
FunctionName string `json:"FunctionName"`
94+
FunctionPath string `json:"FunctionPath"`
95+
NumArgs int `json:"NumArgs"`
96+
NumRets int `json:"NumRets"`
97+
}
98+
type CompositionConfig struct {
99+
Name string `json:"Name"`
100+
InDataPaths [][]string `json:"InDataPaths"`
101+
}
102+
type WorkflowConfig struct {
103+
Name string `json:"Name"`
104+
Functions []WorkflowFunction `json:"Functions"`
105+
Compositions []CompositionConfig `json:"Compositions"`
87106
}
88107

89108
func ReadConfigurationFile(path string) LoaderConfiguration {
@@ -118,3 +137,18 @@ func ReadFailureConfiguration(path string) *FailureConfiguration {
118137

119138
return &config
120139
}
140+
141+
func ReadWorkflowConfig(path string) WorkflowConfig {
142+
byteValue, err := os.ReadFile(path)
143+
if err != nil {
144+
log.Fatal(err)
145+
}
146+
147+
var config WorkflowConfig
148+
err = json.Unmarshal(byteValue, &config)
149+
if err != nil {
150+
log.Fatal(err)
151+
}
152+
153+
return config
154+
}

pkg/driver/clients/dandelion_interface.go

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"github.com/vhive-serverless/loader/pkg/common"
88
"github.com/vhive-serverless/loader/pkg/metric"
99
"go.mongodb.org/mongo-driver/bson"
10+
"net/url"
11+
"os"
12+
"path/filepath"
1013
"strings"
1114
)
1215

@@ -87,18 +90,89 @@ func composeBusyLoopBody(functionName, image string, runtime, iterations int) *b
8790
return bytes.NewBuffer(body)
8891
}
8992

90-
func DeserializeDandelionResponse(function *common.Function, body []byte, record *metric.ExecutionRecord) error {
93+
func filenameFromPath(path string) string {
94+
ident := filepath.Base(path)
95+
if pos := strings.LastIndexByte(ident, '.'); pos != -1 {
96+
ident = ident[:pos]
97+
}
98+
return ident
99+
}
100+
101+
func CreateDandelionRequest(serviceName string, dataPaths [][]string) *DandelionRequest {
102+
logrus.Debugf("Creating dandelion request for '%s' with following data:", serviceName)
103+
sets := make([]InputSet, len(dataPaths))
104+
for setIdx, setPaths := range dataPaths {
105+
items := make([]InputItem, len(setPaths))
106+
for itmIdx, itmPath := range setPaths {
107+
var data []byte
108+
var ident string
109+
if itmPath == "" {
110+
data = []byte{}
111+
ident = "empty"
112+
} else {
113+
var err error
114+
data, err = os.ReadFile(itmPath)
115+
if err != nil {
116+
logrus.Fatalf("Failed to read file '%s': %v", itmPath, err)
117+
}
118+
ident = filenameFromPath(itmPath)
119+
}
120+
items[itmIdx] = InputItem{
121+
Identifier: ident,
122+
Key: int64(itmIdx),
123+
Data: data,
124+
}
125+
logrus.Debugf(" set %d, item %d -> %s (size=%d)\n", setIdx, itmIdx, ident, len(data))
126+
}
127+
sets[setIdx] = InputSet{
128+
Identifier: filenameFromPath(items[0].Identifier),
129+
Items: items,
130+
}
131+
}
132+
return &DandelionRequest{
133+
Name: serviceName,
134+
Sets: sets,
135+
}
136+
}
137+
138+
func WorkflowInvocationBody(wfName string, inData *DandelionRequest) string {
139+
var wfInput []byte
140+
var err error
141+
if inData == nil {
142+
wfInput = []byte{}
143+
} else {
144+
wfInput, err = bson.Marshal(inData)
145+
if err != nil {
146+
logrus.Errorf("Error encoding input data - %v\n", err)
147+
return ""
148+
}
149+
}
150+
151+
body := url.Values{
152+
"name": {wfName},
153+
"input": {string(wfInput)},
154+
}
155+
return body.Encode()
156+
}
157+
158+
func DeserializeDandelionResponse(function *common.Function, body []byte, record *metric.ExecutionRecord, allowEmptyResponse bool) error {
91159
var result DandelionDeserializeResponse
92160
err := bson.Unmarshal(body, &result)
93161
if err != nil {
94162
return fmt.Errorf("error deserializing response body - %v", err)
95163
}
96164

97-
rawResponseData := result.Sets[0].Items[0].Data
98-
data := strings.Split(string(rawResponseData), ",")
165+
if len(result.Sets) > 0 && len(result.Sets[0].Items) > 0 {
166+
rawResponseData := result.Sets[0].Items[0].Data
167+
data := strings.Split(string(rawResponseData), ",")
99168

100-
if len(data) > 0 && !strings.Contains(strings.ToLower(data[0]), "ok") {
101-
record.FunctionTimeout = false
169+
if len(data) > 0 && !strings.Contains(strings.ToLower(data[0]), "ok") {
170+
record.FunctionTimeout = false
171+
}
172+
} else {
173+
if allowEmptyResponse {
174+
record.FunctionTimeout = false
175+
}
102176
}
103177

104178
record.Instance = function.Name

pkg/driver/clients/http_client.go

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,8 @@ func newHTTPInvoker(cfg *config.LoaderConfiguration) *httpInvoker {
3333
}
3434
}
3535

36-
func (i *httpInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
37-
isDandelion := strings.Contains(strings.ToLower(i.cfg.Platform), "dandelion")
38-
isKnative := strings.Contains(strings.ToLower(i.cfg.Platform), "knative")
39-
40-
log.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)
41-
42-
record := &mc.ExecutionRecord{
43-
ExecutionRecordBase: mc.ExecutionRecordBase{
44-
RequestedDuration: uint32(runtimeSpec.Runtime * 1e3),
45-
},
46-
}
47-
48-
////////////////////////////////////
49-
// INVOKE FUNCTION
50-
////////////////////////////////////
51-
start := time.Now()
52-
record.StartTime = start.UnixMicro()
36+
func functionInvocationRequest(function *common.Function, runtimeSpec *common.RuntimeSpecification,
37+
isKnative bool, isDandelion bool) *http.Request {
5338

5439
requestBody := &bytes.Buffer{}
5540
/*if body := composeDandelionMatMulBody(function.Name); isDandelion && body != nil {
@@ -62,11 +47,7 @@ func (i *httpInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
6247
req, err := http.NewRequest("POST", "http://"+function.Endpoint, requestBody)
6348
if err != nil {
6449
log.Errorf("Failed to create a HTTP request - %v\n", err)
65-
66-
record.ResponseTime = time.Since(start).Microseconds()
67-
record.ConnectionTimeout = true
68-
69-
return false, record
50+
return nil
7051
}
7152

7253
// add system specific stuff
@@ -85,6 +66,60 @@ func (i *httpInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
8566
req.URL.Path = "/hot/matmul"
8667
}
8768

69+
return req
70+
}
71+
72+
func workflowInvocationRequest(wf *common.Function) *http.Request {
73+
if wf.WorkflowMetadata == nil {
74+
log.Fatal("Failed to create workflow invocation request: workflow metadata is nil")
75+
}
76+
77+
// create request
78+
reqBody := bytes.NewBufferString(wf.WorkflowMetadata.InvocationRequest)
79+
req, err := http.NewRequest("POST", "http://"+wf.Endpoint+"/workflow", reqBody)
80+
if err != nil {
81+
log.Errorf("Failed to create a HTTP request - %v\n", err)
82+
return nil
83+
}
84+
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
85+
req.Host = wf.Name // dirigent takes request name from this
86+
87+
return req
88+
}
89+
90+
func (i *httpInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
91+
isDandelion := strings.Contains(strings.ToLower(i.cfg.Platform), "dandelion")
92+
isKnative := strings.Contains(strings.ToLower(i.cfg.Platform), "knative")
93+
isWorkflow := strings.Contains(strings.ToLower(i.cfg.Platform), "workflow")
94+
95+
log.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)
96+
97+
record := &mc.ExecutionRecord{
98+
ExecutionRecordBase: mc.ExecutionRecordBase{
99+
RequestedDuration: uint32(runtimeSpec.Runtime * 1e3),
100+
},
101+
}
102+
start := time.Now()
103+
record.StartTime = start.UnixMicro()
104+
record.Instance = function.Name // may get overwritten
105+
106+
// create request
107+
var req *http.Request
108+
if !isWorkflow {
109+
req = functionInvocationRequest(function, runtimeSpec, isKnative, isDandelion)
110+
} else {
111+
if !isDandelion {
112+
log.Fatalf("Dirigent workflows are only supported for Dandelion so far!")
113+
}
114+
req = workflowInvocationRequest(function)
115+
}
116+
if req == nil {
117+
record.ResponseTime = time.Since(start).Microseconds()
118+
record.ConnectionTimeout = true
119+
return false, record
120+
}
121+
122+
// send request
88123
resp, err := i.client.Do(req)
89124
if err != nil {
90125
log.Errorf("%s - Failed to send an HTTP request to the server - %v\n", function.Name, err)
@@ -116,7 +151,7 @@ func (i *httpInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
116151
}
117152

118153
if isDandelion {
119-
err = DeserializeDandelionResponse(function, body, record)
154+
err = DeserializeDandelionResponse(function, body, record, isWorkflow)
120155
if err != nil {
121156
log.Warnf("Failed to deserialize Dandelion response - %v - %v", string(body), err)
122157
}

pkg/driver/clients/invoker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr
2323
} else {
2424
return newHTTPInvoker(cfg)
2525
}
26-
case "Dirigent-Dandelion":
26+
case "Dirigent-Dandelion", "Dirigent-Dandelion-Workflow":
2727
return newHTTPInvoker(cfg)
2828
case "Knative":
2929
if cfg.InvokeProtocol == "grpc" {

pkg/driver/deployment/deployer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ func CreateDeployer(cfg *config.Configuration) FunctionDeployer {
1414
switch cfg.LoaderConfiguration.Platform {
1515
case "AWSLambda":
1616
return newAWSLambdaDeployer()
17+
case "Dirigent-Dandelion-Workflow":
18+
return newDirigentDeployer(true)
1719
case "Dirigent", "Dirigent-Dandelion":
18-
return newDirigentDeployer()
20+
return newDirigentDeployer(false)
1921
case "Knative":
2022
return newKnativeDeployer()
2123
case "OpenWhisk":

0 commit comments

Comments
 (0)