1
1
'use server' ;
2
2
3
3
import { RevisionPage , SearchAIAnswer , SearchPageResult , SiteSpace , Space } from '@gitbook/api' ;
4
- import pMap from 'p-map' ;
5
4
import * as React from 'react' ;
6
5
import { assert } from 'ts-essentials' ;
7
6
@@ -179,14 +178,28 @@ export const streamAskQuestion = streamResponse(async function* (
179
178
{ format : 'document' } ,
180
179
) ;
181
180
182
- const spaceData = new Map < string , RevisionPage [ ] > ( ) ;
181
+ const spaceData = new PromiseQueue < string , RevisionPage [ ] > ( ) ;
183
182
for await ( const chunk of stream ) {
184
- if ( ! chunk ) {
185
- continue ;
186
- }
183
+ const answer = chunk . answer ;
184
+
185
+ // Register the space of each page source into the promise queue.
186
+ const spaces = answer . sources
187
+ . map ( ( source ) => {
188
+ if ( source . type !== 'page' ) {
189
+ return null ;
190
+ }
191
+
192
+ spaceData . registerPromise ( source . space , ( ) => {
193
+ return api . getRevisionPages ( source . space , source . revision , { metadata : false } ) ;
194
+ } ) ;
195
+
196
+ return source . space ;
197
+ } )
198
+ . filter ( filterOutNullable ) ;
187
199
188
- const encoded = await transformAnswer ( chunk . answer , spaceData ) ;
189
- yield encoded ;
200
+ // Get the pages for all spaces referenced by this answer.
201
+ const pages = await spaceData . getPromises ( spaces ) ;
202
+ yield transformAnswer ( chunk . answer , pages ) ;
190
203
}
191
204
} ) ;
192
205
@@ -198,46 +211,17 @@ export async function getRecommendedQuestions(spaceId: string): Promise<string[]
198
211
return data . questions ;
199
212
}
200
213
201
- async function transformAnswer (
214
+ function transformAnswer (
202
215
answer : SearchAIAnswer ,
203
-
204
- /**
205
- * Transforming an answer requires fetching space data so we can calculate absolute
206
- * and relative page paths. Maintain an in-memory cache of space data to avoid
207
- * refetching for the same source.
208
- */
209
- spaceData : Map < string , RevisionPage [ ] > ,
210
- ) : Promise < AskAnswerResult > {
211
- // Gather a unique set of all space IDs referenced in this answer.
212
- const spaces = answer . sources . reduce < Set < string > > ( ( set , source ) => {
213
- if ( source . type !== 'page' ) {
214
- return set ;
215
- }
216
-
217
- return set . add ( source . space ) ;
218
- } , new Set < string > ( ) ) ;
219
-
220
- // Fetch the content of all spaces referenced in this answer, if not already fetched.
221
- await pMap (
222
- spaces . values ( ) ,
223
- async ( spaceId ) => {
224
- if ( spaceData . has ( spaceId ) ) {
225
- return ;
226
- }
227
-
228
- const { pages } = await api . getSpaceContentData ( { spaceId } , undefined ) ;
229
- spaceData . set ( spaceId , pages ) ;
230
- } ,
231
- { concurrency : 1 } ,
232
- ) ;
233
-
216
+ spacePages : Map < string , RevisionPage [ ] > ,
217
+ ) : AskAnswerResult {
234
218
const sources = answer . sources
235
219
. map ( ( source ) => {
236
220
if ( source . type !== 'page' ) {
237
221
return null ;
238
222
}
239
223
240
- const pages = spaceData . get ( source . space ) ;
224
+ const pages = spacePages . get ( source . space ) ;
241
225
242
226
if ( ! pages ) {
243
227
return null ;
@@ -337,3 +321,70 @@ function transformPageResult(item: SearchPageResult, space?: Space) {
337
321
338
322
return [ page , ...sections ] ;
339
323
}
324
+
325
+ /**
326
+ * A queue for promises that performs deduplication of requests
327
+ * and ensures they run sequentially.
328
+ *
329
+ * This is useful when you want to make multiple async requests
330
+ * but want to avoid making the same request multiple times.
331
+ *
332
+ * Running the promises sequentially gives us some safety about
333
+ * hitting Cloudflare Worker's 6 parallel subrequest limits.
334
+ */
335
+ class PromiseQueue < Key extends string , Value > {
336
+ private promiseQueue : ( ( ) => Promise < Value > ) [ ] = [ ] ;
337
+ private results : Map < Key , Value > = new Map ( ) ;
338
+ private registeredKeys : Set < Key > = new Set ( ) ;
339
+ private currentExecution : Promise < void > | null = null ;
340
+
341
+ private async executeQueue ( ) {
342
+ while ( this . promiseQueue . length > 0 ) {
343
+ const promiseFn = this . promiseQueue . shift ( ) ! ;
344
+ await promiseFn ( ) ;
345
+ }
346
+ this . currentExecution = null ; // Mark the queue as idle
347
+ }
348
+
349
+ registerPromise ( key : Key , fn : ( ) => Promise < Value > ) : void {
350
+ if ( this . registeredKeys . has ( key ) ) {
351
+ return ;
352
+ }
353
+
354
+ this . registeredKeys . add ( key ) ;
355
+
356
+ const wrapperFn = async ( ) => {
357
+ const result = await fn ( ) ;
358
+ this . results . set ( key , result ) ;
359
+ return result ;
360
+ } ;
361
+
362
+ this . promiseQueue . push ( wrapperFn ) ;
363
+
364
+ // Start executing if not already running
365
+ if ( ! this . currentExecution ) {
366
+ this . currentExecution = this . executeQueue ( ) ;
367
+ }
368
+ }
369
+
370
+ /**
371
+ * Returns a promise that resolves to a map of results for the given keys.
372
+ * If the key has not been registered in the queue, it will be ignored.
373
+ */
374
+ async getPromises ( keys : Key [ ] ) : Promise < Map < Key , Value > > {
375
+ // Wait for any ongoing executions to finish
376
+ if ( this . currentExecution ) {
377
+ await this . currentExecution ;
378
+ }
379
+
380
+ return keys . reduce ( ( map , key ) => {
381
+ const data = this . results . get ( key ) ;
382
+
383
+ if ( data ) {
384
+ return map . set ( key , data ) ;
385
+ }
386
+
387
+ return map ;
388
+ } , new Map < Key , Value > ( ) ) ;
389
+ }
390
+ }
0 commit comments