Skip to content

Commit 271f7d1

Browse files
Merge branch 'release/4.4.1'
2 parents 7f832ef + c9f5cc8 commit 271f7d1

File tree

4 files changed

+143
-28
lines changed

4 files changed

+143
-28
lines changed

azure-pipelines.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ steps:
6262
deleteIntegrationEnvironmentScriptFilePath: "./build/integration-test.ps1"
6363
deleteIntegrationEnvironmentScriptArguments: "-Action TearDown"
6464
publishCodeCoverage: true
65-
useGitVersionDotNetTool: true
65+
useGitVersionDotNetTool: true
66+
publishToDevFeed: true

docs/4.4.1-release-notes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Fixes
2+
- Reduce TooManyRequests exceptions thrown by Azure OpenAI
3+
- Use distributed locks to control rate limits across pods

src/ExternalSearch.Providers.AzureOpenAI/AzureOpenAIExternalSearchProvider.cs

Lines changed: 86 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,12 @@ public IEnumerable<IExternalSearchQueryResult> ExecuteSearch(ExecutionContext co
7878
IDictionary<string, object> config, IProvider provider)
7979
{
8080
var jobData = new AzureOpenAIExternalSearchJobData(config);
81-
return InternalExecuteSearch(context, query, jobData);
81+
82+
return ActionExtensions.ExecuteWithRetry(
83+
() => InternalExecuteSearch(context, query, jobData).ToArray(), // important we need to materialize the enumerable for ExecuteWithRetry to work
84+
retryCount: 1000,
85+
isTransient: ex => ex.ToString().Contains("TooManyRequests") // the core will retry but only 3 times
86+
);
8287
}
8388

8489
public IEnumerable<Clue> BuildClues(ExecutionContext context, IExternalSearchQuery query,
@@ -228,7 +233,7 @@ public ConnectionVerificationResult VerifyConnection(ExecutionContext context,
228233
return new ConnectionVerificationResult(false, "Prompt must contain at least one output. eg, {output:vocabulary:product.description}");
229234
}
230235

231-
var deploymentSupportsCompletion = DeploymentSupportsCompletion(context, deploymentName);
236+
var deploymentSupportsCompletion = DeploymentSupportsCompletion(context, deploymentName, baseUrl);
232237

233238
prompt = "Hello";
234239

@@ -253,10 +258,8 @@ public ConnectionVerificationResult VerifyConnection(ExecutionContext context,
253258
}
254259
}
255260

