Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 37 additions & 68 deletions tests/accuracy/sdk/accuracy-result-storage/disk-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ export class DiskBasedResultStorage implements AccuracyResultStorage {

async updateRunStatus(commitSHA: string, runId: string, status: AccuracyRunStatuses): Promise<void> {
const resultFilePath = this.getAccuracyResultFilePath(commitSHA, runId);
let releaseLock: (() => Promise<void>) | undefined;
try {
releaseLock = await lock(resultFilePath, { retries: 10 });
await this.withFileLock(resultFilePath, async () => {
const accuracyResult = await this.getAccuracyResult(commitSHA, runId, false);
if (!accuracyResult) {
throw new Error("Results not found!");
Expand All @@ -78,23 +76,16 @@ export class DiskBasedResultStorage implements AccuracyResultStorage {
),
{ encoding: "utf8" }
);
} catch (error) {
console.warn(
`Could not update run status to ${status} for commit - ${commitSHA}, runId - ${runId}.`,
error
);
throw error;
} finally {
await releaseLock?.();
}
});

// This bit is important to mark the current run as the latest run for a
// commit so that we can use that during baseline comparison.
if (status === AccuracyRunStatus.Done) {
await this.atomicUpdateLink(
this.getAccuracyResultFilePath(commitSHA, runId),
this.getLatestResultFilePath(commitSHA)
);
const latestResultFilePath = this.getLatestResultFilePath(commitSHA);
await this.withFileLock(latestResultFilePath, async () => {
await fs.unlink(latestResultFilePath);
await fs.link(resultFilePath, latestResultFilePath);
});
}
}

Expand Down Expand Up @@ -134,50 +125,36 @@ export class DiskBasedResultStorage implements AccuracyResultStorage {
return;
}

let releaseLock: (() => Promise<void>) | undefined;
try {
releaseLock = await lock(resultFilePath, { retries: 10 });
const accuracyResult = await this.getAccuracyResult(commitSHA, runId, false);
await this.withFileLock(resultFilePath, async () => {
let accuracyResult = await this.getAccuracyResult(commitSHA, runId, false);
if (!accuracyResult) {
throw new Error("Expected at-least initial accuracy result to be present");
}

const existingPromptIdx = accuracyResult.promptResults.findIndex((result) => result.prompt === prompt);
const promptResult = accuracyResult.promptResults[existingPromptIdx];
if (!promptResult) {
return await fs.writeFile(
resultFilePath,
JSON.stringify(
if (promptResult) {
accuracyResult.promptResults.splice(existingPromptIdx, 1, {
prompt: promptResult.prompt,
expectedToolCalls: promptResult.expectedToolCalls,
modelResponses: [...promptResult.modelResponses, modelResponse],
});
} else {
accuracyResult = {
...accuracyResult,
promptResults: [
...accuracyResult.promptResults,
{
...accuracyResult,
promptResults: [
...accuracyResult.promptResults,
{
prompt,
expectedToolCalls,
modelResponses: [modelResponse],
},
],
prompt,
expectedToolCalls,
modelResponses: [modelResponse],
},
null,
2
)
);
],
};
}

accuracyResult.promptResults.splice(existingPromptIdx, 1, {
prompt: promptResult.prompt,
expectedToolCalls: promptResult.expectedToolCalls,
modelResponses: [...promptResult.modelResponses, modelResponse],
});

return await fs.writeFile(resultFilePath, JSON.stringify(accuracyResult, null, 2));
} catch (error) {
console.warn(`Could not save model response for commit - ${commitSHA}, runId - ${runId}.`, error);
throw error;
} finally {
await releaseLock?.();
}
await fs.writeFile(resultFilePath, JSON.stringify(accuracyResult, null, 2));
});
}

close(): Promise<void> {
Expand Down Expand Up @@ -206,20 +183,16 @@ export class DiskBasedResultStorage implements AccuracyResultStorage {
}
}

private async atomicUpdateLink(filePath: string, linkPath: string) {
for (let attempt = 0; attempt < 10; attempt++) {
try {
const tempLinkPath = `${linkPath}~${Date.now()}`;
await fs.link(filePath, tempLinkPath);
await fs.rename(tempLinkPath, linkPath);
return;
} catch (error) {
if (attempt < 10) {
await this.waitFor(100 + Math.random() * 200);
} else {
throw error;
}
}
private async withFileLock(filePath: string, callback: () => Promise<void>): Promise<void> {
let releaseLock: (() => Promise<void>) | undefined;
try {
releaseLock = await lock(filePath, { retries: 10 });
await callback();
} catch (error) {
console.warn(`Could not acquire lock for file - ${filePath}.`, error);
throw error;
} finally {
await releaseLock?.();
}
}

Expand All @@ -230,8 +203,4 @@ export class DiskBasedResultStorage implements AccuracyResultStorage {
private getLatestResultFilePath(commitSHA: string): string {
return path.join(ACCURACY_RESULTS_DIR, commitSHA, `${LATEST_ACCURACY_RUN_NAME}.json`);
}

private waitFor(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Loading