3
3
const { async, utils } = require ( "merapi" ) ;
4
4
const ServiceSubQueue = require ( "merapi-plugin-service/lib/service_sub_queue" ) ;
5
5
const pack = require ( "../package" ) ;
6
+ const sleep = require ( "then-sleep" ) ;
6
7
7
8
class ServiceSubRabbit extends ServiceSubQueue {
8
9
@@ -60,6 +61,11 @@ class ServiceSubRabbit extends ServiceSubQueue {
60
61
let channel = this . _channels [ event ] ;
61
62
62
63
if ( channel ) {
64
+ channel . on ( "close" , ( ) => {
65
+ console . dir ( `${ event } is closed` )
66
+ this . reconnect ( event )
67
+ } ) ;
68
+
63
69
let queueName = `${ this . _namespace } .queue.${ this . SERVICE_NAME } .${ event } ` ;
64
70
65
71
yield channel . assertQueue ( queueName , { durable : true } ) ;
@@ -80,6 +86,25 @@ class ServiceSubRabbit extends ServiceSubQueue {
80
86
}
81
87
}
82
88
89
+ * reconnect ( event ) {
90
+ try {
91
+ let desc = this . config . default ( "service.queue.subscribe" , { } ) ;
92
+ let prefetch = parseInt ( this . config . default ( "service.rabbit.prefetch" , 5 ) ) ;
93
+
94
+ let channel = yield this . _connection . createChannel ( ) ;
95
+ yield channel . prefetch ( prefetch ) ;
96
+ this . _channels [ event ] = channel ;
97
+
98
+ let method = yield this . injector . resolveMethod ( desc [ event ] ) ;
99
+ this . createQueue ( event , method ) ;
100
+ } catch ( e ) {
101
+ this . logger . warn ( "Failed to connect channel " + event , e ) ;
102
+ yield sleep ( 3000 ) ;
103
+
104
+ yield this . reconnect ( event ) ;
105
+ }
106
+ }
107
+
83
108
* runMethod ( method , payload ) {
84
109
let ret = method ( payload ) ;
85
110
if ( utils . isPromise ( ret ) ) yield ret ;
0 commit comments