Skip to content

Commit 4579e61

Browse files
committed
pull invoke + binding baggage to baggage dir
Signed-off-by: Cassandra Coyle <[email protected]>
1 parent 525dbe2 commit 4579e61

File tree

4 files changed

+367
-53
lines changed

4 files changed

+367
-53
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package baggage
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"net/http"
20+
"strings"
21+
"sync/atomic"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
grpcMetadata "google.golang.org/grpc/metadata"
27+
28+
"github.com/dapr/dapr/pkg/proto/runtime/v1"
29+
"github.com/dapr/dapr/tests/integration/framework"
30+
"github.com/dapr/dapr/tests/integration/framework/client"
31+
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
32+
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
33+
"github.com/dapr/dapr/tests/integration/suite"
34+
)
35+
36+
func init() {
37+
suite.Register(new(output))
38+
}
39+
40+
type output struct {
41+
httpapp *prochttp.HTTP
42+
daprd *daprd.Daprd
43+
44+
baggage atomic.Bool
45+
baggageVals atomic.Value
46+
}
47+
48+
func (b *output) Setup(t *testing.T) []framework.Option {
49+
handler := http.NewServeMux()
50+
handler.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
51+
if baggage := r.Header.Get("baggage"); baggage != "" {
52+
b.baggage.Store(true)
53+
b.baggageVals.Store(baggage)
54+
} else {
55+
b.baggage.Store(false)
56+
b.baggageVals.Store(baggage)
57+
}
58+
59+
w.Write([]byte(`OK`))
60+
})
61+
62+
b.httpapp = prochttp.New(t, prochttp.WithHandler(handler))
63+
64+
b.daprd = daprd.New(t,
65+
daprd.WithAppPort(b.httpapp.Port()),
66+
daprd.WithResourceFiles(fmt.Sprintf(`apiVersion: dapr.io/v1alpha1
67+
kind: Component
68+
metadata:
69+
name: http-binding-baggage
70+
spec:
71+
type: bindings.http
72+
version: v1
73+
metadata:
74+
- name: url
75+
value: http://127.0.0.1:%d/test
76+
`, b.httpapp.Port())))
77+
78+
return []framework.Option{
79+
framework.WithProcesses(b.httpapp, b.daprd),
80+
}
81+
}
82+
83+
func (b *output) Run(t *testing.T, ctx context.Context) {
84+
b.daprd.WaitUntilRunning(t, ctx)
85+
86+
httpClient := client.HTTP(t)
87+
client := b.daprd.GRPCClient(t, ctx)
88+
89+
t.Run("no baggage header provided", func(t *testing.T) {
90+
// invoke binding
91+
ctx := t.Context()
92+
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/bindings/http-binding-baggage", b.daprd.HTTPPort())
93+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, strings.NewReader("{\"operation\":\"get\"}"))
94+
require.NoError(t, err)
95+
resp, err := httpClient.Do(req)
96+
require.NoError(t, err)
97+
defer resp.Body.Close()
98+
assert.Equal(t, http.StatusOK, resp.StatusCode)
99+
assert.False(t, b.baggage.Load())
100+
101+
invokereq := runtime.InvokeBindingRequest{
102+
Name: "http-binding-baggage",
103+
Operation: "get",
104+
}
105+
106+
// invoke binding
107+
invokeresp, err := client.InvokeBinding(ctx, &invokereq)
108+
require.NoError(t, err)
109+
require.NotNil(t, invokeresp)
110+
assert.False(t, b.baggage.Load())
111+
})
112+
113+
t.Run("baggage headers provided", func(t *testing.T) {
114+
// invoke binding
115+
ctx := t.Context()
116+
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/bindings/http-binding-baggage", b.daprd.HTTPPort())
117+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, strings.NewReader("{\"operation\":\"get\"}"))
118+
require.NoError(t, err)
119+
120+
bag := "key1=value1,key2=value2"
121+
req.Header.Set("baggage", bag)
122+
123+
resp, err := httpClient.Do(req)
124+
require.NoError(t, err)
125+
defer resp.Body.Close()
126+
assert.Equal(t, http.StatusOK, resp.StatusCode)
127+
assert.True(t, b.baggage.Load())
128+
assert.Equal(t, "key1=value1,key2=value2", b.baggageVals.Load())
129+
130+
invokereq := runtime.InvokeBindingRequest{
131+
Name: "http-binding-baggage",
132+
Operation: "get",
133+
}
134+
135+
// invoke binding
136+
ctx = grpcMetadata.AppendToOutgoingContext(ctx,
137+
"baggage", bag,
138+
)
139+
invokeresp, err := client.InvokeBinding(ctx, &invokereq)
140+
require.NoError(t, err)
141+
require.NotNil(t, invokeresp)
142+
assert.True(t, b.baggage.Load())
143+
assert.Equal(t, "key1=value1,key2=value2", b.baggageVals.Load())
144+
})
145+
}
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package baggage
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"net/http"
20+
"strings"
21+
"sync/atomic"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
grpcMetadata "google.golang.org/grpc/metadata"
27+
28+
"github.com/dapr/dapr/pkg/proto/common/v1"
29+
"github.com/dapr/dapr/pkg/proto/runtime/v1"
30+
"github.com/dapr/dapr/tests/integration/framework"
31+
"github.com/dapr/dapr/tests/integration/framework/client"
32+
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
33+
procgrpc "github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
34+
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
35+
"github.com/dapr/dapr/tests/integration/suite"
36+
)
37+
38+
func init() {
39+
suite.Register(new(invoke))
40+
}
41+
42+
type invoke struct {
43+
grpcapp *procgrpc.App
44+
httpapp *prochttp.HTTP
45+
daprd *daprd.Daprd
46+
grpcdaprd *daprd.Daprd
47+
48+
baggage atomic.Bool
49+
baggageVals atomic.Value
50+
}
51+
52+
func (i *invoke) Setup(t *testing.T) []framework.Option {
53+
handler := http.NewServeMux()
54+
handler.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
55+
if baggage := r.Header.Get("baggage"); baggage != "" {
56+
i.baggage.Store(true)
57+
i.baggageVals.Store(baggage)
58+
} else {
59+
i.baggage.Store(false)
60+
i.baggageVals.Store("")
61+
}
62+
w.Write([]byte(`OK`))
63+
})
64+
65+
i.httpapp = prochttp.New(t, prochttp.WithHandler(handler))
66+
67+
i.grpcapp = procgrpc.New(t,
68+
procgrpc.WithOnInvokeFn(func(ctx context.Context, in *common.InvokeRequest) (*common.InvokeResponse, error) {
69+
switch in.GetMethod() {
70+
case "test":
71+
if md, ok := grpcMetadata.FromIncomingContext(ctx); ok {
72+
if baggage, exists := md["baggage"]; exists && len(baggage) > 0 {
73+
i.baggage.Store(true)
74+
i.baggageVals.Store(strings.Join(baggage, ","))
75+
} else {
76+
i.baggage.Store(false)
77+
i.baggageVals.Store("")
78+
}
79+
}
80+
}
81+
return nil, nil
82+
}),
83+
)
84+
85+
i.daprd = daprd.New(t, daprd.WithAppPort(i.httpapp.Port()))
86+
87+
i.grpcdaprd = daprd.New(t, daprd.WithAppPort(i.grpcapp.Port(t)), daprd.WithAppProtocol("grpc"))
88+
89+
return []framework.Option{
90+
framework.WithProcesses(i.httpapp, i.daprd, i.grpcapp, i.grpcdaprd),
91+
}
92+
}
93+
94+
func (i *invoke) Run(t *testing.T, ctx context.Context) {
95+
i.daprd.WaitUntilRunning(t, ctx)
96+
i.grpcdaprd.WaitUntilRunning(t, ctx)
97+
98+
httpClient := client.HTTP(t)
99+
client := i.daprd.GRPCClient(t, ctx)
100+
101+
t.Run("no baggage header provided", func(t *testing.T) {
102+
// invoke both grpc & http apps
103+
ctx := t.Context()
104+
appURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/test", i.daprd.HTTPPort(), i.daprd.AppID())
105+
appreq, err := http.NewRequestWithContext(ctx, http.MethodPost, appURL, strings.NewReader("{\"operation\":\"get\"}"))
106+
require.NoError(t, err)
107+
appresp, err := httpClient.Do(appreq)
108+
require.NoError(t, err)
109+
defer appresp.Body.Close()
110+
assert.Equal(t, http.StatusOK, appresp.StatusCode)
111+
assert.False(t, i.baggage.Load())
112+
113+
svcreq := runtime.InvokeServiceRequest{
114+
Id: i.daprd.AppID(),
115+
Message: &common.InvokeRequest{
116+
Method: "test",
117+
Data: nil,
118+
ContentType: "",
119+
HttpExtension: &common.HTTPExtension{
120+
Verb: common.HTTPExtension_GET,
121+
Querystring: "",
122+
},
123+
},
124+
}
125+
126+
ctx = t.Context()
127+
svcresp, err := client.InvokeService(ctx, &svcreq)
128+
require.NoError(t, err)
129+
require.NotNil(t, svcresp)
130+
assert.False(t, i.baggage.Load())
131+
132+
grpcappreq := runtime.InvokeServiceRequest{
133+
Id: i.grpcdaprd.AppID(),
134+
Message: &common.InvokeRequest{
135+
Method: "test",
136+
Data: nil,
137+
ContentType: "",
138+
HttpExtension: &common.HTTPExtension{
139+
Verb: common.HTTPExtension_GET,
140+
Querystring: "",
141+
},
142+
},
143+
}
144+
145+
// grpc app check
146+
grpcclient := i.grpcdaprd.GRPCClient(t, ctx)
147+
svcresp, err = grpcclient.InvokeService(ctx, &grpcappreq)
148+
require.NoError(t, err)
149+
require.NotNil(t, svcresp)
150+
assert.False(t, i.baggage.Load())
151+
})
152+
153+
t.Run("baggage headers provided", func(t *testing.T) {
154+
// invoke both grpc & http apps
155+
ctx := t.Context()
156+
appURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/test", i.daprd.HTTPPort(), i.daprd.AppID())
157+
appreq, err := http.NewRequestWithContext(ctx, http.MethodPost, appURL, strings.NewReader("{\"operation\":\"get\"}"))
158+
require.NoError(t, err)
159+
160+
bag := "key1=value1,key2=value2"
161+
appreq.Header.Set("baggage", bag)
162+
163+
appresp, err := httpClient.Do(appreq)
164+
require.NoError(t, err)
165+
defer appresp.Body.Close()
166+
assert.Equal(t, http.StatusOK, appresp.StatusCode)
167+
assert.True(t, i.baggage.Load())
168+
assert.Equal(t, "key1=value1,key2=value2", i.baggageVals.Load())
169+
170+
svcreq := runtime.InvokeServiceRequest{
171+
Id: i.daprd.AppID(),
172+
Message: &common.InvokeRequest{
173+
Method: "test",
174+
Data: nil,
175+
ContentType: "",
176+
HttpExtension: &common.HTTPExtension{
177+
Verb: common.HTTPExtension_GET,
178+
Querystring: "",
179+
},
180+
},
181+
}
182+
183+
tctx := grpcMetadata.AppendToOutgoingContext(t.Context(),
184+
"baggage", bag,
185+
)
186+
svcresp, err := client.InvokeService(tctx, &svcreq)
187+
require.NoError(t, err)
188+
require.NotNil(t, svcresp)
189+
assert.True(t, i.baggage.Load())
190+
assert.Equal(t, "key1=value1,key2=value2", i.baggageVals.Load())
191+
192+
grpcappreq := runtime.InvokeServiceRequest{
193+
Id: i.grpcdaprd.AppID(),
194+
Message: &common.InvokeRequest{
195+
Method: "test",
196+
Data: nil,
197+
ContentType: "",
198+
HttpExtension: &common.HTTPExtension{
199+
Verb: common.HTTPExtension_GET,
200+
Querystring: "",
201+
},
202+
},
203+
}
204+
205+
tctx = grpcMetadata.AppendToOutgoingContext(t.Context(),
206+
"baggage", bag,
207+
)
208+
grpcclient := i.grpcdaprd.GRPCClient(t, tctx)
209+
svcresp, err = grpcclient.InvokeService(tctx, &grpcappreq)
210+
require.NoError(t, err)
211+
require.NotNil(t, svcresp)
212+
assert.True(t, i.baggage.Load())
213+
assert.Equal(t, "key1=value1,key2=value2", i.baggageVals.Load())
214+
})
215+
}

0 commit comments

Comments
 (0)