Skip to content

Commit fa54109

Browse files
yaron2elena-kolevska
authored andcommitted
Add data extension fields to outbox (dapr#7705)
* add data extension fields to outbox Signed-off-by: yaron2 <[email protected]> * fix typo Signed-off-by: yaron2 <[email protected]> * add pubsub field to finalized cloudevent Signed-off-by: yaron2 <[email protected]> * add non-overridable fields and tests Signed-off-by: yaron2 <[email protected]> * add validation of correct cloud events to intg test Signed-off-by: yaron2 <[email protected]> --------- Signed-off-by: yaron2 <[email protected]> Signed-off-by: Elena Kolevska <[email protected]>
1 parent ea7e979 commit fa54109

File tree

3 files changed

+92
-37
lines changed

3 files changed

+92
-37
lines changed

pkg/runtime/pubsub/outbox.go

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,23 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
163163
ceData = []byte(fmt.Sprintf("%v", sr.Value))
164164
}
165165

166-
ce := &CloudEvent{
167-
ID: tr.GetKey(),
168-
Source: source,
169-
Pubsub: c.outboxPubsub,
170-
Data: ceData,
171-
TraceID: traceID,
172-
TraceState: traceState,
173-
}
174-
166+
var dataContentType string
175167
if sr.ContentType != nil {
176-
ce.DataContentType = *sr.ContentType
168+
dataContentType = *sr.ContentType
177169
}
178170

179-
msg, err := NewCloudEvent(ce, nil)
180-
if err != nil {
181-
return nil, err
171+
ce := contribPubsub.NewCloudEventsEnvelope(tr.GetKey(), source, "", "", "", c.outboxPubsub, dataContentType, ceData, "", traceState)
172+
ce[contribPubsub.TraceIDField] = traceID
173+
174+
for k, v := range op.GetMetadata() {
175+
if k == contribPubsub.DataField || k == contribPubsub.IDField {
176+
continue
177+
}
178+
179+
ce[k] = v
182180
}
183181

184-
data, err := json.Marshal(msg)
182+
data, err := json.Marshal(ce)
185183
if err != nil {
186184
return nil, err
187185
}
@@ -228,10 +226,6 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string
228226
}
229227

230228
stateKey := o.cloudEventExtractorFn(cloudEvent, contribPubsub.IDField)
231-
data := []byte(o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataField))
232-
contentType := o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataContentTypeField)
233-
traceID := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceIDField)
234-
traceState := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceStateField)
235229

236230
store, ok := o.getStateFn(stateStore)
237231
if !ok {
@@ -274,24 +268,16 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string
274268
return err
275269
}
276270

277-
ce, err := NewCloudEvent(&CloudEvent{
278-
Data: data,
279-
DataContentType: contentType,
280-
Pubsub: c.publishPubSub,
281-
Source: appID,
282-
Topic: c.publishTopic,
283-
TraceID: traceID,
284-
TraceState: traceState,
285-
}, nil)
286-
if err != nil {
287-
return err
288-
}
271+
cloudEvent[contribPubsub.TopicField] = c.publishTopic
272+
cloudEvent[contribPubsub.PubsubField] = c.publishPubSub
289273

290-
b, err := json.Marshal(ce)
274+
b, err := json.Marshal(cloudEvent)
291275
if err != nil {
292276
return err
293277
}
294278

