@@ -89,9 +89,13 @@ export async function ipfsAdd(coll, downloaderOpts = {}, replayOpts = {}, progre
89
89
90
90
let url , cid ;
91
91
92
+ let reject = null ;
93
+
94
+ const p2 = new Promise ( ( res , rej ) => reject = rej ) ;
95
+
92
96
const p = readable
93
97
. pipeThrough ( new ShardingStream ( shardSize ) )
94
- . pipeThrough ( new ShardStoringStream ( autoipfs , concur ) )
98
+ . pipeThrough ( new ShardStoringStream ( autoipfs , concur , reject ) )
95
99
. pipeTo (
96
100
new WritableStream ( {
97
101
write : ( res ) => {
@@ -113,7 +117,7 @@ export async function ipfsAdd(coll, downloaderOpts = {}, replayOpts = {}, progre
113
117
downloaderOpts . markers , favicon ,
114
118
) ;
115
119
116
- await p ;
120
+ await Promise . race ( [ p , p2 ] ) ;
117
121
118
122
const res = { cid : cid . toString ( ) , url} ;
119
123
@@ -135,7 +139,9 @@ export async function ipfsRemove(coll) {
135
139
try {
136
140
await autoipfs . clear ( url ) ;
137
141
} catch ( e ) {
138
- console . log ( "Removal from this IPFS backend not yet implemented" ) ;
142
+ console . log ( "Failed to unpin" ) ;
143
+ autoipfsOpts . daemonURL = null ;
144
+ return false ;
139
145
}
140
146
}
141
147
@@ -493,7 +499,7 @@ export class ShardingStream extends TransformStream {
493
499
* @extends {TransformStream<import('./types').CARFile, import('./types').CARMetadata> }
494
500
*/
495
501
export class ShardStoringStream extends TransformStream {
496
- constructor ( autoipfs , concurrency ) {
502
+ constructor ( autoipfs , concurrency , reject ) {
497
503
const queue = new Queue ( { concurrency } ) ;
498
504
const abortController = new AbortController ( ) ;
499
505
super ( {
@@ -513,6 +519,8 @@ export class ShardStoringStream extends TransformStream {
513
519
} catch ( err ) {
514
520
controller . error ( err ) ;
515
521
abortController . abort ( err ) ;
522
+ autoipfsOpts . daemonURL = null ;
523
+ reject ( err ) ;
516
524
}
517
525
} ,
518
526
{ signal : abortController . signal }
0 commit comments