@@ -55,6 +55,8 @@ export class BlockedSource extends BaseSource {
55
55
56
56
// set of blockIds missing for the current requests
57
57
this . blockIdsToFetch = new Set ( ) ;
58
+
59
+ this . abortedBlockIds = new Set ( ) ;
58
60
}
59
61
60
62
get fileSize ( ) {
@@ -66,9 +68,9 @@ export class BlockedSource extends BaseSource {
66
68
* @param {basesource/Slice[] } slices
67
69
*/
68
70
async fetch ( slices , signal ) {
69
- const cachedBlocks = new Map ( ) ;
70
- const blockRequests = new Map ( ) ;
71
- const missingBlockIds = new Set ( ) ;
71
+ const blockRequests = [ ] ;
72
+ const missingBlockIds = [ ] ;
73
+ const allBlockIds = [ ] ;
72
74
73
75
for ( const { offset, length } of slices ) {
74
76
let top = offset + length ;
@@ -80,92 +82,68 @@ export class BlockedSource extends BaseSource {
80
82
81
83
const firstBlockOffset = Math . floor ( offset / this . blockSize ) * this . blockSize ;
82
84
83
- // chunk the current slice into blocks
84
85
for ( let current = firstBlockOffset ; current < top ; current += this . blockSize ) {
85
- // check if the block is cached, being requested or still missing
86
86
const blockId = Math . floor ( current / this . blockSize ) ;
87
-
88
- if ( this . blockCache . has ( blockId ) ) {
89
- cachedBlocks . set ( blockId , this . blockCache . get ( blockId ) ) ;
90
- } else if ( this . blockRequests . has ( blockId ) ) {
91
- blockRequests . set ( blockId , this . blockRequests . get ( blockId ) ) ;
92
- } else if ( this . blockIdsToFetch . has ( blockId ) ) {
93
- missingBlockIds . add ( blockId ) ;
94
- } else {
87
+ if ( ! this . blockCache . has ( blockId ) && ! this . blockRequests . has ( blockId ) ) {
95
88
this . blockIdsToFetch . add ( blockId ) ;
96
- missingBlockIds . add ( blockId ) ;
89
+ missingBlockIds . push ( blockId ) ;
90
+ }
91
+ if ( this . blockRequests . has ( blockId ) ) {
92
+ blockRequests . push ( this . blockRequests . get ( blockId ) ) ;
97
93
}
94
+ allBlockIds . push ( blockId ) ;
98
95
}
99
96
}
100
97
101
98
// allow additional block requests to accumulate
102
99
await wait ( ) ;
103
100
this . fetchBlocks ( signal ) ;
104
101
102
+ // Gather all of the new requests that this fetch call is contributing to `fetch`.
103
+ const missingRequests = [ ] ;
105
104
for ( const blockId of missingBlockIds ) {
106
- const block = this . blockRequests . get ( blockId ) ;
107
- const cachedBlock = this . blockCache . get ( blockId ) ;
108
-
109
- if ( block ) {
110
- blockRequests . set ( blockId , block ) ;
111
- } else if ( cachedBlock ) {
112
- cachedBlocks . set ( blockId , cachedBlock ) ;
113
- } else {
114
- throw new Error ( `Block ${ blockId } is not in the block requests` ) ;
105
+ // The requested missing block could already be in the cache
106
+ // instead of having its request still be outstanding.
107
+ if ( this . blockRequests . has ( blockId ) ) {
108
+ missingRequests . push ( this . blockRequests . get ( blockId ) ) ;
115
109
}
116
110
}
117
111
118
- // actually await all pending requests
119
- let results = await Promise . allSettled ( Array . from ( blockRequests . values ( ) ) ) ;
120
-
121
- // perform retries if a block was interrupted by a previous signal
122
- if ( results . some ( ( result ) => result . status === 'rejected' ) ) {
123
- const retriedBlockRequests = new Set ( ) ;
124
- for ( const [ blockId , result ] of zip ( blockRequests . keys ( ) , results ) ) {
125
- const { rejected, reason } = result ;
126
- if ( rejected ) {
127
- // push some blocks back to the to-fetch list if they were
128
- // aborted, but only when a different signal was used
129
- if ( reason . name === 'AbortError' && reason . signal !== signal ) {
130
- this . blockIdsToFetch . add ( blockId ) ;
131
- retriedBlockRequests . add ( blockId ) ;
132
- }
112
+ // Actually await all pending requests that are needed for this `fetch`.
113
+ await Promise . allSettled ( blockRequests . values ( ) ) ;
114
+ await Promise . allSettled ( missingRequests . values ( ) ) ;
115
+
116
+ // Perform retries if a block was interrupted by a previous signal
117
+ const abortedBlockRequests = [ ] ;
118
+ const abortedBlockIds = allBlockIds
119
+ . filter ( ( id ) => this . abortedBlockIds . has ( id ) || ! this . blockCache . has ( id ) ) ;
120
+ abortedBlockIds . forEach ( ( id ) => this . blockIdsToFetch . add ( id ) ) ;
121
+ // start the retry of some blocks if required
122
+ if ( abortedBlockIds . length > 0 && signal && ! signal . aborted ) {
123
+ this . fetchBlocks ( null ) ;
124
+ for ( const blockId of abortedBlockIds ) {
125
+ const block = this . blockRequests . get ( blockId ) ;
126
+ if ( ! block ) {
127
+ throw new Error ( `Block ${ blockId } is not in the block requests` ) ;
133
128
}
129
+ abortedBlockRequests . push ( block ) ;
134
130
}
135
-
136
- // start the retry of some blocks if required
137
- if ( this . blockIdsToFetch . length > 0 ) {
138
- this . fetchBlocks ( signal ) ;
139
- for ( const blockId of retriedBlockRequests ) {
140
- const block = this . blockRequests . get ( blockId ) ;
141
- if ( ! block ) {
142
- throw new Error ( `Block ${ blockId } is not in the block requests` ) ;
143
- }
144
- blockRequests . set ( blockId , block ) ;
145
- }
146
- results = await Promise . allSettled ( Array . from ( blockRequests . values ( ) ) ) ;
147
- }
131
+ await Promise . allSettled ( Array . from ( abortedBlockRequests . values ( ) ) ) ;
148
132
}
149
133
150
- // throw an error (either abort error or AggregateError if no abort was done)
151
- if ( results . some ( ( result ) => result . status === 'rejected' ) ) {
152
- if ( signal && signal . aborted ) {
153
- throw new AbortError ( 'Request was aborted' ) ;
154
- }
155
- throw new AggregateError (
156
- results . filter ( ( result ) => result . status === 'rejected' ) . map ( ( result ) => result . reason ) ,
157
- 'Request failed' ,
158
- ) ;
134
+ // throw an abort error
135
+ if ( signal && signal . aborted ) {
136
+ throw new AbortError ( 'Request was aborted' ) ;
159
137
}
160
138
161
- // extract the actual block responses
162
- const values = results . map ( ( result ) => result . value ) ;
139
+ const blocks = allBlockIds . map ( ( id ) => this . blockCache . get ( id ) ) ;
140
+ const failedBlocks = blocks . filter ( ( i ) => ! i ) ;
141
+ if ( failedBlocks . length ) {
142
+ throw new AggregateError ( failedBlocks , 'Request failed' ) ;
143
+ }
163
144
164
145
// create a final Map, with all required blocks for this request to satisfy
165
- const requiredBlocks = new Map ( zip ( Array . from ( blockRequests . keys ( ) ) , values ) ) ;
166
- for ( const [ blockId , block ] of cachedBlocks ) {
167
- requiredBlocks . set ( blockId , block ) ;
168
- }
146
+ const requiredBlocks = new Map ( zip ( allBlockIds , blocks ) ) ;
169
147
170
148
// TODO: satisfy each slice
171
149
return this . readSliceData ( slices , requiredBlocks ) ;
@@ -188,7 +166,7 @@ export class BlockedSource extends BaseSource {
188
166
189
167
for ( const blockId of group . blockIds ) {
190
168
// make an async IIFE for each block
191
- const blockRequest = ( async ( ) => {
169
+ this . blockRequests . set ( blockId , ( async ( ) => {
192
170
try {
193
171
const response = ( await groupRequests ) [ groupIndex ] ;
194
172
const blockOffset = blockId * this . blockSize ;
@@ -199,21 +177,24 @@ export class BlockedSource extends BaseSource {
199
177
blockOffset ,
200
178
data . byteLength ,
201
179
data ,
180
+ blockId ,
202
181
) ;
203
182
this . blockCache . set ( blockId , block ) ;
204
- return block ;
183
+ this . abortedBlockIds . delete ( blockId ) ;
205
184
} catch ( err ) {
206
185
if ( err . name === 'AbortError' ) {
207
186
// store the signal here, we need it to determine later if an
208
187
// error was caused by this signal
209
188
err . signal = signal ;
189
+ this . blockCache . del ( blockId ) ;
190
+ this . abortedBlockIds . add ( blockId ) ;
191
+ } else {
192
+ throw err ;
210
193
}
211
- throw err ;
212
194
} finally {
213
195
this . blockRequests . delete ( blockId ) ;
214
196
}
215
- } ) ( ) ;
216
- this . blockRequests . set ( blockId , blockRequest ) ;
197
+ } ) ( ) ) ;
217
198
}
218
199
}
219
200
this . blockIdsToFetch . clear ( ) ;
@@ -265,9 +246,12 @@ export class BlockedSource extends BaseSource {
265
246
*/
266
247
readSliceData ( slices , blocks ) {
267
248
return slices . map ( ( slice ) => {
268
- const top = slice . offset + slice . length ;
249
+ let top = slice . offset + slice . length ;
250
+ if ( this . fileSize !== null ) {
251
+ top = Math . min ( this . fileSize , top ) ;
252
+ }
269
253
const blockIdLow = Math . floor ( slice . offset / this . blockSize ) ;
270
- const blockIdHigh = Math . floor ( ( slice . offset + slice . length ) / this . blockSize ) ;
254
+ const blockIdHigh = Math . floor ( top / this . blockSize ) ;
271
255
const sliceData = new ArrayBuffer ( slice . length ) ;
272
256
const sliceView = new Uint8Array ( sliceData ) ;
273
257
0 commit comments