@@ -4,123 +4,124 @@ import * as draft from "../../src/draft"
4
4
import { assert } from "chai"
5
5
import { testProtos , uniqAddress } from "./helpers"
6
6
7
- if ( zmq . capability . draft ) {
8
- for ( const proto of testProtos ( "tcp" , "ipc" , "inproc" , "udp" ) ) {
9
- describe ( `draft socket with ${ proto } radio/dish` , function ( ) {
10
- let radio : draft . Radio
11
- let dish : draft . Dish
12
-
13
- beforeEach ( function ( ) {
14
- radio = new draft . Radio ( )
15
- dish = new draft . Dish ( )
16
- } )
7
+ for ( const proto of testProtos ( "tcp" , "ipc" , "inproc" , "udp" ) ) {
8
+ describe ( `draft socket with ${ proto } radio/dish` , function ( ) {
9
+ if ( zmq . capability . draft !== true ) {
10
+ if ( process . env . ZMQ_DRAFT === "true" ) {
11
+ throw new Error ( "Draft API requested but not available at runtime." )
12
+ }
13
+ return
14
+ }
15
+
16
+ let radio : draft . Radio
17
+ let dish : draft . Dish
18
+
19
+ beforeEach ( function ( ) {
20
+ radio = new draft . Radio ( )
21
+ dish = new draft . Dish ( )
22
+ } )
17
23
18
- afterEach ( function ( ) {
19
- global . gc ?.( )
20
- radio . close ( )
21
- dish . close ( )
22
- global . gc ?.( )
23
- } )
24
+ afterEach ( function ( ) {
25
+ global . gc ?.( )
26
+ radio . close ( )
27
+ dish . close ( )
28
+ global . gc ?.( )
29
+ } )
24
30
25
- describe ( "send/receive" , function ( ) {
26
- it ( "should deliver messages" , async function ( ) {
27
- /* RADIO -> foo -> DISH
31
+ describe ( "send/receive" , function ( ) {
32
+ it ( "should deliver messages" , async function ( ) {
33
+ /* RADIO -> foo -> DISH
28
34
-> bar -> joined all
29
35
-> baz ->
30
36
-> qux ->
31
37
*/
32
38
33
- const address = uniqAddress ( proto )
34
- const messages = [ "foo" , "bar" , "baz" , "qux" ]
39
+ const address = uniqAddress ( proto )
40
+ const messages = [ "foo" , "bar" , "baz" , "qux" ]
35
41
36
- /* Max 15 non-null bytes. */
37
- const uuid = Buffer . from ( [
38
- 0xf6 , 0x46 , 0x1f , 0x03 , 0xd2 , 0x0d , 0xc8 , 0x66 , 0xe5 , 0x5f , 0xf5 ,
39
- 0xa1 , 0x65 , 0x62 , 0xb2 ,
40
- ] )
42
+ /* Max 15 non-null bytes. */
43
+ const uuid = Buffer . from ( [
44
+ 0xf6 , 0x46 , 0x1f , 0x03 , 0xd2 , 0x0d , 0xc8 , 0x66 , 0xe5 , 0x5f , 0xf5 ,
45
+ 0xa1 , 0x65 , 0x62 , 0xb2 ,
46
+ ] )
41
47
42
- const received : string [ ] = [ ]
48
+ const received : string [ ] = [ ]
43
49
44
- dish . join ( uuid )
50
+ dish . join ( uuid )
45
51
46
- await dish . bind ( address )
47
- await radio . connect ( address )
52
+ await dish . bind ( address )
53
+ await radio . connect ( address )
48
54
49
- const send = async ( ) => {
50
- /* Wait briefly before publishing to avoid slow joiner syndrome. */
51
- await new Promise ( resolve => {
52
- setTimeout ( resolve , 25 )
53
- } )
54
- for ( const msg of messages ) {
55
- await radio . send ( msg , { group : uuid } )
56
- }
55
+ const send = async ( ) => {
56
+ /* Wait briefly before publishing to avoid slow joiner syndrome. */
57
+ await new Promise ( resolve => {
58
+ setTimeout ( resolve , 25 )
59
+ } )
60
+ for ( const msg of messages ) {
61
+ await radio . send ( msg , { group : uuid } )
57
62
}
58
-
59
- const receive = async ( ) => {
60
- for await ( const [ msg , { group } ] of dish ) {
61
- assert . instanceOf ( msg , Buffer )
62
- assert . instanceOf ( group , Buffer )
63
- assert . deepEqual ( group , uuid )
64
- received . push ( msg . toString ( ) )
65
- if ( received . length === messages . length ) {
66
- break
67
- }
63
+ }
64
+
65
+ const receive = async ( ) => {
66
+ for await ( const [ msg , { group } ] of dish ) {
67
+ assert . instanceOf ( msg , Buffer )
68
+ assert . instanceOf ( group , Buffer )
69
+ assert . deepEqual ( group , uuid )
70
+ received . push ( msg . toString ( ) )
71
+ if ( received . length === messages . length ) {
72
+ break
68
73
}
69
74
}
75
+ }
70
76
71
- await Promise . all ( [ send ( ) , receive ( ) ] )
72
- assert . deepEqual ( received , messages )
73
- } )
77
+ await Promise . all ( [ send ( ) , receive ( ) ] )
78
+ assert . deepEqual ( received , messages )
74
79
} )
80
+ } )
75
81
76
- describe ( "join/leave" , function ( ) {
77
- it ( "should filter messages" , async function ( ) {
78
- /* RADIO -> foo -X DISH
82
+ describe ( "join/leave" , function ( ) {
83
+ it ( "should filter messages" , async function ( ) {
84
+ /* RADIO -> foo -X DISH
79
85
-> bar -> joined "ba"
80
86
-> baz ->
81
87
-> qux -X
82
88
*/
83
89
84
- const address = uniqAddress ( proto )
85
- const messages = [ "foo" , "bar" , "baz" , "qux" ]
86
- const received : string [ ] = [ ]
90
+ const address = uniqAddress ( proto )
91
+ const messages = [ "foo" , "bar" , "baz" , "qux" ]
92
+ const received : string [ ] = [ ]
87
93
88
- /* Everything after null byte should be ignored. */
89
- dish . join ( Buffer . from ( "fo\x00ba" ) , Buffer . from ( "ba\x00fo" ) )
90
- dish . leave ( Buffer . from ( "fo" ) )
94
+ /* Everything after null byte should be ignored. */
95
+ dish . join ( Buffer . from ( "fo\x00ba" ) , Buffer . from ( "ba\x00fo" ) )
96
+ dish . leave ( Buffer . from ( "fo" ) )
91
97
92
- await dish . bind ( address )
93
- await radio . connect ( address )
98
+ await dish . bind ( address )
99
+ await radio . connect ( address )
94
100
95
- const send = async ( ) => {
96
- /* Wait briefly before publishing to avoid slow joiner syndrome. */
97
- await new Promise ( resolve => {
98
- setTimeout ( resolve , 25 )
99
- } )
100
- for ( const msg of messages ) {
101
- await radio . send ( msg , { group : msg . slice ( 0 , 2 ) } )
102
- }
101
+ const send = async ( ) => {
102
+ /* Wait briefly before publishing to avoid slow joiner syndrome. */
103
+ await new Promise ( resolve => {
104
+ setTimeout ( resolve , 25 )
105
+ } )
106
+ for ( const msg of messages ) {
107
+ await radio . send ( msg , { group : msg . slice ( 0 , 2 ) } )
103
108
}
104
-
105
- const receive = async ( ) => {
106
- for await ( const [ msg , { group } ] of dish ) {
107
- assert . instanceOf ( msg , Buffer )
108
- assert . deepEqual ( group , msg . slice ( 0 , 2 ) )
109
- received . push ( msg . toString ( ) )
110
- if ( received . length === 2 ) {
111
- break
112
- }
109
+ }
110
+
111
+ const receive = async ( ) => {
112
+ for await ( const [ msg , { group } ] of dish ) {
113
+ assert . instanceOf ( msg , Buffer )
114
+ assert . deepEqual ( group , msg . slice ( 0 , 2 ) )
115
+ received . push ( msg . toString ( ) )
116
+ if ( received . length === 2 ) {
117
+ break
113
118
}
114
119
}
120
+ }
115
121
116
- await Promise . all ( [ send ( ) , receive ( ) ] )
117
- assert . deepEqual ( received , [ "bar" , "baz" ] )
118
- } )
122
+ await Promise . all ( [ send ( ) , receive ( ) ] )
123
+ assert . deepEqual ( received , [ "bar" , "baz" ] )
119
124
} )
120
125
} )
121
- }
122
- } else {
123
- if ( process . env . ZMQ_DRAFT === "true" ) {
124
- throw new Error ( "Draft API requested but not available at runtime." )
125
- }
126
+ } )
126
127
}
0 commit comments