279+
contentType := cloudEvent[contribPubsub.DataContentTypeField].(string)
280+
295281
err = o.publishFn(ctx, &contribPubsub.PublishRequest{
296282
PubsubName: c.publishPubSub,
297283
Data: b,

pkg/runtime/pubsub/outbox_test.go

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,60 @@ func TestPublishInternal(t *testing.T) {
259259
require.NoError(t, err)
260260
})
261261

262+
t.Run("valid operation, correct overridden parameters", func(t *testing.T) {
263+
o := newTestOutbox().(*outboxImpl)
264+
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
265+
var cloudEvent map[string]interface{}
266+
err := json.Unmarshal(pr.Data, &cloudEvent)
267+
require.NoError(t, err)
268+
269+
assert.Equal(t, "test", cloudEvent["data"])
270+
assert.Equal(t, "a", pr.PubsubName)
271+
assert.Equal(t, "testapp1outbox", pr.Topic)
272+
assert.Equal(t, "testsource", cloudEvent["source"])
273+
assert.Equal(t, "text/plain", cloudEvent["datacontenttype"])
274+
assert.Equal(t, "a", cloudEvent["pubsubname"])
275+
276+
return nil
277+
}
278+
279+
o.AddOrUpdateOutbox(v1alpha1.Component{
280+
ObjectMeta: metav1.ObjectMeta{
281+
Name: "test",
282+
},
283+
Spec: v1alpha1.ComponentSpec{
284+
Metadata: []common.NameValuePair{
285+
{
286+
Name: outboxPublishPubsubKey,
287+
Value: common.DynamicValue{
288+
JSON: v1.JSON{
289+
Raw: []byte("a"),
290+
},
291+
},
292+
},
293+
{
294+
Name: outboxPublishTopicKey,
295+
Value: common.DynamicValue{
296+
JSON: v1.JSON{
297+
Raw: []byte("1"),
298+
},
299+
},
300+
},
301+
},
302+
},
303+
})
304+
305+
_, err := o.PublishInternal(context.Background(), "test", []state.TransactionalStateOperation{
306+
state.SetRequest{
307+
Key: "key",
308+
Value: "test",
309+
Metadata: map[string]string{"source": "testsource"},
310+
},
311+
}, "testapp", "", "")
312+
313+
require.NoError(t, err)
314+
})
315+
262316
t.Run("valid operation, no datacontenttype", func(t *testing.T) {
263317
o := newTestOutbox().(*outboxImpl)
264318
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
@@ -479,7 +533,7 @@ func TestPublishInternal(t *testing.T) {
479533
}
480534

481535
func TestSubscribeToInternalTopics(t *testing.T) {
482-
t.Run("correct configuration with trace", func(t *testing.T) {
536+
t.Run("correct configuration with trace, custom field and nonoverridable fields", func(t *testing.T) {
483537
o := newTestOutbox().(*outboxImpl)
484538
o.cloudEventExtractorFn = extractCloudEventProperty
485539

@@ -496,20 +550,31 @@ func TestSubscribeToInternalTopics(t *testing.T) {
496550
internalCalledCh := make(chan struct{})
497551
externalCalledCh := make(chan struct{})
498552

553+
var closed bool
554+
499555
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
500556
if pr.Topic == outboxTopic {
501557
close(internalCalledCh)
502558
} else if pr.Topic == "1" {
503-
close(externalCalledCh)
559+
if !closed {
560+
close(externalCalledCh)
561+
closed = true
562+
}
504563
}
505564

506565
ce := map[string]string{}
507566
json.Unmarshal(pr.Data, &ce)
508567

509568
traceID := ce[contribPubsub.TraceIDField]
510569
traceState := ce[contribPubsub.TraceStateField]
570+
customField := ce["outbox.cloudevent.customfield"]
571+
data := ce[contribPubsub.DataField]
572+
id := ce[contribPubsub.IDField]
511573
assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceID)
512574
assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceState)
575+
assert.Equal(t, "a", customField)
576+
assert.Equal(t, "hello", data)
577+
assert.Contains(t, id, "outbox-")
513578

514579
return psMock.Publish(ctx, pr)
515580
}
@@ -556,8 +621,9 @@ func TestSubscribeToInternalTopics(t *testing.T) {
556621
go func() {
557622
trs, pErr := o.PublishInternal(context.Background(), "test", []state.TransactionalStateOperation{
558623
state.SetRequest{
559-
Key: "1",
560-
Value: "hello",
624+
Key: "1",
625+
Value: "hello",
626+
Metadata: map[string]string{"outbox.cloudevent.customfield": "a", "data": "a", "id": "b"},
561627
},
562628
}, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01")
563629

tests/integration/suite/daprd/outbox/http/basic.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,9 @@ func (o *basic) Run(t *testing.T, ctx context.Context) {
130130

131131
postURL := fmt.Sprintf("http://localhost:%d/v1.0/state/mystore/transaction", o.daprd.HTTPPort())
132132
stateReq := state.SetRequest{
133-
Key: "1",
134-
Value: "2",
133+
Key: "1",
134+
Value: "2",
135+
Metadata: map[string]string{"outbox.cloudevent.myapp": "myapp1", "data": "a", "id": "b"},
135136
}
136137

137138
tr := stateTransactionRequestBody{
@@ -174,5 +175,7 @@ func (o *basic) Run(t *testing.T, ctx context.Context) {
174175
//nolint:testifylint
175176
assert.NoError(c, err)
176177
assert.Equal(c, "2", ce["data"])
178+
assert.Equal(c, "myapp1", ce["outbox.cloudevent.myapp"])
179+
assert.Contains(c, ce["id"], "outbox-")
177180
}, time.Second*10, time.Millisecond*10)
178181
}

0 commit comments

Comments
 (0)