File tree Expand file tree Collapse file tree 3 files changed +60
-0
lines changed
packages/compass-user-data/src Expand file tree Collapse file tree 3 files changed +60
-0
lines changed Original file line number Diff line number Diff line change 1+ import { expect } from 'chai' ;
2+ import { Semaphore } from './semaphore' ;
3+
4+ describe ( 'semaphore' , function ( ) {
5+ const maxConcurrentOps = 5 ;
6+ let semaphore : Semaphore ;
7+ let taskHandler : ( id : number ) => Promise < number > ;
8+
9+ beforeEach ( ( ) => {
10+ semaphore = new Semaphore ( maxConcurrentOps ) ;
11+ taskHandler = async ( id : number ) => {
12+ const release = await semaphore . waitForRelease ( ) ;
13+ const delay = Math . floor ( Math . random ( ) * 450 ) + 50 ;
14+ try {
15+ await new Promise ( ( resolve ) => setTimeout ( resolve , delay ) ) ;
16+ return id ;
17+ } finally {
18+ release ( ) ;
19+ }
20+ } ;
21+ } ) ;
22+
23+ it ( 'should run operations concurrently' , async function ( ) {
24+ const tasks = Array . from ( { length : 10 } , ( _ , i ) => taskHandler ( i ) ) ;
25+ const results = await Promise . all ( tasks ) ;
26+ expect ( results ) . to . have . lengthOf ( 10 ) ;
27+ } ) ;
28+ } ) ;
Original file line number Diff line number Diff line change 1+ export class Semaphore {
2+ private currentCount = 0 ;
3+ private queue : ( ( ) => void ) [ ] = [ ] ;
4+ constructor ( private maxConcurrentOps : number ) { }
5+
6+ waitForRelease ( ) : Promise < ( ) => void > {
7+ return new Promise ( ( resolve ) => {
8+ const attempt = ( ) => {
9+ this . currentCount ++ ;
10+ resolve ( this . release . bind ( this ) ) ;
11+ } ;
12+ if ( this . currentCount < this . maxConcurrentOps ) {
13+ attempt ( ) ;
14+ } else {
15+ this . queue . push ( attempt ) ;
16+ }
17+ } ) ;
18+ }
19+
20+ private release ( ) {
21+ this . currentCount -- ;
22+ if ( this . queue . length > 0 ) {
23+ const next = this . queue . shift ( ) ;
24+ next && next ( ) ;
25+ }
26+ }
27+ }
Original file line number Diff line number Diff line change @@ -4,6 +4,7 @@ import { createLogger } from '@mongodb-js/compass-logging';
44import { getStoragePath } from '@mongodb-js/compass-utils' ;
55import type { z } from 'zod' ;
66import writeFile from 'write-file-atomic' ;
7+ import { Semaphore } from './semaphore' ;
78
89const { log, mongoLogId } = createLogger ( 'COMPASS-USER-STORAGE' ) ;
910
@@ -68,6 +69,7 @@ export class UserData<T extends z.Schema> {
6869 private readonly serialize : SerializeContent < z . input < T > > ;
6970 private readonly deserialize : DeserializeContent ;
7071 private readonly getFileName : GetFileName ;
72+ private readonly semaphore = new Semaphore ( 100 ) ;
7173
7274 constructor (
7375 private readonly validator : T ,
@@ -122,7 +124,9 @@ export class UserData<T extends z.Schema> {
122124 let data : string ;
123125 let stats : Stats ;
124126 let handle : fs . FileHandle | undefined = undefined ;
127+ let release : ( ( ) => void ) | undefined = undefined ;
125128 try {
129+ release = await this . semaphore . waitForRelease ( ) ;
126130 handle = await fs . open ( absolutePath , 'r' ) ;
127131 [ stats , data ] = await Promise . all ( [
128132 handle . stat ( ) ,
@@ -139,6 +143,7 @@ export class UserData<T extends z.Schema> {
139143 throw error ;
140144 } finally {
141145 await handle ?. close ( ) ;
146+ release ?.( ) ;
142147 }
143148
144149 try {
You can’t perform that action at this time.
0 commit comments