Skip to content

Commit 892acfb

Browse files
authored
Fix incorrect content-length being sent to HTTP published message (dapr#7659)
* Fix incorrect content-length being sent to HTTP published message PR dapr#7537 reverse revered the change which removed the content-length from being set on HTTP headers on sent messages. We still need to remove the content-length from messages from pubsub subscriptions as the pubsub may report a message with a content-length which does not actually match the size of the delivered message to the app. content-length is only removed on HTTP published messages. Change should be backported to 1.13 Signed-off-by: joshvanl <[email protected]> * Adds grpc app subscriber to content-length tests Signed-off-by: joshvanl <[email protected]> * Adds tests for content-length gRPC subscribed app Signed-off-by: joshvanl <[email protected]> * framework/socket: skip if test is windows Signed-off-by: joshvanl <[email protected]> * Fix socket runtime GOOS import Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 353fef6 commit 892acfb

File tree

23 files changed

+723
-162
lines changed

23 files changed

+723
-162
lines changed

pkg/messaging/v1/invoke_method_request.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ func (imr *InvokeMethodRequest) WithHTTPExtension(verb string, querystring strin
159159
// WithCustomHTTPMetadata applies a metadata map to a InvokeMethodRequest.
160160
func (imr *InvokeMethodRequest) WithCustomHTTPMetadata(md map[string]string) *InvokeMethodRequest {
161161
for k, v := range md {
162+
if strings.EqualFold(k, ContentLengthHeader) {
163+
// There is no use of the original payload's content-length because
164+
// the entire data is already in the cloud event.
165+
continue
166+
}
167+
162168
if imr.r.GetMetadata() == nil {
163169
imr.r.Metadata = make(map[string]*internalv1pb.ListStringValue)
164170
}

tests/integration/framework/process/daprd/options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/dapr/dapr/tests/integration/framework/process/exec"
2525
"github.com/dapr/dapr/tests/integration/framework/process/logline"
26+
"github.com/dapr/dapr/tests/integration/framework/socket"
2627
)
2728

2829
// Option is a function that configures the dapr process.
@@ -280,3 +281,9 @@ func WithControlPlaneTrustDomain(trustDomain string) Option {
280281
o.controlPlaneTrustDomain = &trustDomain
281282
}
282283
}
284+
285+
func WithSocket(t *testing.T, socket *socket.Socket) Option {
286+
return WithExecOptions(exec.WithEnvVars(t,
287+
"DAPR_COMPONENTS_SOCKETS_FOLDER", socket.Directory(),
288+
))
289+
}

tests/integration/framework/process/grpc/app/app.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@ func New(t *testing.T, fopts ...Option) *App {
4141
return &App{
4242
GRPC: procgrpc.New(t, append(opts.grpcopts, procgrpc.WithRegister(func(s *grpc.Server) {
4343
srv := &server{
44-
onInvokeFn: opts.onInvokeFn,
45-
onTopicEventFn: opts.onTopicEventFn,
46-
listTopicSubFn: opts.listTopicSubFn,
47-
listInputBindFn: opts.listInputBindFn,
48-
onBindingEventFn: opts.onBindingEventFn,
49-
healthCheckFn: opts.healthCheckFn,
44+
onInvokeFn: opts.onInvokeFn,
45+
onTopicEventFn: opts.onTopicEventFn,
46+
onBulkTopicEventFn: opts.onBulkTopicEventFn,
47+
listTopicSubFn: opts.listTopicSubFn,
48+
listInputBindFn: opts.listInputBindFn,
49+
onBindingEventFn: opts.onBindingEventFn,
50+
healthCheckFn: opts.healthCheckFn,
5051
}
5152
rtv1.RegisterAppCallbackServer(s, srv)
53+
rtv1.RegisterAppCallbackAlphaServer(s, srv)
5254
rtv1.RegisterAppCallbackHealthCheckServer(s, srv)
5355
if opts.withRegister != nil {
5456
opts.withRegister(s)

tests/integration/framework/process/grpc/app/options.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ import (
2626

2727
// options contains the options for running a GRPC server in integration tests.
2828
type options struct {
29-
grpcopts []procgrpc.Option
30-
withRegister func(s *grpc.Server)
31-
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
32-
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
33-
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
34-
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
35-
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
36-
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
29+
grpcopts []procgrpc.Option
30+
withRegister func(s *grpc.Server)
31+
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
32+
onBulkTopicEventFn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)
33+
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
34+
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
35+
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
36+
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
37+
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
3738
}
3839

3940
func WithGRPCOptions(opts ...procgrpc.Option) func(*options) {
@@ -48,6 +49,12 @@ func WithOnTopicEventFn(fn func(context.Context, *rtv1.TopicEventRequest) (*rtv1
4849
}
4950
}
5051

52+
func WithOnBulkTopicEventFn(fn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)) func(*options) {
53+
return func(opts *options) {
54+
opts.onBulkTopicEventFn = fn
55+
}
56+
}
57+
5158
func WithOnInvokeFn(fn func(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)) func(*options) {
5259
return func(opts *options) {
5360
opts.onInvokeFn = fn

tests/integration/framework/process/grpc/app/server.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ import (
2323
)
2424

2525
type server struct {
26-
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
27-
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
28-
listTopicSubFn func(context.Context, *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
29-
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
30-
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
31-
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
26+
onInvokeFn func(context.Context, *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error)
27+
onTopicEventFn func(context.Context, *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error)
28+
onBulkTopicEventFn func(context.Context, *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error)
29+
listTopicSubFn func(context.Context, *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
30+
listInputBindFn func(context.Context, *emptypb.Empty) (*rtv1.ListInputBindingsResponse, error)
31+
onBindingEventFn func(context.Context, *rtv1.BindingEventRequest) (*rtv1.BindingEventResponse, error)
32+
healthCheckFn func(context.Context, *emptypb.Empty) (*rtv1.HealthCheckResponse, error)
3233
}
3334

3435
func (s *server) OnInvoke(ctx context.Context, in *commonv1.InvokeRequest) (*commonv1.InvokeResponse, error) {
@@ -66,6 +67,13 @@ func (s *server) OnTopicEvent(ctx context.Context, in *rtv1.TopicEventRequest) (
6667
return s.onTopicEventFn(ctx, in)
6768
}
6869

70+
func (s *server) OnBulkTopicEventAlpha1(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) {
71+
if s.onBulkTopicEventFn == nil {
72+
return new(rtv1.TopicEventBulkResponse), nil
73+
}
74+
return s.onBulkTopicEventFn(ctx, in)
75+
}
76+
6977
func (s *server) HealthCheck(ctx context.Context, e *emptypb.Empty) (*rtv1.HealthCheckResponse, error) {
7078
if s.healthCheckFn == nil {
7179
return new(rtv1.HealthCheckResponse), nil

tests/integration/framework/process/grpc/subscriber/options.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,21 @@ limitations under the License.
1313

1414
package subscriber
1515

16+
import (
17+
"context"
18+
19+
"google.golang.org/protobuf/types/known/emptypb"
20+
21+
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
22+
)
23+
1624
// options contains the options for running a pubsub subscriber gRPC server app.
17-
type options struct{}
25+
type options struct {
26+
listTopicSubFn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)
27+
}
28+
29+
func WithListTopicSubscriptions(fn func(ctx context.Context, in *emptypb.Empty) (*rtv1.ListTopicSubscriptionsResponse, error)) func(*options) {
30+
return func(opts *options) {
31+
opts.listTopicSubFn = fn
32+
}
33+
}

tests/integration/framework/process/grpc/subscriber/subscriber.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ import (
2929
type Option func(*options)
3030

3131
type Subscriber struct {
32-
app *app.App
33-
inCh chan *rtv1.TopicEventRequest
34-
closeCh chan struct{}
32+
app *app.App
33+
inCh chan *rtv1.TopicEventRequest
34+
inBulkCh chan *rtv1.TopicEventBulkRequest
35+
closeCh chan struct{}
3536
}
3637

3738
func New(t *testing.T, fopts ...Option) *Subscriber {
@@ -43,12 +44,15 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
4344
}
4445

4546
inCh := make(chan *rtv1.TopicEventRequest, 100)
47+
inBulkCh := make(chan *rtv1.TopicEventBulkRequest, 100)
4648
closeCh := make(chan struct{})
4749

4850
return &Subscriber{
49-
inCh: inCh,
50-
closeCh: closeCh,
51+
inCh: inCh,
52+
inBulkCh: inBulkCh,
53+
closeCh: closeCh,
5154
app: app.New(t,
55+
app.WithListTopicSubscriptions(opts.listTopicSubFn),
5256
app.WithOnTopicEventFn(func(ctx context.Context, in *rtv1.TopicEventRequest) (*rtv1.TopicEventResponse, error) {
5357
select {
5458
case inCh <- in:
@@ -57,6 +61,21 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
5761
}
5862
return new(rtv1.TopicEventResponse), nil
5963
}),
64+
app.WithOnBulkTopicEventFn(func(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) {
65+
select {
66+
case inBulkCh <- in:
67+
case <-ctx.Done():
68+
case <-closeCh:
69+
}
70+
stats := make([]*rtv1.TopicEventBulkResponseEntry, len(in.GetEntries()))
71+
for i, e := range in.GetEntries() {
72+
stats[i] = &rtv1.TopicEventBulkResponseEntry{
73+
EntryId: e.GetEntryId(),
74+
Status: rtv1.TopicEventResponse_SUCCESS,
75+
}
76+
}
77+
return &rtv1.TopicEventBulkResponse{Statuses: stats}, nil
78+
}),
6079
),
6180
}
6281
}
@@ -92,11 +111,31 @@ func (s *Subscriber) Receive(t *testing.T, ctx context.Context) *rtv1.TopicEvent
92111
}
93112
}
94113

114+
func (s *Subscriber) ReceiveBulk(t *testing.T, ctx context.Context) *rtv1.TopicEventBulkRequest {
115+
t.Helper()
116+
117+
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
118+
defer cancel()
119+
120+
select {
121+
case <-ctx.Done():
122+
require.Fail(t, "timed out waiting for event response")
123+
return nil
124+
case in := <-s.inBulkCh:
125+
return in
126+
}
127+
}
128+
95129
func (s *Subscriber) AssertEventChanLen(t *testing.T, l int) {
96130
t.Helper()
97131
assert.Len(t, s.inCh, l)
98132
}
99133

134+
func (s *Subscriber) AssertBulkEventChanLen(t *testing.T, l int) {
135+
t.Helper()
136+
assert.Len(t, s.inBulkCh, l)
137+
}
138+
100139
func (s *Subscriber) ExpectPublishReceive(t *testing.T, ctx context.Context, daprd *daprd.Daprd, req *rtv1.PublishEventRequest) {
101140
t.Helper()
102141
_, err := daprd.GRPCClient(t, ctx).PublishEvent(ctx, req)

tests/integration/framework/process/http/subscriber/subscriber.go

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ limitations under the License.
1414
package subscriber
1515

1616
import (
17+
"bytes"
1718
"context"
1819
"encoding/json"
1920
"fmt"
@@ -26,6 +27,7 @@ import (
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829

30+
"github.com/dapr/dapr/pkg/runtime/pubsub"
2931
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
3032
"github.com/dapr/dapr/tests/integration/framework/process/http/app"
3133
"github.com/dapr/dapr/tests/integration/framework/util"
@@ -46,10 +48,24 @@ type PublishRequest struct {
4648
DataContentType *string
4749
}
4850

51+
type PublishBulkRequestEntry struct {
52+
EntryID string `json:"entryId"`
53+
Event string `json:"event"`
54+
ContentType string `json:"contentType,omitempty"`
55+
}
56+
57+
type PublishBulkRequest struct {
58+
Daprd *daprd.Daprd
59+
PubSubName string
60+
Topic string
61+
Entries []PublishBulkRequestEntry
62+
}
63+
4964
type Subscriber struct {
5065
app *app.App
5166
client *http.Client
5267
inCh chan *RouteEvent
68+
inBulk chan *pubsub.BulkSubscribeEnvelope
5369
closeCh chan struct{}
5470
}
5571

@@ -62,9 +78,10 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
6278
}
6379

6480
inCh := make(chan *RouteEvent, 100)
81+
inBulk := make(chan *pubsub.BulkSubscribeEnvelope, 100)
6582
closeCh := make(chan struct{})
6683

67-
appOpts := make([]app.Option, 0, len(opts.routes)+len(opts.handlerFuncs))
84+
appOpts := make([]app.Option, 0, len(opts.routes)+len(opts.bulkRoutes)+len(opts.handlerFuncs))
6885
for _, route := range opts.routes {
6986
appOpts = append(appOpts, app.WithHandlerFunc(route, func(w http.ResponseWriter, r *http.Request) {
7087
var ce event.Event
@@ -76,13 +93,39 @@ func New(t *testing.T, fopts ...Option) *Subscriber {
7693
}
7794
}))
7895
}
96+
for _, route := range opts.bulkRoutes {
97+
appOpts = append(appOpts, app.WithHandlerFunc(route, func(w http.ResponseWriter, r *http.Request) {
98+
var ce pubsub.BulkSubscribeEnvelope
99+
require.NoError(t, json.NewDecoder(r.Body).Decode(&ce))
100+
select {
101+
case inBulk <- &ce:
102+
case <-closeCh:
103+
case <-r.Context().Done():
104+
}
105+
106+
type statusT struct {
107+
EntryID string `json:"entryId"`
108+
Status string `json:"status"`
109+
}
110+
type respT struct {
111+
Statuses []statusT `json:"statuses"`
112+
}
113+
114+
var resp respT
115+
for _, entry := range ce.Entries {
116+
resp.Statuses = append(resp.Statuses, statusT{EntryID: entry.EntryId, Status: "SUCCESS"})
117+
}
118+
json.NewEncoder(w).Encode(resp)
119+
}))
120+
}
79121

80122
appOpts = append(appOpts, opts.handlerFuncs...)
81123

82124
return &Subscriber{
83125
app: app.New(t, appOpts...),
84126
client: util.HTTPClient(t),
85127
inCh: inCh,
128+
inBulk: inBulk,
86129
closeCh: closeCh,
87130
}
88131
}
@@ -117,6 +160,21 @@ func (s *Subscriber) Receive(t *testing.T, ctx context.Context) *RouteEvent {
117160
}
118161
}
119162

163+
func (s *Subscriber) ReceiveBulk(t *testing.T, ctx context.Context) *pubsub.BulkSubscribeEnvelope {
164+
t.Helper()
165+
166+
ctx, cancel := context.WithTimeout(ctx, time.Second)
167+
defer cancel()
168+
169+
select {
170+
case <-ctx.Done():
171+
require.Fail(t, "timed out waiting for event response")
172+
return nil
173+
case in := <-s.inBulk:
174+
return in
175+
}
176+
}
177+
120178
func (s *Subscriber) AssertEventChanLen(t *testing.T, l int) {
121179
t.Helper()
122180
assert.Len(t, s.inCh, l)
@@ -151,6 +209,13 @@ func (s *Subscriber) Publish(t *testing.T, ctx context.Context, req PublishReque
151209
require.Equal(t, http.StatusNoContent, resp.StatusCode)
152210
}
153211

212+
func (s *Subscriber) PublishBulk(t *testing.T, ctx context.Context, req PublishBulkRequest) {
213+
t.Helper()
214+
//nolint:bodyclose
215+
resp := s.publishBulk(t, ctx, req)
216+
require.Equal(t, http.StatusNoContent, resp.StatusCode)
217+
}
218+
154219
func (s *Subscriber) publish(t *testing.T, ctx context.Context, req PublishRequest) *http.Response {
155220
t.Helper()
156221
reqURL := fmt.Sprintf("http://%s/v1.0/publish/%s/%s", req.Daprd.HTTPAddress(), req.PubSubName, req.Topic)
@@ -164,3 +229,18 @@ func (s *Subscriber) publish(t *testing.T, ctx context.Context, req PublishReque
164229
require.NoError(t, resp.Body.Close())
165230
return resp
166231
}
232+
233+
func (s *Subscriber) publishBulk(t *testing.T, ctx context.Context, req PublishBulkRequest) *http.Response {
234+
t.Helper()
235+
236+
payload, err := json.Marshal(req.Entries)
237+
require.NoError(t, err)
238+
reqURL := fmt.Sprintf("http://%s/v1.0-alpha1/publish/bulk/%s/%s", req.Daprd.HTTPAddress(), req.PubSubName, req.Topic)
239+
hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, bytes.NewReader(payload))
240+
require.NoError(t, err)
241+
hreq.Header.Add("Content-Type", "application/json")
242+
resp, err := s.client.Do(hreq)
243+
require.NoError(t, err)
244+
require.NoError(t, resp.Body.Close())
245+
return resp
246+
}

0 commit comments

Comments
 (0)