Skip to content

Commit bc812d2

Browse files
yaron2elena-kolevska
authored andcommitted
Allow outbox payload transformation (dapr#7718)
* first commit Signed-off-by: yaron2 <[email protected]> * add tests Signed-off-by: yaron2 <[email protected]> * linter Signed-off-by: yaron2 <[email protected]> --------- Signed-off-by: yaron2 <[email protected]> Signed-off-by: Elena Kolevska <[email protected]>
1 parent 3b1714d commit bc812d2

File tree

6 files changed

+395
-14
lines changed

6 files changed

+395
-14
lines changed

pkg/api/grpc/grpc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -961,14 +961,14 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
961961
if outboxEnabled {
962962
span := diagUtils.SpanFromContext(ctx)
963963
corID, traceState := diag.TraceIDAndStateFromSpan(span)
964-
trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
964+
ops, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
965965
if err != nil {
966966
nerr := apierrors.PubSubOutbox(a.AppID(), err)
967967
apiServerLogger.Debug(nerr)
968968
return &emptypb.Empty{}, nerr
969969
}
970970

971-
operations = append(operations, trs...)
971+
operations = ops
972972
}
973973

974974
start := time.Now()

pkg/api/http/http.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,15 +1578,15 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
15781578
if outboxEnabled {
15791579
span := diagUtils.SpanFromContext(reqCtx)
15801580
corID, traceState := diag.TraceIDAndStateFromSpan(span)
1581-
trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
1581+
ops, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
15821582
if err != nil {
15831583
nerr := apierrors.PubSubOutbox(a.universal.AppID(), err)
15841584
universalFastHTTPErrorResponder(reqCtx, nerr)
15851585
log.Debug(nerr)
15861586
return
15871587
}
15881588

1589-
operations = append(operations, trs...)
1589+
operations = ops
15901590
}
15911591

15921592
start := time.Now()

pkg/runtime/pubsub/outbox.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func transaction() (state.TransactionalStateOperation, error) {
129129
}, nil
130130
}
131131

132-
// PublishInternal publishes the state to an internal topic for outbox processing
132+
// PublishInternal publishes the state to an internal topic for outbox processing and returns the updated list of transactions
133133
func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, operations []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error) {
134134
o.lock.RLock()
135135
c, ok := o.outboxStores[stateStore]
@@ -139,7 +139,21 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
139139
return nil, fmt.Errorf("error publishing internal outbox message: could not find outbox configuration on state store %s", stateStore)
140140
}
141141

142-
trs := make([]state.TransactionalStateOperation, 0, len(operations))
142+
projections := map[string]state.SetRequest{}
143+
144+
for i, op := range operations {
145+
sr, ok := op.(state.SetRequest)
146+
147+
if ok {
148+
for k, v := range sr.Metadata {
149+
if k == "outbox.projection" && utils.IsTruthy(v) {
150+
projections[sr.Key] = sr
151+
operations = append(operations[:i], operations[i+1:]...)
152+
}
153+
}
154+
}
155+
}
156+
143157
for _, op := range operations {
144158
sr, ok := op.(state.SetRequest)
145159
if ok {
@@ -148,24 +162,41 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
148162
return nil, err
149163
}
150164

165+
var payload any
166+
var contentType string
167+
168+
if proj, ok := projections[sr.Key]; ok {
169+
payload = proj.Value
170+
171+
if proj.ContentType != nil {
172+
contentType = *proj.ContentType
173+
}
174+
} else {
175+
payload = sr.Value
176+
177+
if sr.ContentType != nil {
178+
contentType = *sr.ContentType
179+
}
180+
}
181+
151182
var ceData []byte
152-
bt, ok := sr.Value.([]byte)
183+
bt, ok := payload.([]byte)
153184
if ok {
154185
ceData = bt
155-
} else if sr.ContentType != nil && strings.EqualFold(*sr.ContentType, "application/json") {
156-
b, sErr := json.Marshal(sr.Value)
186+
} else if contentType != "" && strings.EqualFold(contentType, "application/json") {
187+
b, sErr := json.Marshal(payload)
157188
if sErr != nil {
158189
return nil, sErr
159190
}
160191

161192
ceData = b
162193
} else {
163-
ceData = []byte(fmt.Sprintf("%v", sr.Value))
194+
ceData = []byte(fmt.Sprintf("%v", payload))
164195
}
165196

166197
var dataContentType string
167-
if sr.ContentType != nil {
168-
dataContentType = *sr.ContentType
198+
if contentType != "" {
199+
dataContentType = contentType
169200
}
170201

171202
ce := contribPubsub.NewCloudEventsEnvelope(tr.GetKey(), source, "", "", "", c.outboxPubsub, dataContentType, ceData, "", traceState)
@@ -193,11 +224,11 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
193224
return nil, err
194225
}
195226

196-
trs = append(trs, tr)
227+
operations = append(operations, tr)
197228
}
198229
}
199230

200-
return trs, nil
231+
return operations, nil
201232
}
202233

203234
func outboxTopic(appID, topic, namespace string) string {

pkg/runtime/pubsub/outbox_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,8 @@ func TestSubscribeToInternalTopics(t *testing.T) {
627627
},
628628
}, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01")
629629

