@@ -8,11 +8,13 @@ const promisify = require('promisify-es6')
8
8
const multihashes = require ( 'multihashes' )
9
9
const pull = require ( 'pull-stream' )
10
10
const sort = require ( 'pull-sort' )
11
+ const pushable = require ( 'pull-pushable' )
11
12
const toStream = require ( 'pull-stream-to-stream' )
12
13
const toPull = require ( 'stream-to-pull-stream' )
13
14
const CID = require ( 'cids' )
14
15
const waterfall = require ( 'async/waterfall' )
15
16
const isStream = require ( 'isstream' )
17
+ const Duplex = require ( 'stream' ) . Duplex
16
18
17
19
module . exports = function files ( self ) {
18
20
const createAddPullStream = ( options ) => {
@@ -30,7 +32,19 @@ module.exports = function files (self) {
30
32
callback = options
31
33
options = undefined
32
34
}
33
- callback ( null , toStream ( createAddPullStream ( options ) ) )
35
+
36
+ const addPullStream = createAddPullStream ( options )
37
+ const p = pushable ( )
38
+ const s = pull (
39
+ p ,
40
+ addPullStream
41
+ )
42
+
43
+ const retStream = new AddStreamDuplex ( s , p )
44
+
45
+ retStream . once ( 'finish' , ( ) => p . end ( ) )
46
+
47
+ callback ( null , retStream )
34
48
} ,
35
49
36
50
createAddPullStream : createAddPullStream ,
@@ -164,3 +178,28 @@ function normalizeContent (content) {
164
178
}
165
179
166
180
function noop ( ) { }
181
+
182
+ class AddStreamDuplex extends Duplex {
183
+ constructor ( pullStream , push , options ) {
184
+ super ( Object . assign ( { objectMode : true } , options ) )
185
+ this . _pullStream = pullStream
186
+ this . _pushable = push
187
+ }
188
+
189
+ _read ( ) {
190
+ this . _pullStream ( null , ( end , data ) => {
191
+ if ( end ) {
192
+ if ( end instanceof Error ) {
193
+ this . emit ( 'error' , end )
194
+ }
195
+ } else {
196
+ this . push ( data )
197
+ }
198
+ } )
199
+ }
200
+
201
+ _write ( chunk , encoding , callback ) {
202
+ this . _pushable . push ( chunk )
203
+ callback ( )
204
+ }
205
+ }
0 commit comments