Skip to content

Commit 7e67f65

Browse files
SNOW-1640968 Make connection finaliser a not blocking operation (#1009)
1 parent 3707fa0 commit 7e67f65

File tree

5 files changed

+187
-35
lines changed

5 files changed

+187
-35
lines changed

Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2310,6 +2310,54 @@ public void TestOpenAsyncThrowExceptionWhenOperationIsCancelled()
23102310
Assert.AreEqual(ConnectionState.Closed, connection.State);
23112311
}
23122312
}
2313+
2314+
[Test]
2315+
public void TestCloseSessionWhenGarbageCollectorFinalizesConnection()
2316+
{
2317+
// arrange
2318+
var session = GetSessionFromForgottenConnection();
2319+
Assert.NotNull(session);
2320+
Assert.NotNull(session.sessionId);
2321+
Assert.NotNull(session.sessionToken);
2322+
2323+
// act
2324+
GC.Collect();
2325+
Awaiter.WaitUntilConditionOrTimeout(() => session.sessionToken == null, TimeSpan.FromSeconds(15));
2326+
2327+
// assert
2328+
Assert.IsNull(session.sessionToken);
2329+
}
2330+
2331+
private SFSession GetSessionFromForgottenConnection()
2332+
{
2333+
var connection = new SnowflakeDbConnection(ConnectionString + ";poolingEnabled=false;application=TestGarbageCollectorCloseSession");
2334+
connection.Open();
2335+
return connection.SfSession;
2336+
}
2337+
2338+
[Test]
2339+
public void TestHangingCloseIsNotBlocking()
2340+
{
2341+
// arrange
2342+
var restRequester = new MockCloseHangingRestRequester();
2343+
var session = new SFSession("account=test;user=test;password=test", null, restRequester);
2344+
session.Open();
2345+
var watchClose = new Stopwatch();
2346+
var watchClosedFinished = new Stopwatch();
2347+
2348+
// act
2349+
watchClose.Start();
2350+
watchClosedFinished.Start();
2351+
session.CloseNonBlocking();
2352+
watchClose.Stop();
2353+
Awaiter.WaitUntilConditionOrTimeout(() => restRequester.CloseRequests.Count > 0, TimeSpan.FromSeconds(15));
2354+
watchClosedFinished.Stop();
2355+
2356+
// assert
2357+
Assert.AreEqual(1, restRequester.CloseRequests.Count);
2358+
Assert.Less(watchClose.Elapsed.Duration(), TimeSpan.FromSeconds(5)); // close executed immediately
2359+
Assert.GreaterOrEqual(watchClosedFinished.Elapsed.Duration(), TimeSpan.FromSeconds(10)); // while background task took more time
2360+
}
23132361
}
23142362
}
23152363

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Net.Http;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Snowflake.Data.Core;
7+
8+
namespace Snowflake.Data.Tests.Mock
9+
{
10+
internal class MockCloseHangingRestRequester: IMockRestRequester
11+
{
12+
internal List<SFRestRequest> CloseRequests { get; } = new();
13+
14+
public T Get<T>(IRestRequest request)
15+
{
16+
return Task.Run(async () => await (GetAsync<T>(request, CancellationToken.None)).ConfigureAwait(false)).Result;
17+
}
18+
19+
public Task<T> GetAsync<T>(IRestRequest request, CancellationToken cancellationToken)
20+
{
21+
return Task.FromResult<T>((T)(object)null);
22+
}
23+
24+
public Task<HttpResponseMessage> GetAsync(IRestRequest request, CancellationToken cancellationToken)
25+
{
26+
return Task.FromResult<HttpResponseMessage>(null);
27+
}
28+
29+
public HttpResponseMessage Get(IRestRequest request)
30+
{
31+
return null;
32+
}
33+
34+
public T Post<T>(IRestRequest postRequest)
35+
{
36+
return Task.Run(async () => await (PostAsync<T>(postRequest, CancellationToken.None)).ConfigureAwait(false)).Result;
37+
}
38+
39+
public Task<T> PostAsync<T>(IRestRequest postRequest, CancellationToken cancellationToken)
40+
{
41+
SFRestRequest sfRequest = (SFRestRequest)postRequest;
42+
if (sfRequest.jsonBody is LoginRequest)
43+
{
44+
LoginResponse authnResponse = new LoginResponse
45+
{
46+
data = new LoginResponseData()
47+
{
48+
sessionId = "123456789",
49+
token = "session_token",
50+
masterToken = "master_token",
51+
authResponseSessionInfo = new SessionInfo(),
52+
nameValueParameter = new List<NameValueParameter>()
53+
},
54+
success = true
55+
};
56+
57+
// login request return success
58+
return Task.FromResult<T>((T)(object)authnResponse);
59+
}
60+
61+
if (sfRequest.Url.Query.StartsWith("?delete=true"))
62+
{
63+
var closeResponse = new CloseResponse()
64+
{
65+
code = 390111,
66+
message = "Session no longer exists. New login required to access the service",
67+
success = false
68+
};
69+
Thread.Sleep(TimeSpan.FromSeconds(10));
70+
CloseRequests.Add(sfRequest);
71+
return Task.FromResult<T>((T)(object)closeResponse);
72+
}
73+
74+
throw new NotImplementedException();
75+
}
76+
77+
public void setHttpClient(HttpClient httpClient)
78+
{
79+
// Nothing to do
80+
}
81+
}
82+
}