256-
private bool DeploymentSupportsCompletion(ExecutionContext executionContext, string deploymentName)
261+
private bool DeploymentSupportsCompletion(ExecutionContext executionContext, string deploymentName, string baseUrl)
257262
{
258-
var baseUrl = executionContext.Organization.Settings.GetValue("OpenAiBaseUrl", "OpenAiBaseUrl", "");
259-
260263
var deploymentSupportsCompletionCacheKey = $"{nameof(DeploymentSupportsCompletion)}_{executionContext.Organization.Id}_{baseUrl}_{deploymentName}";
261264
if (_cache.TryGetValue(deploymentSupportsCompletionCacheKey, out var cached) && cached != null)
262265
{
@@ -412,37 +415,61 @@ private IEnumerable<IExternalSearchQueryResult> InternalExecuteSearch(ExecutionC
412415

413416
using (context.Log.BeginScope("{0} {1}: query {2}", GetType().Name, "ExecuteSearch", query))
414417
{
415-
context.Log.LogTrace("Starting external search for Id: '{Id}' QueryKey: '{QueryKey}'", query.Id, query.QueryKey);
416-
417418
var prompt = query.QueryParameters["prompt"].Single();
418419
var deploymentName = query.QueryParameters["deploymentName"].Single();
420+
var baseUrl = context.Organization.Settings.GetValue("OpenAiBaseUrl", "OpenAiBaseUrl", "");
419421

420-
var deploymentSupportsCompletion = DeploymentSupportsCompletion(context, deploymentName);
421-
422-
var response = deploymentSupportsCompletion
423-
? QueryInternalUsingCompletionApi(context, deploymentName, prompt)
424-
: QueryInternalUsingChatApi(context, deploymentName, prompt);
425-
426-
JObject jsonResponse;
427-
428-
try
422+
using (new DistributedLock(context, $"{nameof(AzureOpenAIExternalSearchProvider)}_{baseUrl}_{deploymentName}", exclusive: false))
429423
{
430-
jsonResponse = JObject.Parse(response);
431-
}
432-
catch
433-
{
434-
prompt +=
435-
"\n\nImportant: A prior attempt to answer this question resulted in malformed JSON. Please retry and verify that the output adheres strictly to the specified JSON format. The response must consist solely of valid JSON, as it will be programmatically processed.";
424+
context.Log.LogTrace("Starting external search for Id: '{Id}' QueryKey: '{QueryKey}'", query.Id, query.QueryKey);
425+
426+
var deploymentSupportsCompletion = DeploymentSupportsCompletion(context, deploymentName, baseUrl);
436427

437-
response = deploymentSupportsCompletion
428+
var response = deploymentSupportsCompletion
438429
? QueryInternalUsingCompletionApi(context, deploymentName, prompt)
439430
: QueryInternalUsingChatApi(context, deploymentName, prompt);
440431

441-
jsonResponse = JObject.Parse(response);
432+
JObject jsonResponse;
433+
434+
try
435+
{
436+
jsonResponse = ParseResponse(response);
437+
}
438+
catch(Exception ex)
439+
{
440+
context.Log.LogDebug(ex, $"Failed to parse json response from AI. Will try again. response was:\n{response}");
441+
442+
prompt +=
443+
"\n\nImportant: A prior attempt to answer this question resulted in malformed JSON. Please retry and verify that the output adheres strictly to the specified JSON format. The response must consist solely of valid JSON, as it will be programmatically processed.";
444+
445+
response = deploymentSupportsCompletion
446+
? QueryInternalUsingCompletionApi(context, deploymentName, prompt)
447+
: QueryInternalUsingChatApi(context, deploymentName, prompt);
448+
449+
try
450+
{
451+
jsonResponse = ParseResponse(response);
452+
}
453+
catch (Exception ex2)
454+
{
455+
throw new Exception($"Unable to deserialize the response generated by AI. The generated response was:\n{response}", ex2);
456+
}
457+
}
458+
459+
yield return new ExternalSearchQueryResult<JObject>(query, jsonResponse);
442460
}
461+
}
462+
}
443463

444-
yield return new ExternalSearchQueryResult<JObject>(query, jsonResponse);
464+
private static JObject ParseResponse(string response)
465+
{
466+
var m = Regex.Match(response, "(?<json>{.+})", RegexOptions.Singleline);
467+
if (m.Success)
468+
{
469+
response = m.Groups["json"].Value;
445470
}
471+
472+
return JObject.Parse(response);
446473
}
447474

448475
private string QueryInternalUsingCompletionApi(ExecutionContext executionContext, string deploymentName, string prompt, bool logError = true)
@@ -470,7 +497,8 @@ private string QueryInternalUsingCompletionApi(ExecutionContext executionContext
470497

471498
if (response.StatusCode == HttpStatusCode.TooManyRequests)
472499
{
473-
Thread.Sleep(2000); // while developing we observed that the error message says to retry in 2s
500+
WaitDueToTooManyRequests(executionContext, deploymentName, response, baseUrl);
501+
474502
throw new Exception($"Too many requests - Call to openai returned HTTP {response.StatusCode}"); // hack the message must start with 'Too many requests' for the core to retry
475503
}
476504

@@ -523,7 +551,8 @@ private string QueryInternalUsingChatApi(ExecutionContext executionContext, stri
523551

524552
if (response.StatusCode == HttpStatusCode.TooManyRequests)
525553
{
526-
Thread.Sleep(2000); // while developing we observed that the error message says to retry in 2s
554+
WaitDueToTooManyRequests(executionContext, deploymentName, response, baseUrl);
555+
527556
throw new Exception($"Too many requests - Call to openai returned HTTP {response.StatusCode}"); // hack the message must start with 'Too many requests' for the core to retry
528557
}
529558

@@ -555,6 +584,36 @@ private string QueryInternalUsingChatApi(ExecutionContext executionContext, stri
555584
return content.TrimEnd();
556585
}
557586

587+
private static void WaitDueToTooManyRequests(ExecutionContext executionContext, string deploymentName, IRestResponse response, string baseUrl)
588+
{
589+
var responseAt = DateTime.Now;
590+
591+
// Try to get Retry-After from headers (Different model is having different request per minute)
592+
var retryAfterHeader = response.Headers
593+
.FirstOrDefault(h => h.Name.Equals("Retry-After", StringComparison.OrdinalIgnoreCase));
594+
595+
if (!int.TryParse(retryAfterHeader?.Value?.ToString(), out var waitSeconds))
596+
{
597+
waitSeconds = 2;
598+
}
599+
600+
using (new DistributedLock(executionContext, $"{nameof(AzureOpenAIExternalSearchProvider)}_{baseUrl}_{deploymentName}", exclusive: true))
601+
{
602+
var timeAlreadyWaited = (int)DateTime.Now.Subtract(responseAt).TotalMilliseconds;
603+
var waitTime = waitSeconds * 1000 - timeAlreadyWaited;
604+
605+
if (waitTime > 0)
606+
{
607+
executionContext.Log.Log(LogLevel.Debug, $"Sleeping thread for {waitTime}ms due to TooManyRequest response received");
608+
Thread.Sleep(waitTime);
609+
}
610+
else
611+
{
612+
executionContext.Log.Log(LogLevel.Debug, $"TooManyRequest response received however no need to sleep as another thread already blocked");
613+
}
614+
}
615+
}
616+
558617
public override IPreviewImage GetPrimaryEntityPreviewImage(ExecutionContext context,
559618
IExternalSearchQueryResult result, IExternalSearchRequest request)
560619
{
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Data;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
using CluedIn.Core;
8+
using CluedIn.Core.DataStore;
9+
using Microsoft.Data.SqlClient;
10+
11+
namespace CluedIn.ExternalSearch.Providers.AzureOpenAI
12+
{
13+
internal class DistributedLock : IDisposable
14+
{
15+
private readonly SqlConnection _connection;
16+
private readonly SqlTransaction _transaction;
17+
18+
public DistributedLock(ExecutionContext context, string lockName, bool exclusive = false)
19+
{
20+
var connectionStringKey = context.ApplicationContext.System.DataShards.GetDataShard(DataShardType.Locking).ReadConnectionString;
21+
var connectionString = context.ApplicationContext.System.ConnectionStrings.GetConnectionString(connectionStringKey);
22+
23+
_connection = new SqlConnection(connectionString);
24+
_connection.Open();
25+
_transaction = _connection.BeginTransaction();
26+
using var cmd = new SqlCommand("sp_getapplock", _connection);
27+
cmd.CommandType = CommandType.StoredProcedure;
28+
cmd.Transaction = _transaction;
29+
cmd.CommandTimeout = 0;
30+
31+
cmd.Parameters.Add(new SqlParameter("@Resource", SqlDbType.NVarChar, 255) { Value = lockName });
32+
cmd.Parameters.Add(new SqlParameter("@LockMode", SqlDbType.NVarChar, 32) { Value = exclusive ? "Exclusive" : "Shared" });
33+
cmd.Parameters.Add(new SqlParameter("@LockTimeout", SqlDbType.Int) { Value = -1 });
34+
cmd.Parameters.Add(new SqlParameter("@Result", SqlDbType.Int) { Direction = ParameterDirection.ReturnValue });
35+
36+
cmd.ExecuteNonQuery();
37+
38+
var queryResult = (int)cmd.Parameters["@Result"].Value;
39+
40+
if (queryResult < 0)
41+
{
42+
throw new ApplicationException($"sp_getapplock returned {queryResult}"); // should never happen as @LockTimeout is -1
43+
}
44+
}
45+
46+
public void Dispose()
47+
{
48+
_transaction.Rollback();
49+
_connection.Close();
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)