1
+ // @ts -check
2
+
1
3
import { CID } from 'multiformats'
2
4
3
5
import { extractVerifiedContent } from './utils/car.js'
4
6
import { asAsyncIterable , asyncIteratorToBuffer } from './utils/itr.js'
5
7
import { randomUUID } from './utils/uuid.js'
6
8
import { memoryStorage } from './storage/index.js'
7
9
import { getJWT } from './utils/jwt.js'
10
+ import { parseUrl , addHttpPrefix } from './utils/url.js'
8
11
import { isBrowserContext } from './utils/runtime.js'
9
12
10
13
export class Saturn {
14
+ static nodesListKey = 'saturn-nodes'
11
15
/**
12
16
*
13
17
* @param {object } [opts={}]
@@ -16,14 +20,18 @@ export class Saturn {
16
20
* @param {string } [opts.cdnURL=saturn.ms]
17
21
* @param {number } [opts.connectTimeout=5000]
18
22
* @param {number } [opts.downloadTimeout=0]
23
+ * @param {string } [opts.orchURL]
24
+ * @param {number } [opts.fallbackLimit]
19
25
* @param {import('./storage/index.js').Storage } [opts.storage]
20
26
*/
21
27
constructor ( opts = { } ) {
22
28
this . opts = Object . assign ( { } , {
23
29
clientId : randomUUID ( ) ,
24
30
cdnURL : 'l1s.saturn.ms' ,
25
31
logURL : 'https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/' ,
32
+ orchURL : 'https://orchestrator.strn.pl/nodes?maxNodes=100' ,
26
33
authURL : 'https://fz3dyeyxmebszwhuiky7vggmsu0rlkoy.lambda-url.us-west-2.on.aws/' ,
34
+ fallbackLimit : 5 ,
27
35
connectTimeout : 5_000 ,
28
36
downloadTimeout : 0
29
37
} , opts )
@@ -33,13 +41,14 @@ export class Saturn {
33
41
}
34
42
35
43
this . logs = [ ]
36
- this . storage = this . opts . storage || memoryStorage ( )
44
+ this . nodes = [ ]
37
45
this . reportingLogs = process ?. env ?. NODE_ENV !== 'development'
38
46
this . hasPerformanceAPI = isBrowserContext && self ?. performance
39
-
40
47
if ( this . reportingLogs && this . hasPerformanceAPI ) {
41
48
this . _monitorPerformanceBuffer ( )
42
49
}
50
+ this . storage = this . opts . storage || memoryStorage ( )
51
+ this . loadNodesPromise = this . _loadNodes ( this . opts )
43
52
}
44
53
45
54
/**
@@ -76,10 +85,9 @@ export class Saturn {
76
85
Authorization : 'Bearer ' + options . jwt
77
86
}
78
87
}
79
-
80
88
let res
81
89
try {
82
- res = await fetch ( url , { signal : controller . signal , ...options } )
90
+ res = await fetch ( parseUrl ( url ) , { signal : controller . signal , ...options } )
83
91
84
92
clearTimeout ( connectTimeout )
85
93
@@ -109,6 +117,74 @@ export class Saturn {
109
117
return { res, controller, log }
110
118
}
111
119
120
+ /**
121
+ *
122
+ * @param {string } cidPath
123
+ * @param {object } [opts={}]
124
+ * @param {('car'|'raw') } [opts.format]
125
+ * @param {string } [opts.url]
126
+ * @param {number } [opts.connectTimeout=5000]
127
+ * @param {number } [opts.downloadTimeout=0]
128
+ * @returns {Promise<AsyncIterable<Uint8Array>> }
129
+ */
130
+ async * fetchContentWithFallback ( cidPath , opts = { } ) {
131
+ let lastError = null
132
+ // we use this to checkpoint at which chunk a request failed.
133
+ // this is temporary until range requests are supported.
134
+ let byteCountCheckpoint = 0
135
+
136
+ const fetchContent = async function * ( ) {
137
+ let byteCount = 0
138
+ const byteChunks = await this . fetchContent ( cidPath , opts )
139
+ for await ( const chunk of byteChunks ) {
140
+ // avoid sending duplicate chunks
141
+ if ( byteCount < byteCountCheckpoint ) {
142
+ // checks for overlapping chunks
143
+ const remainingBytes = byteCountCheckpoint - byteCount
144
+ if ( remainingBytes < chunk . length ) {
145
+ yield chunk . slice ( remainingBytes )
146
+ byteCountCheckpoint += chunk . length - remainingBytes
147
+ }
148
+ } else {
149
+ yield chunk
150
+ byteCountCheckpoint += chunk . length
151
+ }
152
+ byteCount += chunk . length
153
+ }
154
+ } . bind ( this )
155
+
156
+ if ( this . nodes . length === 0 ) {
157
+ // fetch from origin in the case that no nodes are loaded
158
+ opts . url = this . opts . cdnURL
159
+ try {
160
+ yield * fetchContent ( )
161
+ return
162
+ } catch ( err ) {
163
+ lastError = err
164
+ await this . loadNodesPromise
165
+ }
166
+ }
167
+
168
+ let fallbackCount = 0
169
+ for ( const origin of this . nodes ) {
170
+ if ( fallbackCount > this . opts . fallbackLimit ) {
171
+ return
172
+ }
173
+ opts . url = origin . url
174
+ try {
175
+ yield * fetchContent ( )
176
+ return
177
+ } catch ( err ) {
178
+ lastError = err
179
+ }
180
+ fallbackCount += 1
181
+ }
182
+
183
+ if ( lastError ) {
184
+ throw new Error ( `All attempts to fetch content have failed. Last error: ${ lastError . message } ` )
185
+ }
186
+ }
187
+
112
188
/**
113
189
*
114
190
* @param {string } cidPath
@@ -163,10 +239,8 @@ export class Saturn {
163
239
* @returns {URL }
164
240
*/
165
241
createRequestURL ( cidPath , opts ) {
166
- let origin = opts . cdnURL
167
- if ( ! origin . startsWith ( 'http' ) ) {
168
- origin = `https://${ origin } `
169
- }
242
+ let origin = opts . url || opts . cdnURL
243
+ origin = addHttpPrefix ( origin )
170
244
const url = new URL ( `${ origin } /ipfs/${ cidPath } ` )
171
245
172
246
url . searchParams . set ( 'format' , opts . format )
@@ -196,7 +270,6 @@ export class Saturn {
196
270
*/
197
271
reportLogs ( log ) {
198
272
if ( ! this . reportingLogs ) return
199
-
200
273
this . logs . push ( log )
201
274
this . reportLogsTimeout && clearTimeout ( this . reportLogsTimeout )
202
275
this . reportLogsTimeout = setTimeout ( this . _reportLogs . bind ( this ) , 3_000 )
@@ -295,4 +368,46 @@ export class Saturn {
295
368
performance . clearResourceTimings ( )
296
369
}
297
370
}
371
+
372
+ async _loadNodes ( opts ) {
373
+ let origin = opts . orchURL
374
+
375
+ let cacheNodesListPromise
376
+ if ( this . storage ) {
377
+ cacheNodesListPromise = this . storage . get ( Saturn . nodesListKey )
378
+ }
379
+
380
+ origin = addHttpPrefix ( origin )
381
+
382
+ const url = new URL ( origin )
383
+ const controller = new AbortController ( )
384
+ const options = Object . assign ( { } , { method : 'GET' } , this . opts )
385
+
386
+ const connectTimeout = setTimeout ( ( ) => {
387
+ controller . abort ( )
388
+ } , options . connectTimeout )
389
+
390
+ const orchResponse = await fetch ( parseUrl ( url ) , { signal : controller . signal , ...options } )
391
+ const orchNodesListPromise = orchResponse . json ( )
392
+ clearTimeout ( connectTimeout )
393
+
394
+ // This promise races fetching nodes list from the orchestrator and
395
+ // and the provided storage object (localStorage, sessionStorage, etc.)
396
+ // to insure we have a fallback set as quick as possible
397
+ let nodes
398
+ if ( cacheNodesListPromise ) {
399
+ nodes = await Promise . any ( [ orchNodesListPromise , cacheNodesListPromise ] )
400
+ } else {
401
+ nodes = await orchNodesListPromise
402
+ }
403
+
404
+ // if storage returns first, update based on cached storage.
405
+ if ( nodes === await cacheNodesListPromise ) {
406
+ this . nodes = nodes
407
+ }
408
+ // we always want to update from the orchestrator regardless.
409
+ nodes = await orchNodesListPromise
410
+ this . nodes = nodes
411
+ this . storage ?. set ( Saturn . nodesListKey , nodes )
412
+ }
298
413
}
0 commit comments