Skip to content

Commit a9a319f

Browse files
authored
fix concurrency issues in both ChatMLFetcher and CachingCompletionsFetchService (#255)
also some drive-by removal of dead code
1 parent 2bd9640 commit a9a319f

File tree

3 files changed

+132
-217
lines changed

3 files changed

+132
-217
lines changed

test/base/cachingChatMLFetcher.ts

Lines changed: 102 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,27 @@ import { IChatMLFetcher, IntentParams, Source } from '../../src/platform/chat/co
1212
import { ChatFetchResponseType, ChatLocation, ChatResponses } from '../../src/platform/chat/common/commonTypes';
1313
import { IConversationOptions } from '../../src/platform/chat/common/conversationOptions';
1414
import { getTextPart } from '../../src/platform/chat/common/globalStringUtils';
15+
import { LogLevel } from '../../src/platform/log/common/logService';
1516
import { FinishedCallback, ICopilotToolCall, IResponseDelta, OptionalChatRequestParams } from '../../src/platform/networking/common/fetch';
1617
import { IChatEndpoint } from '../../src/platform/networking/common/networking';
1718
import { ChoiceLogProbs, rawMessageToCAPI } from '../../src/platform/networking/common/openai';
1819
import { TelemetryProperties } from '../../src/platform/telemetry/common/telemetry';
1920
import { LcsDiff, LineSequence } from '../../src/util/common/diff';
21+
import { LockMap } from '../../src/util/common/lock';
2022
import { BugIndicatingError } from '../../src/util/vs/base/common/errors';
2123
import { IDisposable } from '../../src/util/vs/base/common/lifecycle';
2224
import { SyncDescriptor } from '../../src/util/vs/platform/instantiation/common/descriptors';
2325
import { IInstantiationService } from '../../src/util/vs/platform/instantiation/common/instantiation';
2426
import { CHAT_ML_CACHE_SALT } from '../cacheSalt';
2527
import { IJSONOutputPrinter } from '../jsonOutputPrinter';
2628
import { OutputType } from '../simulation/shared/sharedTypes';
29+
import { logger } from '../simulationLogger';
2730
import { computeSHA256 } from './hash';
2831
import { CacheMode, NoFetchChatMLFetcher } from './simulationContext';
2932
import { ISimulationEndpointHealth } from './simulationEndpointHealth';
3033
import { SimulationOutcomeImpl } from './simulationOutcome';
3134
import { drainStdoutAndExit } from './stdout';
3235
import { REPO_ROOT, SimulationTest } from './stest';
33-
import { logger } from '../simulationLogger';
34-
import { LogLevel } from '../../src/platform/log/common/logService';
3536

3637
export class CacheableChatRequest {
3738
public readonly hash: string;
@@ -106,6 +107,9 @@ export type ResponseWithMeta = ChatResponses & {
106107

107108

108109
export class CachingChatMLFetcher extends AbstractChatMLFetcher implements IDisposable {
110+
111+
private static readonly Locks = new LockMap();
112+
109113
private readonly fetcher: IChatMLFetcher;
110114
private isDisposed = false;
111115

@@ -167,113 +171,115 @@ export class CachingChatMLFetcher extends AbstractChatMLFetcher implements IDisp
167171
const req = new CacheableChatRequest(messages, endpoint.model, finalReqOptions, intentParams, this.extraCacheProperties);
168172
// console.log(`request with hash: ${req.hash}`);
169173

170-
let isCacheHit: boolean | undefined = undefined;
171-
if (this.cacheMode !== CacheMode.Disable) {
172-
const cacheValue = await this.cache.get(req, this.testInfo.cacheSlot);
173-
if (cacheValue) {
174-
if (cacheValue.type === ChatFetchResponseType.Success) {
175-
await finishedCb?.(cacheValue.value[0], 0, { text: cacheValue.value[0], copilotToolCalls: cacheValue.copilotFunctionCalls, logprobs: cacheValue.logprobs });
176-
} else if (cacheValue.type === ChatFetchResponseType.Length) {
177-
await finishedCb?.(cacheValue.truncatedValue, 0, { text: cacheValue.truncatedValue, copilotToolCalls: cacheValue.copilotFunctionCalls, logprobs: cacheValue.logprobs });
174+
return CachingChatMLFetcher.Locks.withLock(req.hash, async () => {
175+
let isCacheHit: boolean | undefined = undefined;
176+
if (this.cacheMode !== CacheMode.Disable) {
177+
const cacheValue = await this.cache.get(req, this.testInfo.cacheSlot);
178+
if (cacheValue) {
179+
if (cacheValue.type === ChatFetchResponseType.Success) {
180+
await finishedCb?.(cacheValue.value[0], 0, { text: cacheValue.value[0], copilotToolCalls: cacheValue.copilotFunctionCalls, logprobs: cacheValue.logprobs });
181+
} else if (cacheValue.type === ChatFetchResponseType.Length) {
182+
await finishedCb?.(cacheValue.truncatedValue, 0, { text: cacheValue.truncatedValue, copilotToolCalls: cacheValue.copilotFunctionCalls, logprobs: cacheValue.logprobs });
183+
}
184+
return { ...cacheValue, isCacheHit: true, cacheKey: req.hash };
178185
}
179-
return { ...cacheValue, isCacheHit: true, cacheKey: req.hash };
180-
}
181-
isCacheHit = false;
182-
}
183-
184-
if (this.cacheMode === CacheMode.Require) {
185-
let diff: { newRequest: string; oldRequest: string } | undefined;
186-
try {
187-
diff = await this.suggestDiffCommandForCacheMiss(req);
188-
} catch (err) {
189-
console.log(err);
186+
isCacheHit = false;
190187
}
191188

192-
console.log(JSON.stringify(messages, (key, value) => {
193-
if (typeof value === 'string') {
194-
const split = value.split(/\n/g);
195-
return split.length > 1 ? split : value;
189+
if (this.cacheMode === CacheMode.Require) {
190+
let diff: { newRequest: string; oldRequest: string } | undefined;
191+
try {
192+
diff = await this.suggestDiffCommandForCacheMiss(req);
193+
} catch (err) {
194+
console.log(err);
196195
}
197-
return value;
198-
}, 4));
199196

200-
let message = `\n✗ Cache entry not found for a request generated by test "${this.testInfo.testName}"!
197+
console.log(JSON.stringify(messages, (key, value) => {
198+
if (typeof value === 'string') {
199+
const split = value.split(/\n/g);
200+
return split.length > 1 ? split : value;
201+
}
202+
return value;
203+
}, 4));
204+
205+
let message = `\n✗ Cache entry not found for a request generated by test "${this.testInfo.testName}"!
201206
- Valid cache entries are currently required for all requests!
202207
- The missing request has the hash: ${req.hash} (cache slot ${this.testInfo.cacheSlot}, make sure to call simulate -- -n=10).
203208
`;
204-
if (diff) {
205-
message += `- Compare with the closest cache entry using \`code-insiders --diff "${diff.oldRequest}" "${diff.newRequest}"\`\n`;
206-
}
209+
if (diff) {
210+
message += `- Compare with the closest cache entry using \`code-insiders --diff "${diff.oldRequest}" "${diff.newRequest}"\`\n`;
211+
}
207212

208-
console.log(message);
209-
this.printTerminatedWithRequireCache(message);
210-
await drainStdoutAndExit(1);
211-
throw new Error(message);
212-
}
213+
console.log(message);
214+
this.printTerminatedWithRequireCache(message);
215+
await drainStdoutAndExit(1);
216+
throw new Error(message);
217+
}
213218

214-
const callbackWrapper = new FinishedCallbackWrapper(finishedCb);
215-
const start = Date.now();
216-
if (logger.shouldLog(LogLevel.Trace)) {
217-
logger.trace(`Making request:\n` + messages.map(m => ` ${m.role}: ${getTextPart(m.content)}`).join('\n'));
218-
}
219-
const result = await this.fetcher.fetchMany(
220-
debugName,
221-
messages,
222-
callbackWrapper.getCb(),
223-
token,
224-
location,
225-
endpoint,
226-
source,
227-
requestOptions,
228-
userInitiatedRequest,
229-
telemetryProperties,
230-
intentParams
231-
);
232-
const fetchingResponseTimeInMs = Date.now() - start;
233-
// Don't cache failed results
234-
if (
235-
result.type === ChatFetchResponseType.OffTopic
236-
|| result.type === ChatFetchResponseType.Filtered
237-
|| result.type === ChatFetchResponseType.Length
238-
|| result.type === ChatFetchResponseType.Success
239-
) {
240-
const cacheMetadata: CachedResponseMetadata = {
241-
testName: this.testInfo.testName,
242-
requestDuration: fetchingResponseTimeInMs,
243-
requestTime: new Date().toISOString()
244-
};
245-
const cachedResponse: CachedResponse = {
246-
...result,
247-
cacheMetadata,
248-
copilotFunctionCalls: callbackWrapper.copilotFunctionCalls,
249-
logprobs: callbackWrapper.logprobs,
250-
};
251-
if (!(this.fetcher instanceof NoFetchChatMLFetcher)) {
252-
try {
253-
await this.cache.set(req, this.testInfo.cacheSlot, cachedResponse);
254-
} catch (err) {
255-
if (/Key already exists/.test(err.message)) {
256-
console.log(JSON.stringify(messages, (key, value) => {
257-
if (typeof value === 'string') {
258-
const split = value.split(/\n/g);
259-
return split.length > 1 ? split : value;
260-
}
261-
return value;
262-
}, 4));
263-
console.log(`\n✗ ${err.message}`);
264-
await drainStdoutAndExit(1);
219+
const callbackWrapper = new FinishedCallbackWrapper(finishedCb);
220+
const start = Date.now();
221+
if (logger.shouldLog(LogLevel.Trace)) {
222+
logger.trace(`Making request:\n` + messages.map(m => ` ${m.role}: ${getTextPart(m.content)}`).join('\n'));
223+
}
224+
const result = await this.fetcher.fetchMany(
225+
debugName,
226+
messages,
227+
callbackWrapper.getCb(),
228+
token,
229+
location,
230+
endpoint,
231+
source,
232+
requestOptions,
233+
userInitiatedRequest,
234+
telemetryProperties,
235+
intentParams
236+
);
237+
const fetchingResponseTimeInMs = Date.now() - start;
238+
// Don't cache failed results
239+
if (
240+
result.type === ChatFetchResponseType.OffTopic
241+
|| result.type === ChatFetchResponseType.Filtered
242+
|| result.type === ChatFetchResponseType.Length
243+
|| result.type === ChatFetchResponseType.Success
244+
) {
245+
const cacheMetadata: CachedResponseMetadata = {
246+
testName: this.testInfo.testName,
247+
requestDuration: fetchingResponseTimeInMs,
248+
requestTime: new Date().toISOString()
249+
};
250+
const cachedResponse: CachedResponse = {
251+
...result,
252+
cacheMetadata,
253+
copilotFunctionCalls: callbackWrapper.copilotFunctionCalls,
254+
logprobs: callbackWrapper.logprobs,
255+
};
256+
if (!(this.fetcher instanceof NoFetchChatMLFetcher)) {
257+
try {
258+
await this.cache.set(req, this.testInfo.cacheSlot, cachedResponse);
259+
} catch (err) {
260+
if (/Key already exists/.test(err.message)) {
261+
console.log(JSON.stringify(messages, (key, value) => {
262+
if (typeof value === 'string') {
263+
const split = value.split(/\n/g);
264+
return split.length > 1 ? split : value;
265+
}
266+
return value;
267+
}, 4));
268+
console.log(`\n✗ ${err.message}`);
269+
await drainStdoutAndExit(1);
270+
}
271+
272+
throw err;
265273
}
266-
267-
throw err;
274+
return { ...result, cacheMetadata, isCacheHit, cacheKey: req.hash };
268275
}
269-
return { ...result, cacheMetadata, isCacheHit, cacheKey: req.hash };
276+
} else {
277+
// A request failed, so we don't want to cache it.
278+
// But we should warn the developer that they need to rerun
279+
this.simulationEndpointHealth.markFailure(this.testInfo, result);
270280
}
271-
} else {
272-
// A request failed, so we don't want to cache it.
273-
// But we should warn the developer that they need to rerun
274-
this.simulationEndpointHealth.markFailure(this.testInfo, result);
275-
}
276-
return { ...result, isCacheHit };
281+
return { ...result, isCacheHit };
282+
});
277283
}
278284

279285
private async suggestDiffCommandForCacheMiss(req: CacheableChatRequest) {

test/base/cachingCompletionsFetchService.ts

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { CancellationToken } from '../../src/util/vs/base/common/cancellation';
1818
import { IJSONOutputPrinter } from '../jsonOutputPrinter';
1919
import { InterceptedRequest, ISerialisedChatResponse, OutputType } from '../simulation/shared/sharedTypes';
2020

21+
import { LockMap } from '../../src/util/common/lock';
2122
import { assertType } from '../../src/util/vs/base/common/types';
2223
import { OPENAI_FETCHER_CACHE_SALT } from '../cacheSalt';
2324
import { CachedResponseMetadata, CachedTestInfo } from './cachingChatMLFetcher';
@@ -44,6 +45,8 @@ export class CacheableCompletionRequest {
4445

4546
export class CachingCompletionsFetchService extends CompletionsFetchService {
4647

48+
private static readonly Locks = new LockMap();
49+
4750
/** Throttle per URL (currently set to send a request only once a second) */
4851
private static readonly throttlers = new CachedFunction(
4952
function createThrottler(url: string) {
@@ -153,41 +156,43 @@ export class CachingCompletionsFetchService extends CompletionsFetchService {
153156
return this._fetchFromUrlAndCache(request, url, options, ct);
154157
}
155158

156-
const cachedValue = await this.nesCache.get(request, this.testInfo.cacheSlot);
157-
if (cachedValue) {
158-
this.requests.set(options.requestId, { request, hitsCache: true });
159-
return Result.ok(ICacheableCompletionsResponse.toFetchResponse(cachedValue));
160-
}
161-
162-
if (this.cacheMode === CacheMode.Require) {
163-
console.log(JSON.stringify(options.body, (key, value) => {
164-
if (typeof value === 'string') {
165-
const split = value.split(/\n/g);
166-
return split.length > 1 ? split : value;
167-
}
168-
return value;
169-
}, 4));
170-
await this.throwCacheMissing(request);
171-
}
159+
return CachingCompletionsFetchService.Locks.withLock(request.hash, async () => {
160+
const cachedValue = await this.nesCache.get(request, this.testInfo.cacheSlot);
161+
if (cachedValue) {
162+
this.requests.set(options.requestId, { request, hitsCache: true });
163+
return Result.ok(ICacheableCompletionsResponse.toFetchResponse(cachedValue));
164+
}
172165

173-
try {
174-
this.requests.set(options.requestId, { request, hitsCache: false });
175-
} catch (err) {
176-
if (/Key already exists/.test(err.message)) {
166+
if (this.cacheMode === CacheMode.Require) {
177167
console.log(JSON.stringify(options.body, (key, value) => {
178168
if (typeof value === 'string') {
179169
const split = value.split(/\n/g);
180170
return split.length > 1 ? split : value;
181171
}
182172
return value;
183173
}, 4));
184-
console.log(`\n✗ ${err.message}`);
185-
await drainStdoutAndExit(1);
174+
await this.throwCacheMissing(request);
186175
}
187176

188-
throw err;
189-
}
190-
return this._fetchFromUrlAndCache(request, url, options, ct);
177+
try {
178+
this.requests.set(options.requestId, { request, hitsCache: false });
179+
} catch (err) {
180+
if (/Key already exists/.test(err.message)) {
181+
console.log(JSON.stringify(options.body, (key, value) => {
182+
if (typeof value === 'string') {
183+
const split = value.split(/\n/g);
184+
return split.length > 1 ? split : value;
185+
}
186+
return value;
187+
}, 4));
188+
console.log(`\n✗ ${err.message}`);
189+
await drainStdoutAndExit(1);
190+
}
191+
192+
throw err;
193+
}
194+
return this._fetchFromUrlAndCache(request, url, options, ct);
195+
});
191196
}
192197

193198
private async _fetchFromUrlAndCache(

0 commit comments

Comments
 (0)