Skip to content

Commit 6f38c67

Browse files
committed
Implemented a BufferedQueue, and refactored to use it for RepoManager File System Event processing. Various unit testing improvements.
1 parent b20bb3e commit 6f38c67

File tree

9 files changed

+442
-510
lines changed

9 files changed

+442
-510
lines changed

src/repoManager.ts

Lines changed: 40 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { DEFAULT_REPO_STATE, ExtensionState } from './extensionState';
66
import { Logger } from './logger';
77
import { GitRepoSet, GitRepoState } from './types';
88
import { evalPromises, getPathFromUri, pathWithTrailingSlash, realpath } from './utils';
9+
import { BufferedQueue } from './utils/bufferedQueue';
910
import { Disposable, toDisposable } from './utils/disposable';
1011
import { Event, EventEmitter } from './utils/event';
1112

@@ -22,18 +23,16 @@ export class RepoManager extends Disposable {
2223
private readonly dataSource: DataSource;
2324
private readonly extensionState: ExtensionState;
2425
private readonly logger: Logger;
26+
2527
private repos: GitRepoSet;
2628
private ignoredRepos: string[];
2729
private maxDepthOfRepoSearch: number;
30+
2831
private readonly folderWatchers: { [workspace: string]: vscode.FileSystemWatcher } = {};
2932
private readonly repoEventEmitter: EventEmitter<RepoChangeEvent>;
3033

31-
private readonly createEventQueue: string[] = [];
32-
private readonly changeEventQueue: string[] = [];
33-
private processCreateEventsTimeout: NodeJS.Timer | null = null;
34-
private processChangeEventsTimeout: NodeJS.Timer | null = null;
35-
private processingCreateEvents: boolean = false;
36-
private processingChangeEvents: boolean = false;
34+
private readonly onWatcherCreateQueue: BufferedQueue<string>;
35+
private readonly onWatcherChangeQueue: BufferedQueue<string>;
3736

3837
/**
3938
* Creates the Git Graph Repository Manager, and runs startup tasks.
@@ -49,7 +48,12 @@ export class RepoManager extends Disposable {
4948
this.repos = extensionState.getRepos();
5049
this.ignoredRepos = extensionState.getIgnoredRepos();
5150
this.maxDepthOfRepoSearch = getConfig().maxDepthOfRepoSearch;
51+
5252
this.repoEventEmitter = new EventEmitter<RepoChangeEvent>();
53+
54+
this.onWatcherCreateQueue = new BufferedQueue<string>(this.processOnWatcherCreateEvent.bind(this), this.sendRepos.bind(this));
55+
this.onWatcherChangeQueue = new BufferedQueue<string>(this.processOnWatcherChangeEvent.bind(this), this.sendRepos.bind(this));
56+
5357
this.startupTasks();
5458

5559
this.registerDisposables(
@@ -83,6 +87,12 @@ export class RepoManager extends Disposable {
8387
// Dispose the Repository Event Emitter when disposed
8488
this.repoEventEmitter,
8589

90+
// Dispose the onWatcherCreateQueue
91+
this.onWatcherCreateQueue,
92+
93+
// Dispose the onWatcherChangeQueue
94+
this.onWatcherChangeQueue,
95+
8696
// Stop watching folders when disposed
8797
toDisposable(() => {
8898
const folders = Object.keys(this.folderWatchers);
@@ -483,9 +493,9 @@ export class RepoManager extends Disposable {
483493
*/
484494
private startWatchingFolder(path: string) {
485495
let watcher = vscode.workspace.createFileSystemWatcher(path + '/**');
486-
watcher.onDidCreate(uri => this.onWatcherCreate(uri));
487-
watcher.onDidChange(uri => this.onWatcherChange(uri));
488-
watcher.onDidDelete(uri => this.onWatcherDelete(uri));
496+
watcher.onDidCreate((uri) => this.onWatcherCreate(uri));
497+
watcher.onDidChange((uri) => this.onWatcherChange(uri));
498+
watcher.onDidDelete((uri) => this.onWatcherDelete(uri));
489499
this.folderWatchers[path] = watcher;
490500
}
491501

@@ -499,53 +509,29 @@ export class RepoManager extends Disposable {
499509
}
500510

501511
/**
502-
* Process a file system creation event.
512+
* Handle a file system creation event.
503513
* @param uri The URI of the creation event.
504514
*/
505515
private onWatcherCreate(uri: vscode.Uri) {
506516
let path = getPathFromUri(uri);
507517
if (path.indexOf('/.git/') > -1) return;
508518
if (path.endsWith('/.git')) path = path.slice(0, -5);
509-
if (this.createEventQueue.indexOf(path) > -1) return;
510-
511-
this.createEventQueue.push(path);
512-
513-
if (!this.processingCreateEvents) {
514-
if (this.processCreateEventsTimeout !== null) {
515-
clearTimeout(this.processCreateEventsTimeout);
516-
}
517-
this.processCreateEventsTimeout = setTimeout(() => {
518-
this.processCreateEventsTimeout = null;
519-
this.processCreateEvents();
520-
}, 1000);
521-
}
519+
this.onWatcherCreateQueue.enqueue(path);
522520
}
523521

524522
/**
525-
* Process a file system change event.
523+
* Handle a file system change event.
526524
* @param uri The URI of the change event.
527525
*/
528526
private onWatcherChange(uri: vscode.Uri) {
529527
let path = getPathFromUri(uri);
530528
if (path.indexOf('/.git/') > -1) return;
531529
if (path.endsWith('/.git')) path = path.slice(0, -5);
532-
if (this.changeEventQueue.indexOf(path) > -1) return;
533-
534-
this.changeEventQueue.push(path);
535-
536-
if (!this.processingChangeEvents) {
537-
if (this.processChangeEventsTimeout !== null) {
538-
clearTimeout(this.processChangeEventsTimeout);
539-
}
540-
this.processChangeEventsTimeout = setTimeout(() => {
541-
this.processChangeEventsTimeout = null;
542-
this.processChangeEvents();
543-
}, 1000);
544-
}
530+
this.onWatcherChangeQueue.enqueue(path);
545531
}
546532

547533
/**
548-
* Process a file system deletion event.
534+
* Handle a file system deletion event.
549535
* @param uri The URI of the deletion event.
550536
*/
551537
private onWatcherDelete(uri: vscode.Uri) {
@@ -556,33 +542,31 @@ export class RepoManager extends Disposable {
556542
}
557543

558544
/**
559-
* Process the queue of file system creation events.
545+
* Process a file system creation event.
546+
* @param path The path of the file that was created.
547+
* @returns TRUE => Change was made. FALSE => No change was made.
560548
*/
561-
private async processCreateEvents() {
562-
this.processingCreateEvents = true;
563-
let path, changes = false;
564-
while (path = this.createEventQueue.shift()) {
565-
if (await isDirectory(path)) {
566-
if (await this.searchDirectoryForRepos(path, this.maxDepthOfRepoSearch)) changes = true;
549+
private async processOnWatcherCreateEvent(path: string) {
550+
if (await isDirectory(path)) {
551+
if (await this.searchDirectoryForRepos(path, this.maxDepthOfRepoSearch)) {
552+
return true;
567553
}
568554
}
569-
this.processingCreateEvents = false;
570-
if (changes) this.sendRepos();
555+
return false;
571556
}
572557

573558
/**
574-
* Process the queue of file system change events
559+
* Process a file system change event.
560+
* @param path The path of the file that was changed.
561+
* @returns TRUE => Change was made. FALSE => No change was made.
575562
*/
576-
private async processChangeEvents() {
577-
this.processingChangeEvents = true;
578-
let path, changes = false;
579-
while (path = this.changeEventQueue.shift()) {
580-
if (!await doesPathExist(path)) {
581-
if (this.removeReposWithinFolder(path)) changes = true;
563+
private async processOnWatcherChangeEvent(path: string) {
564+
if (!await doesPathExist(path)) {
565+
if (this.removeReposWithinFolder(path)) {
566+
return true;
582567
}
583568
}
584-
this.processingChangeEvents = false;
585-
if (changes) this.sendRepos();
569+
return false;
586570
}
587571
}
588572

src/utils/bufferedQueue.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { Disposable, toDisposable } from './disposable';
2+
3+
/**
4+
* Represents a BufferedQueue, which is queue that buffers items for a short period of time before processing them.
5+
*/
6+
export class BufferedQueue<T> extends Disposable {
7+
private readonly queue: T[] = [];
8+
private timeout: NodeJS.Timer | null = null;
9+
private processing: boolean = false;
10+
11+
private readonly bufferDuration: number;
12+
private onItem: (item: T) => Promise<boolean>;
13+
private onChanges: () => void;
14+
15+
/**
16+
* Constructs a BufferedQueue instance.
17+
* @param onItem A callback invoked to process an item in the queue.
18+
* @param onChanges A callback invoked when a change was indicated by onItem.
19+
* @param bufferDuration The number of milliseconds to buffer items in the queue.
20+
* @returns The BufferedQueue instance.
21+
*/
22+
constructor(onItem: (item: T) => Promise<boolean>, onChanges: () => void, bufferDuration: number = 1000) {
23+
super();
24+
this.bufferDuration = bufferDuration;
25+
this.onItem = onItem;
26+
this.onChanges = onChanges;
27+
28+
this.registerDisposable(toDisposable(() => {
29+
if (this.timeout !== null) {
30+
clearTimeout(this.timeout);
31+
this.timeout = null;
32+
}
33+
}));
34+
}
35+
36+
/**
37+
* Enqueue an item if it doesn't already exist in the queue.
38+
* @param item The item to enqueue.
39+
*/
40+
public enqueue(item: T) {
41+
const itemIndex = this.queue.indexOf(item);
42+
if (itemIndex > -1) {
43+
this.queue.splice(itemIndex, 1);
44+
}
45+
this.queue.push(item);
46+
47+
if (!this.processing) {
48+
if (this.timeout !== null) {
49+
clearTimeout(this.timeout);
50+
}
51+
this.timeout = setTimeout(() => {
52+
this.timeout = null;
53+
this.run();
54+
}, this.bufferDuration);
55+
}
56+
}
57+
58+
/**
59+
* Process all of the items that are currently queued, and call the onChanges callback if any of the items resulted in a change
60+
*/
61+
private async run() {
62+
this.processing = true;
63+
let item, changes = false;
64+
while (item = this.queue.shift()) {
65+
if (await this.onItem(item)) {
66+
changes = true;
67+
}
68+
}
69+
this.processing = false;
70+
if (changes) this.onChanges();
71+
}
72+
}

tests/avatarManager.test.ts

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { waitForExpect } from './helpers/expectations';
2+
13
import * as date from './mocks/date';
24
import * as vscode from './mocks/vscode';
35
jest.mock('vscode', () => vscode, { virtual: true });
@@ -1737,24 +1739,6 @@ function expectFileToHaveBeenRead(name: string) {
17371739
expect(spyOnReadFile.mock.calls.some((args) => args[0] === name)).toBe(true);
17381740
}
17391741

1740-
function waitForExpect(expect: () => void) {
1741-
return new Promise((resolve, reject) => {
1742-
let attempts = 0;
1743-
const testInterval = setInterval(async () => {
1744-
try {
1745-
attempts++;
1746-
expect();
1747-
resolve();
1748-
} catch (e) {
1749-
if (attempts === 100) {
1750-
clearInterval(testInterval);
1751-
reject(e);
1752-
}
1753-
}
1754-
}, 50);
1755-
});
1756-
}
1757-
17581742
function waitForEvents(avatarManager: AvatarManager, n: number, runPendingTimers = false) {
17591743
return new Promise<AvatarEvent[]>((resolve) => {
17601744
const events: AvatarEvent[] = [];

0 commit comments

Comments
 (0)