Skip to content

Commit d21e6ac

Browse files
ktropsKatie Atropslongquanzheng
authored
Add external storage support for next state input & setting data attr… (#584)
Co-authored-by: Katie Atrops <[email protected]> Co-authored-by: Quanzheng Long <[email protected]>
1 parent 63cac35 commit d21e6ac

File tree

6 files changed

+757
-16
lines changed

6 files changed

+757
-16
lines changed

integ/s3_wf_start_input_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,20 @@ func doTestWorkflowWithS3StartInput(t *testing.T, backendType service.BackendTyp
8383

8484
_, history := wfHandler.GetTestResult()
8585

86-
assertions.Equal(history["S1_start_input"], iwfidl.EncodedObject{
87-
Encoding: iwfidl.PtrString("json"),
88-
Data: iwfidl.PtrString("\"12345678901\""),
89-
}, "S1_start_input is not equal")
90-
assertions.Equal(history["S1_decide_input"], iwfidl.EncodedObject{
91-
Encoding: iwfidl.PtrString("json"),
92-
Data: iwfidl.PtrString("\"12345678901\""),
93-
}, "S1_decide_input is not equal")
86+
// The handler should receive objects with both the loaded data AND preserved external storage references
87+
s1StartInput := history["S1_start_input"].(iwfidl.EncodedObject)
88+
s1DecideInput := history["S1_decide_input"].(iwfidl.EncodedObject)
89+
90+
// Verify the data content is correct
91+
assertions.Equal(*s1StartInput.Data, "\"12345678901\"", "S1_start_input data should match")
92+
assertions.Equal(*s1StartInput.Encoding, "json", "S1_start_input encoding should match")
93+
assertions.Nil(s1StartInput.ExtStoreId)
94+
assertions.Nil(s1StartInput.ExtPath)
95+
96+
assertions.Equal(*s1DecideInput.Data, "\"12345678901\"", "S1_decide_input data should match")
97+
assertions.Equal(*s1DecideInput.Encoding, "json", "S1_decide_input encoding should match")
98+
assertions.Nil(s1DecideInput.ExtStoreId)
99+
assertions.Nil(s1DecideInput.ExtPath)
94100

95101
assertions.Equal(history["S1_start"], int64(1), "S1_start is not equal")
96102
assertions.Equal(history["S1_decide"], int64(1), "S1_decide is not equal")
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package integ
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
s3_state_input_optimization "github.com/indeedeng/iwf/integ/workflow/s3-state-input-optimization"
10+
11+
"github.com/indeedeng/iwf/service/common/ptr"
12+
13+
"github.com/indeedeng/iwf/gen/iwfidl"
14+
"github.com/indeedeng/iwf/service"
15+
"github.com/stretchr/testify/assert"
16+
)
17+
18+
func TestS3WorkflowStateInputOptimizationTemporal(t *testing.T) {
19+
if !*temporalIntegTest {
20+
t.Skip()
21+
}
22+
23+
for i := 0; i < *repeatIntegTest; i++ {
24+
doTestWorkflowWithS3StateInputOptimization(t, service.BackendTypeTemporal)
25+
smallWaitForFastTest()
26+
}
27+
}
28+
29+
func TestS3WorkflowStateInputOptimizationCadence(t *testing.T) {
30+
if !*cadenceIntegTest {
31+
t.Skip()
32+
}
33+
for i := 0; i < *repeatIntegTest; i++ {
34+
doTestWorkflowWithS3StateInputOptimization(t, service.BackendTypeCadence)
35+
smallWaitForFastTest()
36+
}
37+
}
38+
39+
func doTestWorkflowWithS3StateInputOptimization(t *testing.T, backendType service.BackendType) {
40+
// start test workflow server
41+
wfHandler := s3_state_input_optimization.NewHandler()
42+
closeFunc1 := startWorkflowWorker(wfHandler, t)
43+
defer closeFunc1()
44+
45+
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
46+
BackendType: backendType,
47+
S3TestThreshold: 10, // Set low threshold so our test data gets stored in S3
48+
})
49+
defer closeFunc2()
50+
51+
// start a workflow
52+
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
53+
Servers: []iwfidl.ServerConfiguration{
54+
{
55+
URL: "http://localhost:" + testIwfServerPort,
56+
},
57+
},
58+
})
59+
wfId := s3_state_input_optimization.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
60+
61+
// Create large input that will be stored in S3
62+
wfInput := &iwfidl.EncodedObject{
63+
Encoding: iwfidl.PtrString("json"),
64+
Data: iwfidl.PtrString("\"this-is-a-large-input-that-exceeds-threshold\""), // 50+ bytes
65+
}
66+
67+
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
68+
startReq := iwfidl.WorkflowStartRequest{
69+
WorkflowId: wfId,
70+
IwfWorkflowType: s3_state_input_optimization.WorkflowType,
71+
WorkflowTimeoutSeconds: 100,
72+
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
73+
StartStateId: ptr.Any(s3_state_input_optimization.State1),
74+
StateInput: wfInput,
75+
}
76+
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
77+
failTestAtHttpError(err, httpResp, t)
78+
79+
req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
80+
_, httpResp2, err2 := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
81+
WorkflowId: wfId,
82+
}).Execute()
83+
failTestAtHttpError(err2, httpResp2, t)
84+
85+
assertions := assert.New(t)
86+
87+
_, history := wfHandler.GetTestResult()
88+
89+
// Verify all states received the correct input
90+
assertions.Equal(history["S1_start"], int64(1), "S1_start should be called once")
91+
assertions.Equal(history["S1_decide"], int64(1), "S1_decide should be called once")
92+
assertions.Equal(history["S2_start"], int64(1), "S2_start should be called once")
93+
assertions.Equal(history["S2_decide"], int64(1), "S2_decide should be called once")
94+
assertions.Equal(history["S3_start"], int64(1), "S3_start should be called once")
95+
assertions.Equal(history["S3_decide"], int64(1), "S3_decide should be called once")
96+
97+
// Verify input data was correctly loaded at each state
98+
expectedData := "\"this-is-a-large-input-that-exceeds-threshold\""
99+
assertions.Equal(history["S1_input_data"], expectedData, "S1 should receive correct input data")
100+
assertions.Equal(history["S2_input_data"], expectedData, "S2 should receive correct input data (same as S1)")
101+
assertions.Equal(history["S3_input_data"], expectedData, "S3 should receive correct input data (same as S1 and S2)")
102+
103+
// Verify optimization: should only have 1 object in S3 despite being used 3 times
104+
// because the same data gets reused instead of duplicated
105+
objectCount, err := globalBlobStore.CountWorkflowObjectsForTesting(context.Background(), wfId)
106+
assertions.Nil(err)
107+
assertions.Equal(int64(1), objectCount, "Should only have 1 object in S3 due to deduplication optimization")
108+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package integ
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
s3_upsert_data_objects "github.com/indeedeng/iwf/integ/workflow/s3-upsert-data-objects"
10+
11+
"github.com/indeedeng/iwf/service/common/ptr"
12+
13+
"github.com/indeedeng/iwf/gen/iwfidl"
14+
"github.com/indeedeng/iwf/service"
15+
"github.com/stretchr/testify/assert"
16+
)
17+
18+
func TestS3WorkflowUpsertDataObjectsTemporal(t *testing.T) {
19+
if !*temporalIntegTest {
20+
t.Skip()
21+
}
22+
23+
for i := 0; i < *repeatIntegTest; i++ {
24+
doTestWorkflowWithS3UpsertDataObjects(t, service.BackendTypeTemporal)
25+
smallWaitForFastTest()
26+
}
27+
}
28+
29+
func TestS3WorkflowUpsertDataObjectsCadence(t *testing.T) {
30+
if !*cadenceIntegTest {
31+
t.Skip()
32+
}
33+
for i := 0; i < *repeatIntegTest; i++ {
34+
doTestWorkflowWithS3UpsertDataObjects(t, service.BackendTypeCadence)
35+
smallWaitForFastTest()
36+
}
37+
}
38+
39+
func doTestWorkflowWithS3UpsertDataObjects(t *testing.T, backendType service.BackendType) {
40+
// start test workflow server
41+
wfHandler := s3_upsert_data_objects.NewHandler()
42+
closeFunc1 := startWorkflowWorker(wfHandler, t)
43+
defer closeFunc1()
44+
45+
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
46+
BackendType: backendType,
47+
S3TestThreshold: 10, // Set low threshold so our test data gets stored in S3
48+
})
49+
defer closeFunc2()
50+
51+
// start a workflow
52+
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
53+
Servers: []iwfidl.ServerConfiguration{
54+
{
55+
URL: "http://localhost:" + testIwfServerPort,
56+
},
57+
},
58+
})
59+
wfId := s3_upsert_data_objects.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
60+
61+
// Create small input
62+
wfInput := &iwfidl.EncodedObject{
63+
Encoding: iwfidl.PtrString("json"),
64+
Data: iwfidl.PtrString("\"test\""),
65+
}
66+
67+
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
68+
startReq := iwfidl.WorkflowStartRequest{
69+
WorkflowId: wfId,
70+
IwfWorkflowType: s3_upsert_data_objects.WorkflowType,
71+
WorkflowTimeoutSeconds: 100,
72+
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
73+
StartStateId: ptr.Any(s3_upsert_data_objects.State1),
74+
StateInput: wfInput,
75+
}
76+
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
77+
failTestAtHttpError(err, httpResp, t)
78+
79+
req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
80+
_, httpResp2, err2 := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
81+
WorkflowId: wfId,
82+
}).Execute()
83+
failTestAtHttpError(err2, httpResp2, t)
84+
85+
assertions := assert.New(t)
86+
87+
_, history := wfHandler.GetTestResult()
88+
89+
// Verify all states were executed
90+
assertions.Equal(history["S1_start"], int64(1), "S1_start should be called once")
91+
assertions.Equal(history["S1_decide"], int64(1), "S1_decide should be called once")
92+
assertions.Equal(history["S2_start"], int64(1), "S2_start should be called once")
93+
assertions.Equal(history["S2_decide"], int64(1), "S2_decide should be called once")
94+
95+
// Verify State2 received the large data objects that were upserted by State1
96+
assertions.Equal(history["S2_received_large_obj1"], true, "S2 should receive large_obj1 from State1's upsert")
97+
assertions.Equal(history["S2_received_large_obj2"], true, "S2 should receive large_obj2 from State1's upsert")
98+
assertions.Equal(history["S2_received_small_obj3"], true, "S2 should receive small_obj3 from State1's upsert")
99+
100+
// Verify the data content matches what State1 upserted
101+
assertions.Equal(history["S2_large_obj1_data"], s3_upsert_data_objects.LargeDataContent1, "S2 large_obj1 data should match")
102+
assertions.Equal(history["S2_large_obj2_data"], s3_upsert_data_objects.LargeDataContent2, "S2 large_obj2 data should match")
103+
assertions.Equal(history["S2_small_obj3_data"], s3_upsert_data_objects.SmallDataContent3, "S2 small_obj3 data should match")
104+
105+
// Verify external storage usage: 2 large objects should be in S3, small one should stay in memory
106+
objectCount, err := globalBlobStore.CountWorkflowObjectsForTesting(context.Background(), wfId)
107+
assertions.Nil(err)
108+
assertions.Equal(int64(2), objectCount, "Should have 2 objects in S3 (large_obj1 and large_obj2)")
109+
}

0 commit comments

Comments
 (0)