Skip to content

Commit 8a03abe

Browse files
committed
fix: Include CE attributes in sink headers
- Forward CloudEvent attributes to the sink - Prevent default Camel header filter strategy removing the CE headers
1 parent cf38502 commit 8a03abe

File tree

7 files changed

+30
-2
lines changed

7 files changed

+30
-2
lines changed

aws-eventbridge-sink/src/main/resources/camel/sink.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ spec:
2727
name: default
2828
properties:
2929
type: ""
30+
dataTypes:
31+
out:
32+
scheme: http
33+
format: application/cloudevents
3034
sink:
3135
ref:
3236
kind: Kamelet

aws-eventbridge-sink/src/test/java/dev/knative/eventing/connector/AwsEventBridgeSinkTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void shouldConsumeEvents() {
8484
.body(eventData)
8585
.header("ce-id", "citrus:randomPattern([0-9A-Z]{15}-[0-9]{16})@")
8686
.header("ce-type", "dev.knative.eventing.aws.eventbridge")
87-
.header("ce-source", "dev.knative.eventing.aws-eventbridge-source")
87+
.header("ce-source", "aws.s3")
8888
.header("ce-subject", "aws-eventbridge-source")
8989
);
9090

aws-s3-sink/src/main/resources/camel/sink.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ spec:
2727
name: default
2828
properties:
2929
type: ""
30+
dataTypes:
31+
out:
32+
scheme: http
33+
format: application/cloudevents
3034
sink:
3135
ref:
3236
kind: Kamelet

aws-sns-sink/src/main/resources/camel/sink.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ spec:
2727
name: default
2828
properties:
2929
type: ""
30+
dataTypes:
31+
out:
32+
scheme: http
33+
format: application/cloudevents
3034
sink:
3135
ref:
3236
kind: Kamelet

aws-sns-sink/src/test/java/dev/knative/eventing/connector/AwsSnsSinkTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public void shouldConsumeEvents() {
8181
.header("ce-type", "dev.knative.eventing.aws.sns")
8282
.header("ce-source", "dev.knative.eventing.aws-sns-source")
8383
.header("ce-subject", "aws-sns-source")
84+
.header("ce-apiVersion", "v1")
8485
);
8586

8687
tc.when(
@@ -110,7 +111,14 @@ private void verifySnsData(TestContext context) {
110111

111112
try {
112113
Assertions.assertEquals(1, receiveMessageResponse.messages().size());
113-
Assertions.assertTrue(receiveMessageResponse.messages().get(0).body().contains(snsData));
114+
String body = receiveMessageResponse.messages().getFirst().body();
115+
Assertions.assertTrue(body.contains("\"Message\": \"%s\"".formatted(snsData)));
116+
Assertions.assertTrue(body.contains("\"Subject\": \"aws-sns-source\""));
117+
Assertions.assertTrue(body.contains("ce-id"));
118+
Assertions.assertTrue(body.contains("ce-type"));
119+
Assertions.assertTrue(body.contains("ce-source"));
120+
Assertions.assertTrue(body.contains("ce-subject"));
121+
Assertions.assertTrue(body.contains("ce-apiVersion"));
114122
} catch (AssertionFailedError error) {
115123
throw new CitrusRuntimeException("SNS data verification failed", error);
116124
}

aws-sqs-sink/src/main/resources/camel/sink.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ spec:
2727
name: default
2828
properties:
2929
type: ""
30+
dataTypes:
31+
out:
32+
scheme: http
33+
format: application/cloudevents
3034
sink:
3135
ref:
3236
kind: Kamelet

log-sink/src/main/resources/camel/sink.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ spec:
2727
name: default
2828
properties:
2929
type: ""
30+
dataTypes:
31+
out:
32+
scheme: http
33+
format: application/cloudevents
3034
sink:
3135
ref:
3236
kind: Kamelet

0 commit comments

Comments
 (0)