Snowflake.Data.Tests/UnitTests/SFSessionTest.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@ class SFSessionTest
1616
[Test]
1717
public void TestSessionGoneWhenClose()
1818
{
19-
Mock.MockCloseSessionGone restRequester = new Mock.MockCloseSessionGone();
19+
var restRequester = new MockCloseSessionGone();
2020
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
2121
sfSession.Open();
2222
Assert.DoesNotThrow(() => sfSession.close());
2323
}
2424

25+
[Test]
26+
public void TestSessionGoneWhenCloseNonBlocking()
27+
{
28+
var restRequester = new MockCloseSessionGone();
29+
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
30+
sfSession.Open();
31+
Assert.DoesNotThrow(() => sfSession.CloseNonBlocking());
32+
}
33+
2534
[Test]
2635
public void TestUpdateSessionProperties()
2736
{

Snowflake.Data/Client/SnowflakeDbConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ protected override void Dispose(bool disposing)
397397
}
398398
else
399399
{
400-
SfSession?.close();
400+
SfSession?.CloseNonBlocking();
401401
SfSession = null;
402402
_connectionState = ConnectionState.Closed;
403403
}

Snowflake.Data/Core/Session/SFSession.cs

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -279,62 +279,75 @@ internal void close()
279279
if (!IsEstablished()) return;
280280
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
281281
stopHeartBeatForThisSession();
282+
var closeSessionRequest = PrepareCloseSessionRequest();
283+
PostCloseSession(closeSessionRequest, restRequester);
284+
sessionToken = null;
285+
}
282286

283-
// Send a close session request
284-
var queryParams = new Dictionary<string, string>();
285-
queryParams[RestParams.SF_QUERY_SESSION_DELETE] = "true";
286-
queryParams[RestParams.SF_QUERY_REQUEST_ID] = Guid.NewGuid().ToString();
287-
queryParams[RestParams.SF_QUERY_REQUEST_GUID] = Guid.NewGuid().ToString();
287+
internal void CloseNonBlocking()
288+
{
289+
// Nothing to do if the session is not open
290+
if (!IsEstablished()) return;
291+
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
292+
stopHeartBeatForThisSession();
293+
var closeSessionRequest = PrepareCloseSessionRequest();
294+
Task.Run(() => PostCloseSession(closeSessionRequest, restRequester));
295+
sessionToken = null;
296+
}
288297

