@@ -8,29 +8,50 @@ namespace Cnblogs.DashScope.Core;
8
8
public sealed class DashScopeClientWebSocketPool : IDisposable
9
9
{
10
10
private readonly ConcurrentBag < DashScopeClientWebSocket > _available = new ( ) ;
11
- private readonly ConcurrentBag < DashScopeClientWebSocket > _active = new ( ) ;
11
+ private readonly ConcurrentDictionary < Guid , DashScopeClientWebSocket > _active = new ( ) ;
12
12
private readonly DashScopeOptions _options ;
13
+ private readonly IDashScopeClientWebSocketFactory _dashScopeClientWebSocketFactory ;
13
14
14
15
/// <summary>
15
16
/// Socket pool for DashScope API.
16
17
/// </summary>
18
+ /// <param name="dashScopeClientWebSocketFactory"></param>
17
19
/// <param name="options">Options for DashScope sdk.</param>
18
- public DashScopeClientWebSocketPool ( DashScopeOptions options )
20
+ public DashScopeClientWebSocketPool (
21
+ IDashScopeClientWebSocketFactory dashScopeClientWebSocketFactory ,
22
+ DashScopeOptions options )
19
23
{
24
+ _dashScopeClientWebSocketFactory = dashScopeClientWebSocketFactory ;
20
25
_options = options ;
21
26
}
22
27
23
- internal DashScopeClientWebSocketPool ( IEnumerable < DashScopeClientWebSocket > sockets )
28
+ /// <summary>
29
+ /// Get available connection count.
30
+ /// </summary>
31
+ internal int AvailableSocketCount => _available . Count ;
32
+
33
+ /// <summary>
34
+ /// Get active connection count.
35
+ /// </summary>
36
+ internal int ActiveSocketCount => _active . Count ;
37
+
38
+ internal DashScopeClientWebSocketPool (
39
+ IEnumerable < DashScopeClientWebSocket > sockets ,
40
+ IDashScopeClientWebSocketFactory dashScopeClientWebSocketFactory )
24
41
{
25
42
_options = new DashScopeOptions ( ) ;
26
43
foreach ( var socket in sockets )
27
44
{
28
45
_available . Add ( socket ) ;
29
46
}
47
+
48
+ _dashScopeClientWebSocketFactory = dashScopeClientWebSocketFactory ;
30
49
}
31
50
32
- internal void ReturnSocketAsync ( DashScopeClientWebSocket socket )
51
+ internal void ReturnSocket ( DashScopeClientWebSocket socket )
33
52
{
53
+ _active . Remove ( socket . Id , out _ ) ;
54
+
34
55
if ( socket . State != DashScopeWebSocketState . Ready )
35
56
{
36
57
// not returnable, disposing.
@@ -45,11 +66,8 @@ internal void ReturnSocketAsync(DashScopeClientWebSocket socket)
45
66
/// Rent or create a socket connection from pool.
46
67
/// </summary>
47
68
/// <param name="cancellationToken"></param>
48
- /// <typeparam name="TOutput">The output type.</typeparam>
49
69
/// <returns></returns>
50
- public async Task < DashScopeClientWebSocketWrapper > RentSocketAsync < TOutput > (
51
- CancellationToken cancellationToken = default )
52
- where TOutput : class
70
+ public async Task < DashScopeClientWebSocketWrapper > RentSocketAsync ( CancellationToken cancellationToken = default )
53
71
{
54
72
var found = false ;
55
73
DashScopeClientWebSocket ? socket = null ;
@@ -67,7 +85,7 @@ public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
67
85
}
68
86
else
69
87
{
70
- socket = await InitializeNewSocketAsync < TOutput > ( _options . BaseWebsocketAddress , cancellationToken ) ;
88
+ socket = await InitializeNewSocketAsync ( _options . WebsocketBaseAddress , cancellationToken ) ;
71
89
found = true ;
72
90
}
73
91
}
@@ -77,22 +95,21 @@ public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
77
95
78
96
private DashScopeClientWebSocketWrapper ActivateSocket ( DashScopeClientWebSocket socket )
79
97
{
80
- _active . Add ( socket ) ;
98
+ _active . TryAdd ( socket . Id , socket ) ;
81
99
return new DashScopeClientWebSocketWrapper ( socket , this ) ;
82
100
}
83
101
84
- private async Task < DashScopeClientWebSocket > InitializeNewSocketAsync < TOutput > (
102
+ private async Task < DashScopeClientWebSocket > InitializeNewSocketAsync (
85
103
string url ,
86
104
CancellationToken cancellationToken = default )
87
- where TOutput : class
88
105
{
89
106
if ( _available . Count + _active . Count >= _options . SocketPoolSize )
90
107
{
91
108
throw new InvalidOperationException ( "[DashScopeSDK] Socket pool is full" ) ;
92
109
}
93
110
94
- var socket = new DashScopeClientWebSocket ( _options . ApiKey , _options . WorkspaceId ) ;
95
- await socket . ConnectAsync < TOutput > ( new Uri ( url ) , cancellationToken ) ;
111
+ var socket = _dashScopeClientWebSocketFactory . GetClientWebSocket ( _options . ApiKey , _options . WorkspaceId ) ;
112
+ await socket . ConnectAsync ( new Uri ( url ) , cancellationToken ) ;
96
113
return socket ;
97
114
}
98
115
@@ -107,10 +124,10 @@ private void Dispose(bool disposing)
107
124
socket ? . Dispose ( ) ;
108
125
}
109
126
110
- while ( _active . IsEmpty == false )
127
+ var activeSockets = _active . Values ;
128
+ foreach ( var activeSocket in activeSockets )
111
129
{
112
- _active . TryTake ( out var socket ) ;
113
- socket ? . Dispose ( ) ;
130
+ activeSocket . Dispose ( ) ;
114
131
}
115
132
}
116
133
}
0 commit comments