Skip to content

Commit 60a96f2

Browse files
authored
Merge branch 'main' into 3318-RavenDB-state-store-new
2 parents be15d17 + 0ab52f7 commit 60a96f2

File tree

8 files changed

+86
-43
lines changed

8 files changed

+86
-43
lines changed

bindings/zeebe/command/activate_jobs.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ var (
3232
)
3333

3434
type activateJobsPayload struct {
35-
JobType string `json:"jobType"`
36-
MaxJobsToActivate *int32 `json:"maxJobsToActivate"`
37-
Timeout metadata.Duration `json:"timeout"`
38-
WorkerName string `json:"workerName"`
39-
FetchVariables []string `json:"fetchVariables"`
40-
RequestTimeout metadata.Duration `json:"requestTimeout"`
35+
JobType string `json:"jobType"`
36+
MaxJobsToActivate *int32 `json:"maxJobsToActivate"`
37+
Timeout *metadata.Duration `json:"timeout,omitempty"`
38+
WorkerName string `json:"workerName"`
39+
FetchVariables []string `json:"fetchVariables"`
40+
RequestTimeout *metadata.Duration `json:"requestTimeout,omitempty"`
4141
}
4242

4343
func (z *ZeebeCommand) activateJobs(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -59,7 +59,7 @@ func (z *ZeebeCommand) activateJobs(ctx context.Context, req *bindings.InvokeReq
5959
JobType(payload.JobType).
6060
MaxJobsToActivate(*payload.MaxJobsToActivate)
6161

62-
if payload.Timeout.Duration != time.Duration(0) {
62+
if payload.Timeout != nil && payload.Timeout.Duration != time.Duration(0) {
6363
cmd = cmd.Timeout(payload.Timeout.Duration)
6464
}
6565

@@ -72,7 +72,7 @@ func (z *ZeebeCommand) activateJobs(ctx context.Context, req *bindings.InvokeReq
7272
}
7373

7474
var response []entities.Job
75-
if payload.RequestTimeout.Duration != time.Duration(0) {
75+
if payload.RequestTimeout != nil && payload.RequestTimeout.Duration != time.Duration(0) {
7676
ctxWithTimeout, cancel := context.WithTimeout(ctx, payload.RequestTimeout.Duration)
7777
defer cancel()
7878
response, err = cmd.Send(ctxWithTimeout)

bindings/zeebe/command/activate_jobs_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func TestActivateJobs(t *testing.T) {
148148
payload := activateJobsPayload{
149149
JobType: "a",
150150
MaxJobsToActivate: new(int32),
151-
Timeout: kitmd.Duration{Duration: 1 * time.Second},
151+
Timeout: &kitmd.Duration{Duration: 1 * time.Second},
152152
WorkerName: "b",
153153
FetchVariables: []string{"a", "b", "c"},
154154
}

bindings/zeebe/command/create_instance.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ var (
3232
)
3333

3434
type createInstancePayload struct {
35-
BpmnProcessID string `json:"bpmnProcessId"`
36-
ProcessDefinitionKey *int64 `json:"processDefinitionKey"`
37-
Version *int32 `json:"version"`
38-
Variables interface{} `json:"variables"`
39-
WithResult bool `json:"withResult"`
40-
FetchVariables []string `json:"fetchVariables"`
41-
RequestTimeout metadata.Duration `json:"requestTimeout"`
35+
BpmnProcessID string `json:"bpmnProcessId"`
36+
ProcessDefinitionKey *int64 `json:"processDefinitionKey"`
37+
Version *int32 `json:"version"`
38+
Variables interface{} `json:"variables"`
39+
WithResult bool `json:"withResult"`
40+
FetchVariables []string `json:"fetchVariables"`
41+
RequestTimeout *metadata.Duration `json:"requestTimeout,omitempty"`
4242
}
4343

4444
func (z *ZeebeCommand) createInstance(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -87,7 +87,7 @@ func (z *ZeebeCommand) createInstance(ctx context.Context, req *bindings.InvokeR
8787
//
8888
// From a code perspective, there are two Send methods in the Zeebe client. One if WithResult was used and
8989
// which extracts the request timeout from the context and another one which will not use any timeout.
90-
if payload.WithResult && payload.RequestTimeout.Duration != time.Duration(0) {
90+
if payload.WithResult && payload.RequestTimeout != nil && payload.RequestTimeout.Duration != time.Duration(0) {
9191
ctxWithTimeout, cancel := context.WithTimeout(ctx, payload.RequestTimeout.Duration)
9292
defer cancel()
9393
response, err = cmd3.WithResult().FetchVariables(payload.FetchVariables...).Send(ctxWithTimeout)

bindings/zeebe/command/fail_job.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ import (
2929
var ErrMissingRetries = errors.New("retries is a required attribute")
3030

3131
type failJobPayload struct {
32-
JobKey *int64 `json:"jobKey"`
33-
Retries *int32 `json:"retries"`
34-
ErrorMessage string `json:"errorMessage"`
35-
RetryBackOff metadata.Duration `json:"retryBackOff"`
36-
Variables interface{} `json:"variables"`
32+
JobKey *int64 `json:"jobKey"`
33+
Retries *int32 `json:"retries"`
34+
ErrorMessage string `json:"errorMessage"`
35+
RetryBackOff *metadata.Duration `json:"retryBackOff,omitempty"`
36+
Variables interface{} `json:"variables"`
3737
}
3838

3939
func (z *ZeebeCommand) failJob(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -59,7 +59,7 @@ func (z *ZeebeCommand) failJob(ctx context.Context, req *bindings.InvokeRequest)
5959
cmd = cmd.ErrorMessage(payload.ErrorMessage)
6060
}
6161

62-
if payload.RetryBackOff.Duration != time.Duration(0) {
62+
if payload.RetryBackOff != nil && payload.RetryBackOff.Duration != time.Duration(0) {
6363
cmd = cmd.RetryBackoff(payload.RetryBackOff.Duration)
6464
}
6565

bindings/zeebe/command/publish_message.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ import (
2727
var ErrMissingMessageName = errors.New("messageName is a required attribute")
2828

2929
type publishMessagePayload struct {
30-
MessageName string `json:"messageName"`
31-
CorrelationKey string `json:"correlationKey"`
32-
MessageID string `json:"messageId"`
33-
TimeToLive metadata.Duration `json:"timeToLive"`
34-
Variables interface{} `json:"variables"`
30+
MessageName string `json:"messageName"`
31+
CorrelationKey string `json:"correlationKey"`
32+
MessageID string `json:"messageId"`
33+
TimeToLive *metadata.Duration `json:"timeToLive,omitempty"`
34+
Variables interface{} `json:"variables"`
3535
}
3636

3737
func (z *ZeebeCommand) publishMessage(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -53,7 +53,7 @@ func (z *ZeebeCommand) publishMessage(ctx context.Context, req *bindings.InvokeR
5353
cmd = cmd.MessageId(payload.MessageID)
5454
}
5555

56-
if payload.TimeToLive.Duration != time.Duration(0) {
56+
if payload.TimeToLive != nil && payload.TimeToLive.Duration != time.Duration(0) {
5757
cmd = cmd.TimeToLive(payload.TimeToLive.Duration)
5858
}
5959

bindings/zeebe/command/publish_message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestPublishMessage(t *testing.T) {
136136
MessageName: "a",
137137
CorrelationKey: "b",
138138
MessageID: "c",
139-
TimeToLive: kitmd.Duration{Duration: 1 * time.Second},
139+
TimeToLive: &kitmd.Duration{Duration: 1 * time.Second},
140140
Variables: map[string]interface{}{
141141
"key": "value",
142142
},

state/azure/cosmosdb/cosmosdb.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,14 +567,27 @@ func (c *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
567567
}
568568

569569
if !batchResponse.Success {
570-
// Transaction failed, look for the offending operation
570+
var transactionError error
571+
571572
for index, operation := range batchResponse.OperationResults {
573+
// delete operations with no etag check are allowed to fail with a 404
574+
deleteReq, isDelete := request.Operations[index].(state.DeleteRequest)
575+
if isDelete && operation.StatusCode == http.StatusNotFound && !deleteReq.HasETag() {
576+
continue
577+
}
578+
572579
if operation.StatusCode != http.StatusFailedDependency {
573580
c.logger.Errorf("Transaction failed due to operation %v which failed with status code %d", index, operation.StatusCode)
574581
return fmt.Errorf("transaction failed due to operation %v which failed with status code %d", index, operation.StatusCode)
575582
}
583+
transactionError = errors.New("transaction failed")
584+
}
585+
586+
// If all errors are from delete operations with a 404 (and no etag check), we end up here with a nil error.
587+
// This is expected, as we allow delete operations to fail with a 404.
588+
if transactionError != nil {
589+
return transactionError
576590
}
577-
return errors.New("transaction failed")
578591
}
579592

580593
// Transaction succeeded

tests/conformance/state/state.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
296296
query: `
297297
{
298298
"filter": {
299-
"OR": [
300-
{
299+
"OR": [
300+
{
301301
"AND": [
302302
{
303303
"EQ": {"message": "` + key + `message"}
@@ -310,7 +310,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
310310
}
311311
]
312312
},
313-
{
313+
{
314314
"AND": [
315315
{
316316
"EQ": {"message": "` + key + `message"}
@@ -702,6 +702,27 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
702702
}
703703
})
704704

705+
t.Run("delete non-existent key", func(t *testing.T) {
706+
// for CosmosDB
707+
partitionMetadata := map[string]string{
708+
"partitionKey": "myPartition",
709+
}
710+
operations := []state.TransactionalStateOperation{
711+
state.DeleteRequest{
712+
Key: "non-existent-key",
713+
},
714+
}
715+
716+
transactionStore, ok := statestore.(state.TransactionalStore)
717+
require.True(t, ok)
718+
719+
err := transactionStore.Multi(t.Context(), &state.TransactionalStateRequest{
720+
Operations: operations,
721+
Metadata: partitionMetadata,
722+
})
723+
require.NoError(t, err)
724+
})
725+
705726
t.Run("transaction-order", func(t *testing.T) {
706727
// Arrange
707728
firstKey := key + "-key1"
@@ -796,6 +817,12 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
796817

797818
metadataTest1 := map[string]string{
798819
"contentType": "application/json",
820+
// for CosmosDB
821+
"partitionKey": "myPartition",
822+
}
823+
metadataTest2 := map[string]string{
824+
// for CosmosDB
825+
"partitionKey": "myPartition",
799826
}
800827

801828
operations := []state.TransactionalStateOperation{
@@ -817,13 +844,17 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
817844

818845
expectedMetadata := map[string]map[string]string{
819846
keyTest1: metadataTest1,
847+
keyTest2: metadataTest2,
820848
}
821849

822850
// Act
823851
transactionStore, ok := statestore.(state.TransactionalStore)
824852
assert.True(t, ok)
825853
err := transactionStore.Multi(t.Context(), &state.TransactionalStateRequest{
826854
Operations: operations,
855+
Metadata: map[string]string{
856+
"partitionKey": "myPartition",
857+
},
827858
})
828859
require.NoError(t, err)
829860

@@ -833,17 +864,16 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
833864
Key: k,
834865
Metadata: expectedMetadata[k],
835866
})
836-
expectedValue := res.Data
867+
require.NoError(t, err)
868+
receivedValue := res.Data
837869

838870
// In redisjson when set the value with contentType = application/Json store the value in base64
839-
if strings.HasPrefix(string(expectedValue), "\"ey") {
840-
valueBase64 := strings.Trim(string(expectedValue), "\"")
841-
expectedValueDecoded, _ := base64.StdEncoding.DecodeString(valueBase64)
842-
require.NoError(t, err)
843-
assert.Equal(t, expectedValueDecoded, v)
871+
if strings.HasPrefix(string(receivedValue), "\"ey") {
872+
valueBase64 := strings.Trim(string(receivedValue), "\"")
873+
receivedValueDecoded, _ := base64.StdEncoding.DecodeString(valueBase64)
874+
assert.JSONEq(t, string(v), string(receivedValueDecoded))
844875
} else {
845-
require.NoError(t, err)
846-
assert.Equal(t, expectedValue, v)
876+
assert.JSONEq(t, string(v), string(receivedValue))
847877
}
848878
}
849879
}

0 commit comments

Comments
 (0)