@@ -120,6 +120,41 @@ export class Api {
120120 url,
121121 // scripting: https://github.com/redis/node-redis/#lua-scripts
122122 scripts : {
123+ checkAndRecoverWorkerStream : redis . defineScript ( {
124+ NUMBER_OF_KEYS : 1 ,
125+ SCRIPT : `
126+ local found = false
127+ local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${ this . redisWorkerStreamName } ", "0-0")
128+
129+ if messages and #messages > 0 then
130+ local entries = messages[1][2]
131+ for _, entry in ipairs(entries) do
132+ -- Each entry is an array where entry[2] is the message fields
133+ if entry[2][2] == KEYS[1] then
134+ found = true
135+ break
136+ end
137+ end
138+ end
139+
140+ -- If stream not found in y:worker and the stream exists, add it
141+ if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then
142+ redis.call("XADD", "${ this . redisWorkerStreamName } ", "*", "compact", KEYS[1])
143+ end
144+ ` ,
145+ /**
146+ * @param {string } key
147+ */
148+ transformArguments ( key ) {
149+ return [ key ]
150+ } ,
151+ /**
152+ * @param {null } x
153+ */
154+ transformReply ( x ) {
155+ return x
156+ }
157+ } ) ,
123158 addMessage : redis . defineScript ( {
124159 NUMBER_OF_KEYS : 1 ,
125160 SCRIPT : `
@@ -211,6 +246,16 @@ export class Api {
211246 return this . redis . addMessage ( computeRedisRoomStreamName ( room , docid , this . prefix ) , m )
212247 }
213248
249+ /**
250+ * @param {string } room
251+ * @param {string } docid
252+ */
253+ async checkAndRecoveryWorkerStream ( room , docid ) {
254+ await this . redis . checkAndRecoverWorkerStream (
255+ computeRedisRoomStreamName ( room , docid , this . prefix ) ,
256+ )
257+ }
258+
214259 /**
215260 * @param {string } room
216261 * @param {string } docid
0 commit comments