Skip to content

Commit 6e12449

Browse files
antontroshinJoshVanLdapr-bot
authored
Add message ordering integration and e2e test (dapr#8669)
* Add message ordering integration and e2e test Signed-off-by: Anton Troshin <[email protected]> * fix tests and lint Signed-off-by: Anton Troshin <[email protected]> * update test app deps Signed-off-by: Anton Troshin <[email protected]> * check ctx done and stop publishing on first error as the test will fail anyway Signed-off-by: Anton Troshin <[email protected]> --------- Signed-off-by: Anton Troshin <[email protected]> Co-authored-by: Josh van Leeuwen <[email protected]> Co-authored-by: Dapr Bot <[email protected]>
1 parent b807dc7 commit 6e12449

File tree

15 files changed

+1032
-1
lines changed

15 files changed

+1032
-1
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
#
13+
14+
FROM golang:1.24.1 AS build_env
15+
16+
ENV CGO_ENABLED=0
17+
WORKDIR /app
18+
19+
# Copy go.mod and go.sum first for better caching
20+
COPY go.mod go.sum /app/
21+
RUN go mod download
22+
23+
# Copy source
24+
COPY *.go /app/
25+
RUN go build -o app .
26+
27+
FROM gcr.io/distroless/static:nonroot
28+
WORKDIR /
29+
COPY --from=build_env /app/app /
30+
CMD ["/app"]
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package main
15+
16+
import (
17+
"context"
18+
"encoding/json"
19+
"log"
20+
"net"
21+
"net/http"
22+
"os"
23+
"strconv"
24+
"sync/atomic"
25+
"time"
26+
27+
"github.com/gorilla/mux"
28+
"google.golang.org/grpc"
29+
"google.golang.org/grpc/credentials/insecure"
30+
31+
"github.com/dapr/dapr/tests/apps/utils"
32+
"github.com/dapr/go-sdk/client"
33+
)
34+
35+
const (
36+
pubsubName = "messagebus"
37+
topicName = "pubsub-topic-streaming"
38+
)
39+
40+
var (
41+
appPort = 3000
42+
daprGRPCPort int
43+
sdkClient client.Client
44+
)
45+
46+
func init() {
47+
p := os.Getenv("PORT")
48+
if p != "" && p != "0" {
49+
appPort, _ = strconv.Atoi(p)
50+
}
51+
}
52+
53+
// indexHandler is the handler for root path
54+
func indexHandler(w http.ResponseWriter, r *http.Request) {
55+
w.WriteHeader(http.StatusOK)
56+
}
57+
58+
// appRouter initializes restful api router
59+
func appRouter() http.Handler {
60+
router := mux.NewRouter().StrictSlash(true)
61+
62+
// Log requests and their processing time
63+
router.Use(utils.LoggerMiddleware)
64+
65+
router.HandleFunc("/", indexHandler).Methods("GET")
66+
router.HandleFunc("/tests/streaming-order-publish", streamingOrderPublishHandler).Methods("POST")
67+
68+
router.Use(mux.CORSMethodMiddleware(router))
69+
70+
return router
71+
}
72+
73+
func streamingOrderPublishHandler(w http.ResponseWriter, r *http.Request) {
74+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
75+
defer cancel()
76+
77+
count := r.URL.Query().Get("count")
78+
numberOfMessages, err := strconv.Atoi(count)
79+
if err != nil {
80+
log.Printf("error parsing count: %v", err)
81+
w.WriteHeader(http.StatusBadRequest)
82+
return
83+
}
84+
85+
sentMessagesChan := make(chan int, numberOfMessages)
86+
messageCount := atomic.Int32{}
87+
messageCount.Store(0)
88+
89+
for {
90+
messageID := int(messageCount.Load())
91+
err := sdkClient.PublishEvent(ctx, pubsubName, topicName, []byte(strconv.Itoa(messageID)))
92+
if err != nil {
93+
log.Printf("error publishing message: %v", err)
94+
}
95+
sentMessagesChan <- messageID
96+
messageCount.Add(1)
97+
98+
if int(messageCount.Load()) == numberOfMessages {
99+
log.Printf("all messages sent successfully")
100+
break
101+
}
102+
}
103+
104+
messages := make([]int, 0)
105+
for msg := range sentMessagesChan {
106+
messages = append(messages, msg)
107+
if len(messages) == numberOfMessages {
108+
break
109+
}
110+
}
111+
112+
close(sentMessagesChan)
113+
114+
w.WriteHeader(http.StatusOK)
115+
err = json.NewEncoder(w).Encode(messages)
116+
if err != nil {
117+
log.Printf("error writing response: %v", err)
118+
}
119+
}
120+
121+
func main() {
122+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
123+
defer cancel()
124+
125+
daprGRPCPort = utils.PortFromEnv("DAPR_GRPC_PORT", 50001)
126+
conn, err := grpc.DialContext(
127+
ctx,
128+
net.JoinHostPort("127.0.0.1", strconv.Itoa(daprGRPCPort)),
129+
grpc.WithTransportCredentials(insecure.NewCredentials()),
130+
grpc.WithBlock(),
131+
)
132+
if err != nil {
133+
log.Fatalf("Error connecting to Dapr's gRPC endpoint on port %d: %v", daprGRPCPort, err)
134+
}
135+
136+
sdkClient = client.NewClientWithConnection(conn)
137+
138+
log.Printf("pubsub-publisher-streaming - listening on http://localhost:%d", appPort)
139+
utils.StartServer(appPort, appRouter, true, false)
140+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module github.com/dapr/dapr/tests/apps/pubsub-publisher-streaming
2+
3+
go 1.24.1
4+
5+
require (
6+
github.com/dapr/dapr v1.15.4
7+
github.com/dapr/go-sdk v1.12.0
8+
github.com/gorilla/mux v1.8.1
9+
google.golang.org/grpc v1.70.0
10+
)
11+
12+
require (
13+
github.com/google/uuid v1.6.0 // indirect
14+
go.opentelemetry.io/otel v1.34.0 // indirect
15+
golang.org/x/net v0.38.0 // indirect
16+
golang.org/x/sys v0.31.0 // indirect
17+
golang.org/x/text v0.23.0 // indirect
18+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect
19+
google.golang.org/protobuf v1.36.4 // indirect
20+
gopkg.in/yaml.v3 v3.0.1 // indirect
21+
)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
github.com/dapr/dapr v1.15.4 h1:1aasasoR7JFMIfwlGOWrYl0Tg5Cn738ZAdbDJSdD4FE=
2+
github.com/dapr/dapr v1.15.4/go.mod h1:U9XfdsvWo3a72cpvUOFpgcDdDKo/wAQSFTk0bi16uHk=
3+
github.com/dapr/go-sdk v1.12.0 h1:+9IHZ1faWwNg/HvZk1ht0oIU8eqOa9nvGMk+Nr+0qkc=
4+
github.com/dapr/go-sdk v1.12.0/go.mod h1:RpZJ/pNfODlyk6x+whdtCrFI1/o0X67LCSwZeAZa64U=
5+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
6+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
8+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
9+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
10+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
11+
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
12+
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
13+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
14+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
15+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
16+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
17+
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
18+
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
19+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
20+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
21+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
22+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
23+
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
24+
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
25+
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
26+
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
27+
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
28+
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
29+
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
30+
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
31+
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
32+
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
33+
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
34+
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
35+
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
36+
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
37+
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
38+
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
39+
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
40+
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
41+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8=
42+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk=
43+
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
44+
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
45+
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
46+
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
47+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
48+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
49+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
50+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# In e2e test, this will not be used to deploy the app to test cluster.
2+
# This is created for testing purpose in order to deploy this app using kubectl
3+
# before writing e2e test.
4+
5+
kind: Service
6+
apiVersion: v1
7+
metadata:
8+
name: pubsub-publisher-streaming
9+
labels:
10+
testapp: pubsub-publisher-streaming
11+
spec:
12+
selector:
13+
testapp: pubsub-publisher-streaming
14+
ports:
15+
- protocol: TCP
16+
port: 80
17+
targetPort: 3000
18+
type: LoadBalancer
19+
20+
---
21+
apiVersion: apps/v1
22+
kind: Deployment
23+
metadata:
24+
name: pubsub-publisher-streaming
25+
labels:
26+
testapp: pubsub-publisher-streaming
27+
spec:
28+
replicas: 1
29+
selector:
30+
matchLabels:
31+
testapp: pubsub-publisher-streaming
32+
template:
33+
metadata:
34+
labels:
35+
testapp: pubsub-publisher-streaming
36+
annotations:
37+
dapr.io/enabled: "true"
38+
dapr.io/app-id: "pubsub-publisher-streaming"
39+
dapr.io/app-port: "3000"
40+
spec:
41+
containers:
42+
- name: pubsub-publisher-streaming
43+
# image: dapriotest/e2e-pubsub-publisher-streaming
44+
image: localhost:5000/dapr/e2e-pubsub-publisher-streaming:dev-linux-arm64
45+
ports:
46+
- containerPort: 3000
47+
imagePullPolicy: Always
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
#
13+
14+
FROM golang:1.24.1 AS build_env
15+
16+
ENV CGO_ENABLED=0
17+
WORKDIR /app
18+
19+
# Copy go.mod and go.sum first for better caching
20+
COPY go.mod go.sum /app/
21+
RUN go mod download
22+
23+
# Copy source
24+
COPY *.go /app/
25+
RUN go build -o app .
26+
27+
FROM gcr.io/distroless/static:nonroot
28+
WORKDIR /
29+
COPY --from=build_env /app/app /
30+
CMD ["/app"]

0 commit comments

Comments
 (0)