12
12
namespace Symfony \Component \Messenger \Transport \RedisExt ;
13
13
14
14
use Symfony \Component \Messenger \Exception \InvalidArgumentException ;
15
- use Symfony \Component \Messenger \Exception \LogicException ;
15
+ use Symfony \Component \Messenger \Exception \TransportException ;
16
16
17
17
/**
18
18
* A Redis connection.
@@ -77,17 +77,21 @@ public function get(): ?array
77
77
$ messageId = '0 ' ; // will receive consumers pending messages
78
78
}
79
79
80
- $ messages = $ this ->connection ->xreadgroup (
81
- $ this ->group ,
82
- $ this ->consumer ,
83
- [$ this ->stream => $ messageId ],
84
- 1 ,
85
- $ this ->blockingTimeout
86
- );
87
-
88
- if (false === $ messages ) {
89
- throw new LogicException (
90
- $ this ->connection ->getLastError () ?: 'Unexpected redis stream error happened. '
80
+ $ e = null ;
81
+ try {
82
+ $ messages = $ this ->connection ->xReadGroup (
83
+ $ this ->group ,
84
+ $ this ->consumer ,
85
+ [$ this ->stream => $ messageId ],
86
+ 1 ,
87
+ $ this ->blockingTimeout
88
+ );
89
+ } catch (\RedisException $ e ) {
90
+ }
91
+
92
+ if (false === $ messages || $ e ) {
93
+ throw new TransportException (
94
+ ($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? 'Could not read messages from the redis stream. '
91
95
);
92
96
}
93
97
@@ -113,23 +117,51 @@ public function get(): ?array
113
117
114
118
public function ack (string $ id ): void
115
119
{
116
- $ this ->connection ->xack ($ this ->stream , $ this ->group , [$ id ]);
120
+ $ e = null ;
121
+ try {
122
+ $ acknowledged = $ this ->connection ->xAck ($ this ->stream , $ this ->group , [$ id ]);
123
+ } catch (\RedisException $ e ) {
124
+ }
125
+
126
+ if (!$ acknowledged || $ e ) {
127
+ throw new TransportException (($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? sprintf ('Could not acknowledge redis message "%s". ' , $ id ), 0 , $ e );
128
+ }
117
129
}
118
130
119
131
public function reject (string $ id ): void
120
132
{
121
- $ this ->connection ->xdel ($ this ->stream , [$ id ]);
133
+ $ e = null ;
134
+ try {
135
+ $ deleted = $ this ->connection ->xDel ($ this ->stream , [$ id ]);
136
+ } catch (\RedisException $ e ) {
137
+ }
138
+
139
+ if (!$ deleted || $ e ) {
140
+ throw new TransportException (($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? sprintf ('Could not delete message "%s" from the redis stream. ' , $ id ), 0 , $ e );
141
+ }
122
142
}
123
143
124
144
public function add (string $ body , array $ headers )
125
145
{
126
- $ this ->connection ->xadd ($ this ->stream , '* ' , ['message ' => json_encode (
127
- ['body ' => $ body , 'headers ' => $ headers ]
128
- )]);
146
+ $ e = null ;
147
+ try {
148
+ $ added = $ this ->connection ->xAdd ($ this ->stream , '* ' , ['message ' => json_encode (
149
+ ['body ' => $ body , 'headers ' => $ headers ]
150
+ )]);
151
+ } catch (\RedisException $ e ) {
152
+ }
153
+
154
+ if (!$ added || $ e ) {
155
+ throw new TransportException (($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? 'Could not add a message to the redis stream. ' , 0 , $ e );
156
+ }
129
157
}
130
158
131
159
public function setup (): void
132
160
{
133
- $ this ->connection ->xgroup ('CREATE ' , $ this ->stream , $ this ->group , 0 , true );
161
+ try {
162
+ $ this ->connection ->xGroup ('CREATE ' , $ this ->stream , $ this ->group , 0 , true );
163
+ } catch (\RedisException $ e ) {
164
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
165
+ }
134
166
}
135
167
}
0 commit comments