@@ -7,58 +7,78 @@ use crate::{
7
7
GCPPubSubConsumerPlugin , RabbitMqConsumerPlugin , RedisConsumerPlugin , SqsConsumerPlugin ,
8
8
} ;
9
9
use serde:: Deserialize ;
10
- use svix_bridge_types:: { ReceiverOutput , SenderInput , SenderOutputOpts } ;
10
+ use svix_bridge_types:: {
11
+ ReceiverOutput , SenderInput , SenderOutputOpts , TransformationConfig , TransformerInputFormat ,
12
+ } ;
11
13
12
14
#[ derive( Deserialize ) ]
13
15
pub struct QueueConsumerConfig {
14
16
pub name : String ,
15
17
pub input : SenderInputOpts ,
16
18
#[ serde( default ) ]
17
- pub transformation : Option < String > ,
19
+ pub transformation : Option < TransformationConfig > ,
18
20
pub output : SenderOutputOpts ,
19
21
}
20
22
21
23
impl QueueConsumerConfig {
22
- pub fn into_sender_input ( self ) -> Box < dyn SenderInput > {
24
+ pub fn into_sender_input ( self ) -> Result < Box < dyn SenderInput > , & ' static str > {
23
25
match self . input {
24
- SenderInputOpts :: GCPPubSub ( input) => Box :: new ( GCPPubSubConsumerPlugin :: new (
25
- self . name ,
26
- input,
27
- self . transformation ,
28
- self . output ,
29
- ) ) ,
30
- SenderInputOpts :: RabbitMQ ( input) => Box :: new ( RabbitMqConsumerPlugin :: new (
26
+ SenderInputOpts :: GCPPubSub ( input) => Ok ( Box :: new ( GCPPubSubConsumerPlugin :: new (
31
27
self . name ,
32
28
input,
33
29
self . transformation ,
34
30
self . output ,
35
- ) ) ,
36
- SenderInputOpts :: Redis ( input) => Box :: new ( RedisConsumerPlugin :: new (
31
+ ) ) ) ,
32
+ SenderInputOpts :: RabbitMQ ( input) => Ok ( Box :: new ( RabbitMqConsumerPlugin :: new (
37
33
self . name ,
38
34
input,
39
35
self . transformation ,
40
36
self . output ,
41
- ) ) ,
42
- SenderInputOpts :: SQS ( input) => Box :: new ( SqsConsumerPlugin :: new (
37
+ ) ) ) ,
38
+ SenderInputOpts :: Redis ( input) => {
39
+ if let Some ( xform) = & self . transformation {
40
+ if xform. format ( ) != TransformerInputFormat :: Json {
41
+ return Err ( "redis only supports json formatted transformations" ) ;
42
+ }
43
+ }
44
+ Ok ( Box :: new ( RedisConsumerPlugin :: new (
45
+ self . name ,
46
+ input,
47
+ self . transformation ,
48
+ self . output ,
49
+ ) ) )
50
+ }
51
+ SenderInputOpts :: SQS ( input) => Ok ( Box :: new ( SqsConsumerPlugin :: new (
43
52
self . name ,
44
53
input,
45
54
self . transformation ,
46
55
self . output ,
47
- ) ) ,
56
+ ) ) ) ,
48
57
}
49
58
}
50
59
}
51
60
52
61
pub async fn into_receiver_output (
53
62
name : String ,
54
63
opts : ReceiverOutputOpts ,
64
+ // Annoying to have to pass this, but certain backends (redis) only work with certain transformations (json).
65
+ transformation : & Option < TransformationConfig > ,
55
66
) -> Result < Box < dyn ReceiverOutput > , crate :: Error > {
56
67
let forwarder = match opts {
57
68
ReceiverOutputOpts :: GCPPubSub ( opts) => {
58
69
QueueForwarder :: from_gcp_pupsub_cfg ( name, opts) . await ?
59
70
}
60
71
ReceiverOutputOpts :: RabbitMQ ( opts) => QueueForwarder :: from_rabbitmq_cfg ( name, opts) . await ?,
61
- ReceiverOutputOpts :: Redis ( opts) => QueueForwarder :: from_redis_cfg ( name, opts) . await ?,
72
+ ReceiverOutputOpts :: Redis ( opts) => {
73
+ if let Some ( t) = transformation {
74
+ if t. format ( ) != TransformerInputFormat :: Json {
75
+ return Err ( crate :: Error :: Generic (
76
+ "redis only supports json formatted transformations" . to_string ( ) ,
77
+ ) ) ;
78
+ }
79
+ }
80
+ QueueForwarder :: from_redis_cfg ( name, opts) . await ?
81
+ }
62
82
ReceiverOutputOpts :: SQS ( opts) => QueueForwarder :: from_sqs_cfg ( name, opts) . await ?,
63
83
} ;
64
84
Ok ( Box :: new ( forwarder) )
@@ -84,3 +104,73 @@ pub enum ReceiverOutputOpts {
84
104
Redis ( RedisOutputOpts ) ,
85
105
SQS ( SqsOutputOpts ) ,
86
106
}
107
+
108
+ #[ cfg( test) ]
109
+ mod tests {
110
+ use super :: into_receiver_output;
111
+ use super :: QueueConsumerConfig ;
112
+ use crate :: config:: { ReceiverOutputOpts , SenderInputOpts } ;
113
+ use crate :: redis:: { RedisInputOpts , RedisOutputOpts } ;
114
+ use svix_bridge_types:: {
115
+ SenderOutputOpts , SvixSenderOutputOpts , TransformationConfig , TransformerInputFormat ,
116
+ } ;
117
+
118
+ // FIXME: can't support raw payload access for redis because it requires JSON internally.
119
+ // Revisit after `omniqueue` adoption.
120
+ #[ test]
121
+ fn redis_sender_with_string_transformation_is_err ( ) {
122
+ let cfg = QueueConsumerConfig {
123
+ name : "redis-with-string-transformation" . to_string ( ) ,
124
+ input : SenderInputOpts :: Redis ( RedisInputOpts {
125
+ dsn : "" . to_string ( ) ,
126
+ max_connections : 0 ,
127
+ reinsert_on_nack : false ,
128
+ queue_key : "" . to_string ( ) ,
129
+ consumer_group : "" . to_string ( ) ,
130
+ consumer_name : "" . to_string ( ) ,
131
+ } ) ,
132
+ transformation : Some ( TransformationConfig :: Explicit {
133
+ format : TransformerInputFormat :: String ,
134
+ src : String :: new ( ) ,
135
+ } ) ,
136
+ output : SenderOutputOpts :: Svix ( SvixSenderOutputOpts {
137
+ token : "" . to_string ( ) ,
138
+ options : None ,
139
+ } ) ,
140
+ } ;
141
+
142
+ assert_eq ! (
143
+ cfg. into_sender_input( )
144
+ . err( )
145
+ . expect( "invalid config didn't result in error" ) ,
146
+ "redis only supports json formatted transformations"
147
+ )
148
+ }
149
+
150
+ // FIXME: can't support raw payload access for redis because it requires JSON internally.
151
+ // Revisit after `omniqueue` adoption.
152
+ #[ tokio:: test]
153
+ async fn test_redis_receiver_string_transform_is_err ( ) {
154
+ let redis_out = ReceiverOutputOpts :: Redis ( RedisOutputOpts {
155
+ dsn : "" . to_string ( ) ,
156
+ max_connections : 0 ,
157
+ queue_key : "" . to_string ( ) ,
158
+ } ) ;
159
+
160
+ // Explicit String fails
161
+ let res = into_receiver_output (
162
+ "" . to_string ( ) ,
163
+ redis_out,
164
+ & Some ( TransformationConfig :: Explicit {
165
+ src : String :: new ( ) ,
166
+ format : TransformerInputFormat :: String ,
167
+ } ) ,
168
+ )
169
+ . await ;
170
+ assert ! ( matches!(
171
+ res. err( )
172
+ . expect( "invalid config didn't result in error" ) ,
173
+ crate :: error:: Error :: Generic ( msg) if msg == "redis only supports json formatted transformations"
174
+ ) ) ;
175
+ }
176
+ }
0 commit comments