55'use strict' ;
66
77const EventEmitter = require ( 'events' ) ;
8- const Promise = require ( 'bluebird' ) ;
8+ const promisify = require ( 'util' ) . promisify ;
99const defs = require ( './defs' ) ;
1010const { BaseChannel} = require ( './channel' ) ;
1111const { acceptMessage} = require ( './channel' ) ;
@@ -23,7 +23,7 @@ class ChannelModel extends EventEmitter {
2323 }
2424
2525 close ( ) {
26- return Promise . fromCallback ( this . connection . close . bind ( this . connection ) ) ;
26+ return promisify ( this . connection . close . bind ( this . connection ) ) ( ) ;
2727 }
2828
2929 async createChannel ( ) {
@@ -53,27 +53,25 @@ class Channel extends BaseChannel {
5353 // response's fields; this is intended to be suitable for implementing
5454 // API procedures.
5555 async rpc ( method , fields , expect ) {
56- const f = await Promise . fromCallback ( cb => {
56+ const f = await promisify ( cb => {
5757 return this . _rpc ( method , fields , expect , cb ) ;
58- } )
58+ } ) ( ) ;
5959
6060 return f . fields ;
6161 }
6262
6363 // Do the remarkably simple channel open handshake
64- open ( ) {
65- return Promise . try ( this . allocate . bind ( this ) ) . then (
66- ch => {
67- return ch . rpc ( defs . ChannelOpen , { outOfBand : "" } ,
68- defs . ChannelOpenOk ) ;
69- } ) ;
64+ async open ( ) {
65+ const ch = await this . allocate . bind ( this ) ( ) ;
66+ return ch . rpc ( defs . ChannelOpen , { outOfBand : "" } ,
67+ defs . ChannelOpenOk ) ;
7068 }
7169
7270 close ( ) {
73- return Promise . fromCallback ( cb => {
71+ return promisify ( cb => {
7472 return this . closeBecause ( "Goodbye" , defs . constants . REPLY_SUCCESS ,
7573 cb ) ;
76- } ) ;
74+ } ) ( ) ;
7775 }
7876
7977 // === Public API, declaring queues and stuff ===
@@ -162,21 +160,21 @@ class Channel extends BaseChannel {
162160 // NB we want the callback to be run synchronously, so that we've
163161 // registered the consumerTag before any messages can arrive.
164162 const fields = Args . consume ( queue , options ) ;
165- return Promise . fromCallback ( cb => {
166- this . _rpc ( defs . BasicConsume , fields , defs . BasicConsumeOk , cb ) ;
167- } )
168- . then ( ok => {
169- this . registerConsumer ( ok . fields . consumerTag , callback ) ;
170- return ok . fields ;
163+ return new Promise ( ( resolve , reject ) => {
164+ this . _rpc ( defs . BasicConsume , fields , defs . BasicConsumeOk , ( err , ok ) => {
165+ if ( err ) return reject ( err ) ;
166+ this . registerConsumer ( ok . fields . consumerTag , callback ) ;
167+ resolve ( ok . fields ) ;
168+ } ) ;
171169 } ) ;
172170 }
173171
174172 async cancel ( consumerTag ) {
175- const ok = await Promise . fromCallback ( cb => {
173+ const ok = await promisify ( cb => {
176174 this . _rpc ( defs . BasicCancel , Args . cancel ( consumerTag ) ,
177175 defs . BasicCancelOk ,
178176 cb ) ;
179- } )
177+ } ) ( )
180178 . then ( ok => {
181179 this . unregisterConsumer ( consumerTag ) ;
182180 return ok . fields ;
@@ -185,25 +183,23 @@ class Channel extends BaseChannel {
185183
186184 get ( queue , options ) {
187185 const fields = Args . get ( queue , options ) ;
188- return Promise . fromCallback ( cb => {
189- return this . sendOrEnqueue ( defs . BasicGet , fields , cb ) ;
190- } )
191- . then ( f => {
192- if ( f . id === defs . BasicGetEmpty ) {
193- return false ;
194- }
195- else if ( f . id === defs . BasicGetOk ) {
196- const fields = f . fields ;
197- return new Promise ( resolve => {
186+ return new Promise ( ( resolve , reject ) => {
187+ this . sendOrEnqueue ( defs . BasicGet , fields , ( err , f ) => {
188+ if ( err ) return reject ( err ) ;
189+ if ( f . id === defs . BasicGetEmpty ) {
190+ return resolve ( false ) ;
191+ }
192+ else if ( f . id === defs . BasicGetOk ) {
193+ const fields = f . fields ;
198194 this . handleMessage = acceptMessage ( m => {
199195 m . fields = fields ;
200196 resolve ( m ) ;
201197 } ) ;
202- } ) ;
203- }
204- else {
205- throw new Error ( `Unexpected response to BasicGet: ${ inspect ( f ) } ` ) ;
206- }
198+ }
199+ else {
200+ reject ( new Error ( `Unexpected response to BasicGet: ${ inspect ( f ) } ` ) ) ;
201+ }
202+ } ) ;
207203 } ) ;
208204 }
209205
@@ -286,6 +282,7 @@ class ConfirmChannel extends Channel {
286282 if ( err === null ) resolve ( ) ;
287283 else reject ( err ) ;
288284 } ;
285+ if ( ! this . pending ) unconfirmed [ index ] ( new Error ( 'channel closed' ) )
289286 } ) ;
290287 awaiting . push ( confirmed ) ;
291288 }
0 commit comments