630+
trs = append(trs[:0], trs[0+1:]...)
631+
630632
if pErr != nil {
631633
errCh <- pErr
632634
return
@@ -781,6 +783,8 @@ func TestSubscribeToInternalTopics(t *testing.T) {
781783
},
782784
}, appID, "", "")
783785

786+
trs = append(trs[:0], trs[0+1:]...)
787+
784788
if pErr != nil {
785789
errCh <- pErr
786790
return
@@ -906,6 +910,8 @@ func TestSubscribeToInternalTopics(t *testing.T) {
906910
},
907911
}, appID, "", "")
908912

913+
trs = append(trs[:0], trs[0+1:]...)
914+
909915
if pErr != nil {
910916
errCh <- pErr
911917
return
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package grpc
15+
16+
import (
17+
"context"
18+
"sync"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
"google.golang.org/grpc"
25+
"google.golang.org/grpc/credentials/insecure"
26+
27+
"github.com/dapr/dapr/pkg/proto/common/v1"
28+
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
29+
"github.com/dapr/dapr/tests/integration/framework"
30+
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
31+
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
32+
"github.com/dapr/dapr/tests/integration/suite"
33+
)
34+
35+
func init() {
36+
suite.Register(new(projection))
37+
}
38+
39+
type projection struct {
40+
daprd *daprd.Daprd
41+
lock sync.Mutex
42+
msg []byte
43+
}
44+
45+
func (o *projection) Setup(t *testing.T) []framework.Option {
46+
onTopicEvent := func(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
47+
o.lock.Lock()
48+
defer o.lock.Unlock()
49+
o.msg = in.GetData()
50+
return &runtimev1pb.TopicEventResponse{
51+
Status: runtimev1pb.TopicEventResponse_SUCCESS,
52+
}, nil
53+
}
54+
55+
srv1 := app.New(t, app.WithOnTopicEventFn(onTopicEvent))
56+
o.daprd = daprd.New(t, daprd.WithAppID("outboxtest"), daprd.WithAppPort(srv1.Port(t)), daprd.WithAppProtocol("grpc"), daprd.WithResourceFiles(`
57+
apiVersion: dapr.io/v1alpha1
58+
kind: Component
59+
metadata:
60+
name: mystore
61+
spec:
62+
type: state.in-memory
63+
version: v1
64+
metadata:
65+
- name: outboxPublishPubsub
66+
value: "mypubsub"
67+
- name: outboxPublishTopic
68+
value: "test"
69+
`,
70+
`
71+
apiVersion: dapr.io/v1alpha1
72+
kind: Component
73+
metadata:
74+
name: 'mypubsub'
75+
spec:
76+
type: pubsub.in-memory
77+
version: v1
78+
`,
79+
`
80+
apiVersion: dapr.io/v2alpha1
81+
kind: Subscription
82+
metadata:
83+
name: 'order'
84+
spec:
85+
topic: 'test'
86+
routes:
87+
default: '/test'
88+
pubsubname: 'mypubsub'
89+
scopes:
90+
- outboxtest
91+
`))
92+
93+
return []framework.Option{
94+
framework.WithProcesses(srv1, o.daprd),
95+
}
96+
}
97+
98+
func (o *projection) Run(t *testing.T, ctx context.Context) {
99+
o.daprd.WaitUntilRunning(t, ctx)
100+
101+
conn, err := grpc.DialContext(ctx, o.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
102+
require.NoError(t, err)
103+
t.Cleanup(func() { require.NoError(t, conn.Close()) })
104+
105+
_, err = runtimev1pb.NewDaprClient(conn).ExecuteStateTransaction(ctx, &runtimev1pb.ExecuteStateTransactionRequest{
106+
StoreName: "mystore",
107+
Operations: []*runtimev1pb.TransactionalStateOperation{
108+
{
109+
OperationType: "upsert",
110+
Request: &common.StateItem{
111+
Key: "1",
112+
Value: []byte("2"),
113+
},
114+
},
115+
{
116+
OperationType: "upsert",
117+
Request: &common.StateItem{
118+
Key: "1",
119+
Value: []byte("3"),
120+
Metadata: map[string]string{
121+
"outbox.projection": "true",
122+
},
123+
},
124+
},
125+
},
126+
})
127+
require.NoError(t, err)
128+
129+
assert.Eventually(t, func() bool {
130+
o.lock.Lock()
131+
defer o.lock.Unlock()
132+
return string(o.msg) == "3"
133+
}, time.Second*5, time.Millisecond*10, "failed to receive message in time")
134+
135+
assert.Eventually(t, func() bool {
136+
o.lock.Lock()
137+
defer o.lock.Unlock()
138+
139+
resp, err := runtimev1pb.NewDaprClient(conn).GetState(ctx, &runtimev1pb.GetStateRequest{
140+
Key: "1",
141+
StoreName: "mystore",
142+
})
143+
require.NoError(t, err)
144+
return string(resp.GetData()) == "2"
145+
}, time.Second*5, time.Millisecond*10, "failed to receive message in time")
146+
}

0 commit comments

Comments
 (0)