|
| 1 | +package integ |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "strconv" |
| 6 | + "testing" |
| 7 | + "time" |
| 8 | + |
| 9 | + s3_init_data_attributes "github.com/indeedeng/iwf/integ/workflow/s3-init-data-attributes" |
| 10 | + |
| 11 | + "github.com/indeedeng/iwf/service/common/ptr" |
| 12 | + |
| 13 | + "github.com/indeedeng/iwf/gen/iwfidl" |
| 14 | + "github.com/indeedeng/iwf/service" |
| 15 | + "github.com/stretchr/testify/assert" |
| 16 | +) |
| 17 | + |
| 18 | +func TestS3WorkflowInitDataAttributesTemporal(t *testing.T) { |
| 19 | + if !*temporalIntegTest { |
| 20 | + t.Skip() |
| 21 | + } |
| 22 | + |
| 23 | + for i := 0; i < *repeatIntegTest; i++ { |
| 24 | + doTestWorkflowWithS3InitDataAttributes(t, service.BackendTypeTemporal) |
| 25 | + smallWaitForFastTest() |
| 26 | + } |
| 27 | +} |
| 28 | + |
| 29 | +func TestS3WorkflowInitDataAttributesCadence(t *testing.T) { |
| 30 | + if !*cadenceIntegTest { |
| 31 | + t.Skip() |
| 32 | + } |
| 33 | + for i := 0; i < *repeatIntegTest; i++ { |
| 34 | + doTestWorkflowWithS3InitDataAttributes(t, service.BackendTypeCadence) |
| 35 | + smallWaitForFastTest() |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +func doTestWorkflowWithS3InitDataAttributes(t *testing.T, backendType service.BackendType) { |
| 40 | + // start test workflow server |
| 41 | + wfHandler := s3_init_data_attributes.NewHandler() |
| 42 | + closeFunc1 := startWorkflowWorker(wfHandler, t) |
| 43 | + defer closeFunc1() |
| 44 | + |
| 45 | + _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ |
| 46 | + BackendType: backendType, |
| 47 | + S3TestThreshold: 10, // Set low threshold so our test data gets stored in S3 |
| 48 | + }) |
| 49 | + defer closeFunc2() |
| 50 | + |
| 51 | + // start a workflow |
| 52 | + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ |
| 53 | + Servers: []iwfidl.ServerConfiguration{ |
| 54 | + { |
| 55 | + URL: "http://localhost:" + testIwfServerPort, |
| 56 | + }, |
| 57 | + }, |
| 58 | + }) |
| 59 | + wfId := s3_init_data_attributes.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) |
| 60 | + |
| 61 | + // Create initial data attributes - mix of large (stored in S3) and small (kept in memory) |
| 62 | + initialDataAttributes := []iwfidl.KeyValue{ |
| 63 | + { |
| 64 | + Key: iwfidl.PtrString(s3_init_data_attributes.TestDataAttrKey1), |
| 65 | + Value: &s3_init_data_attributes.TestDataAttributeVal1, // Large - will go to S3 |
| 66 | + }, |
| 67 | + { |
| 68 | + Key: iwfidl.PtrString(s3_init_data_attributes.TestDataAttrKey2), |
| 69 | + Value: &s3_init_data_attributes.TestDataAttributeVal2, // Large - will go to S3 |
| 70 | + }, |
| 71 | + { |
| 72 | + Key: iwfidl.PtrString(s3_init_data_attributes.TestDataAttrKey3), |
| 73 | + Value: &s3_init_data_attributes.TestDataAttributeVal3, // Small - will stay in memory |
| 74 | + }, |
| 75 | + } |
| 76 | + |
| 77 | + wfInput := &iwfidl.EncodedObject{ |
| 78 | + Encoding: iwfidl.PtrString("json"), |
| 79 | + Data: iwfidl.PtrString("\"test-input\""), |
| 80 | + } |
| 81 | + |
| 82 | + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) |
| 83 | + startReq := iwfidl.WorkflowStartRequest{ |
| 84 | + WorkflowId: wfId, |
| 85 | + IwfWorkflowType: s3_init_data_attributes.WorkflowType, |
| 86 | + WorkflowTimeoutSeconds: 100, |
| 87 | + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, |
| 88 | + StartStateId: ptr.Any(s3_init_data_attributes.State1), |
| 89 | + StateInput: wfInput, |
| 90 | + WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ |
| 91 | + DataAttributes: initialDataAttributes, |
| 92 | + }, |
| 93 | + } |
| 94 | + _, httpResp, err := req.WorkflowStartRequest(startReq).Execute() |
| 95 | + failTestAtHttpError(err, httpResp, t) |
| 96 | + |
| 97 | + req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) |
| 98 | + _, httpResp2, err2 := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ |
| 99 | + WorkflowId: wfId, |
| 100 | + }).Execute() |
| 101 | + failTestAtHttpError(err2, httpResp2, t) |
| 102 | + |
| 103 | + assertions := assert.New(t) |
| 104 | + |
| 105 | + _, history := wfHandler.GetTestResult() |
| 106 | + |
| 107 | + // Verify State1 start (waitUntil) received data attributes from S3 matching initial values |
| 108 | + assertions.Equal(history["S1_start"], int64(1), "S1_start should be called once") |
| 109 | + assertions.Equal(history["S1_start_attr1_found"], true, "S1_start should find data attribute 1") |
| 110 | + assertions.Equal(history["S1_start_attr2_found"], true, "S1_start should find data attribute 2") |
| 111 | + assertions.Equal(history["S1_start_attr3_found"], true, "S1_start should find data attribute 3") |
| 112 | + assertions.Equal(history["S1_start_total_attrs"], 3, "S1_start should receive exactly 3 data attributes (no duplicates)") |
| 113 | + assertions.Equal(history["S1_start_attr1_data"], *s3_init_data_attributes.TestDataAttributeVal1.Data, "S1_start attr1 data should match initial value") |
| 114 | + assertions.Equal(history["S1_start_attr2_data"], *s3_init_data_attributes.TestDataAttributeVal2.Data, "S1_start attr2 data should match initial value") |
| 115 | + assertions.Equal(history["S1_start_attr3_data"], *s3_init_data_attributes.TestDataAttributeVal3.Data, "S1_start attr3 data should match initial value") |
| 116 | + assertions.Equal(history["S1_start_validation_pass"], true, "S1_start validation should pass - all data attributes match initial values exactly") |
| 117 | + |
| 118 | + // Verify State1 decide (execute) was called |
| 119 | + assertions.Equal(history["S1_decide"], int64(1), "S1_decide should be called once") |
| 120 | + |
| 121 | + // Verify State2 start (waitUntil) was called |
| 122 | + assertions.Equal(history["S2_start"], int64(1), "S2_start should be called once") |
| 123 | + |
| 124 | + // Verify State2 decide (execute) received data attributes from S3 matching initial values |
| 125 | + assertions.Equal(history["S2_decide"], int64(1), "S2_decide should be called once") |
| 126 | + assertions.Equal(history["S2_decide_attr1_found"], true, "S2_decide should find data attribute 1") |
| 127 | + assertions.Equal(history["S2_decide_attr2_found"], true, "S2_decide should find data attribute 2") |
| 128 | + assertions.Equal(history["S2_decide_attr3_found"], true, "S2_decide should find data attribute 3") |
| 129 | + assertions.Equal(history["S2_decide_total_attrs"], 3, "S2_decide should receive exactly 3 data attributes (no duplicates)") |
| 130 | + assertions.Equal(history["S2_decide_attr1_data"], *s3_init_data_attributes.TestDataAttributeVal1.Data, "S2_decide attr1 data should match initial value") |
| 131 | + assertions.Equal(history["S2_decide_attr2_data"], *s3_init_data_attributes.TestDataAttributeVal2.Data, "S2_decide attr2 data should match initial value") |
| 132 | + assertions.Equal(history["S2_decide_attr3_data"], *s3_init_data_attributes.TestDataAttributeVal3.Data, "S2_decide attr3 data should match initial value") |
| 133 | + assertions.Equal(history["S2_decide_validation_pass"], true, "S2_decide validation should pass - all data attributes match initial values exactly") |
| 134 | +} |
0 commit comments