289-
SFRestRequest closeSessionRequest = new SFRestRequest
290-
{
291-
Url = BuildUri(RestPath.SF_SESSION_PATH, queryParams),
292-
authorizationToken = string.Format(SF_AUTHORIZATION_SNOWFLAKE_FMT, sessionToken),
293-
sid = sessionId
294-
};
298+
internal async Task CloseAsync(CancellationToken cancellationToken)
299+
{
300+
// Nothing to do if the session is not open
301+
if (!IsEstablished()) return;
302+
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
303+
stopHeartBeatForThisSession();
304+
305+
var closeSessionRequest = PrepareCloseSessionRequest();
295306

296-
logger.Debug($"Send closeSessionRequest");
297-
var response = restRequester.Post<CloseResponse>(closeSessionRequest);
307+
logger.Debug($"Closing session async");
308+
var response = await restRequester.PostAsync<CloseResponse>(closeSessionRequest, cancellationToken).ConfigureAwait(false);
298309
if (!response.success)
299310
{
300-
logger.Debug($"Failed to delete session: {sessionId}, error ignored. Code: {response.code} Message: {response.message}");
311+
logger.Error($"Failed to close session {sessionId}, error ignored. Code: {response.code} Message: {response.message}");
301312
}
302313

303314
logger.Debug($"Session closed: {sessionId}");
304-
// Just in case the session won't be closed twice
305315
sessionToken = null;
306316
}
307317

308-
internal async Task CloseAsync(CancellationToken cancellationToken)
318+
private static void PostCloseSession(SFRestRequest closeSessionRequest, IRestRequester restRequester)
309319
{
310-
// Nothing to do if the session is not open
311-
if (!IsEstablished()) return;
312-
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
313-
stopHeartBeatForThisSession();
320+
try
321+
{
322+
logger.Debug($"Closing session");
323+
var response = restRequester.Post<CloseResponse>(closeSessionRequest);
324+
if (!response.success)
325+
{
326+
logger.Error($"Failed to close session: {closeSessionRequest.sid}, error ignored. Code: {response.code} Message: {response.message}");
327+
}
314328

315-
// Send a close session request
329+
logger.Debug($"Session closed: {closeSessionRequest.sid}");
330+
}
331+
catch (Exception)
332+
{
333+
logger.Error($"Failed to close session: {closeSessionRequest.sid}, because of exception.");
334+
throw;
335+
}
336+
}
337+
338+
private SFRestRequest PrepareCloseSessionRequest()
339+
{
316340
var queryParams = new Dictionary<string, string>();
317341
queryParams[RestParams.SF_QUERY_SESSION_DELETE] = "true";
318342
queryParams[RestParams.SF_QUERY_REQUEST_ID] = Guid.NewGuid().ToString();
319343
queryParams[RestParams.SF_QUERY_REQUEST_GUID] = Guid.NewGuid().ToString();
320344

321-
SFRestRequest closeSessionRequest = new SFRestRequest()
345+
return new SFRestRequest
322346
{
323347
Url = BuildUri(RestPath.SF_SESSION_PATH, queryParams),
324348
authorizationToken = string.Format(SF_AUTHORIZATION_SNOWFLAKE_FMT, sessionToken),
325349
sid = sessionId
326350
};
327-
328-
logger.Debug($"Send async closeSessionRequest");
329-
var response = await restRequester.PostAsync<CloseResponse>(closeSessionRequest, cancellationToken).ConfigureAwait(false);
330-
if (!response.success)
331-
{
332-
logger.Debug($"Failed to delete session {sessionId}, error ignored. Code: {response.code} Message: {response.message}");
333-
}
334-
335-
logger.Debug($"Session closed: {sessionId}");
336-
// Just in case the session won't be closed twice
337-
sessionToken = null;
338351
}
339352

340353
internal bool IsEstablished() => sessionToken != null;

0 commit comments

Comments
 (0)