|
5 | 5 | "errors" |
6 | 6 | "fmt" |
7 | 7 | "os" |
| 8 | + "reflect" |
8 | 9 | "sync" |
9 | 10 | "sync/atomic" |
10 | 11 | "testing" |
@@ -1079,9 +1080,33 @@ func TestPriorityQueue(t *testing.T) { |
1079 | 1080 | } |
1080 | 1081 |
|
1081 | 1082 | mu.Lock() |
1082 | | - expectedOrder := []int{0, 6, 7, 1, 2, 3, 4, 5} |
1083 | | - assert.Equal(t, expectedOrder, wfPriorityList, "expected workflow execution order %v, got %v", expectedOrder, wfPriorityList) |
| 1083 | + // Check if the expected order is either {0, 6, 7, ...} or {0, 7, 6, ...} |
| 1084 | + // This is because while tasks are dequeued in order, they can run asynchronously |
| 1085 | + // and one could set a value in wfPriorityList before the other |
| 1086 | + expectedOrder1 := []int{0, 6, 7, 1, 2, 3, 4, 5} |
| 1087 | + expectedOrder2 := []int{0, 7, 6, 1, 2, 3, 4, 5} |
| 1088 | + |
| 1089 | + validOrder := false |
| 1090 | + if reflect.DeepEqual(wfPriorityList, expectedOrder1) { |
| 1091 | + validOrder = true |
| 1092 | + } else if reflect.DeepEqual(wfPriorityList, expectedOrder2) { |
| 1093 | + validOrder = true |
| 1094 | + } |
| 1095 | + |
| 1096 | + assert.True(t, validOrder, "expected workflow execution order to be either %v or %v, got %v", |
| 1097 | + expectedOrder1, expectedOrder2, wfPriorityList) |
1084 | 1098 | mu.Unlock() |
1085 | 1099 |
|
| 1100 | + // Verify that handle6 and handle7 workflows were dequeued in FIFO order |
| 1101 | + // by checking that their StartedAt time is in the correct order (6 is before 7) |
| 1102 | + status6, err := handle6.GetStatus() |
| 1103 | + require.NoError(t, err, "failed to get status for workflow 6") |
| 1104 | + status7, err := handle7.GetStatus() |
| 1105 | + require.NoError(t, err, "failed to get status for workflow 7") |
| 1106 | + |
| 1107 | + assert.True(t, status6.StartedAt.Before(status7.StartedAt), |
| 1108 | + "expected workflow 6 to be dequeued before workflow 7, but got 6 started at %v and 7 started at %v", |
| 1109 | + status6.StartedAt, status7.StartedAt) |
| 1110 | + |
1086 | 1111 | require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after priority queue test") |
1087 | 1112 | } |
0 commit comments