// CosmosDbNotificationDependency.cs (forked from Alachisoft/NCache-Solutions)
// Copyright (c) 2020 Alachisoft
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using Alachisoft.NCache.Runtime.Dependencies;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement;
using Microsoft.Azure.Documents.Client;
using System;
using System.Collections.Generic;
using System.Net.Http;
namespace Alachisoft.NCache.Samples
{
// The CosmosDbNotificationDependency class implements NotifyExtensibleDependency and provides
// real-time synchronization with a Cosmos DB SQL collection using the Change Feed Processor (CFP)
// library.
//
// All instances in a process share the static tracking dictionaries below: one change feed
// processor is started for each distinct processor key when its first dependency is initialized,
// and it is stopped again once its last dependency has been disposed.
[Serializable]
public class CosmosDbNotificationDependency :
    NotifyExtensibleDependency, IEquatable<CosmosDbNotificationDependency>
{
    // The key of the item in the database against which we are setting dependency
    private readonly string _cacheKey;

    // Unique id for the dependency instance
    private readonly string _dependencyId;

    // The change feed processor key against which we will be saving an instance of IChangeFeedProcessor.
    // It is used purely as an identity: it is never parsed back into its parts.
    private readonly string _changeFeedProcessorKey;

    // The individual settings needed to build a change feed processor. They are stored separately
    // because the processor key is a '-' joined string that cannot be split back reliably: Cosmos DB
    // account URIs, database/collection ids and cache ids may themselves contain '-'.
    private readonly string _cacheId;
    private readonly string _monitoredUriString;
    private readonly string _monitoredAuthKey;
    private readonly string _monitoredDatabaseId;
    private readonly string _monitoredContainerId;
    private readonly string _leaseUriString;
    private readonly string _leaseAuthKey;
    private readonly string _leaseDatabaseId;
    private readonly string _leaseContainerId;

    // Lock to synchronize registration and removal of dependency instance from dependency tracking data structures
    private static readonly object lock_mutex =
        new object();

    // changefeedProcessors is a static dictionary that maps a change feed
    // processor key to the started IChangeFeedProcessor instance. The same key
    // is used in the dependenciesOnProcessors mapping. Once the number of
    // dependency IDs associated with a key reaches 0, the key is used to look
    // up the IChangeFeedProcessor instance here and stop it.
    private static readonly Dictionary<string, IChangeFeedProcessor> changefeedProcessors =
        new Dictionary<string, IChangeFeedProcessor>();

    // dependenciesOnProcessors is a static dictionary that maps a change feed
    // processor key to the set of IDs of the dependencies that rely on that
    // processor. Every time a dependency is disposed its ID is removed from the
    // set. Once the set for a given change feed processor is empty, the
    // processor can be stopped.
    private static readonly Dictionary<string, HashSet<string>> dependenciesOnProcessors =
        new Dictionary<string, HashSet<string>>();

    // dependenciesOnKeys is a static dictionary that maps a dependency key
    // ("<change feed processor key> <cache key>") to the set of dependencies
    // that rely on it, as a single database document can have multiple
    // dependent cached items.
    // NOTE(review): this field is internal and is presumably read by the change
    // feed observer; that code is outside this file and cannot take lock_mutex
    // (it is private) - confirm how those reads are synchronized.
    internal static readonly Dictionary<string, HashSet<CosmosDbNotificationDependency>> dependenciesOnKeys =
        new Dictionary<string, HashSet<CosmosDbNotificationDependency>>();

    // Equality comparer used with the dependenciesOnKeys mapped set of dependencies
    private static readonly CosmosDbNotificationDependencyEqualityComparer comparer =
        new CosmosDbNotificationDependencyEqualityComparer();

    // Creates a dependency on a single document of the monitored collection.
    // Throws ArgumentNullException if any argument is null, empty or whitespace.
    public CosmosDbNotificationDependency(
        string cacheKey,                // The _id value of the document against which the dependency is set
        string cacheId,                 // The cache Id of the cache
        string monitoredUriString,      // The Cosmos DB connection string where the monitored collection is deployed
        string monitoredAuthKey,        // The Cosmos DB authorization key where the monitored collection is deployed
        string monitoredDatabaseId,     // The Id of the database the monitored collection is included in
        string monitoredContainerId,    // The Id of the monitored collection
        string leaseUriString,          // The Cosmos DB connection string where the lease collection is deployed
        string leaseAuthKey,            // The Cosmos DB authorization key where the lease collection is deployed
        string leaseDatabaseId,         // The Id of the database the lease collection is included in
        string leaseContainerId         // The Id of the lease collection
        )
    {
        // Validate all the arguments (in declaration order) while capturing them
        _cacheKey = RequireValue(cacheKey, nameof(cacheKey));
        _cacheId = RequireValue(cacheId, nameof(cacheId));
        _monitoredUriString = RequireValue(monitoredUriString, nameof(monitoredUriString));
        _monitoredAuthKey = RequireValue(monitoredAuthKey, nameof(monitoredAuthKey));
        _monitoredDatabaseId = RequireValue(monitoredDatabaseId, nameof(monitoredDatabaseId));
        _monitoredContainerId = RequireValue(monitoredContainerId, nameof(monitoredContainerId));
        _leaseUriString = RequireValue(leaseUriString, nameof(leaseUriString));
        _leaseAuthKey = RequireValue(leaseAuthKey, nameof(leaseAuthKey));
        _leaseDatabaseId = RequireValue(leaseDatabaseId, nameof(leaseDatabaseId));
        _leaseContainerId = RequireValue(leaseContainerId, nameof(leaseContainerId));

        _dependencyId = $"cosmosDependency-{Guid.NewGuid().ToString()}";

        // Dependencies that share all of these settings share one change feed processor
        _changeFeedProcessorKey = $"{monitoredUriString}-{monitoredAuthKey}-{monitoredDatabaseId}-{monitoredContainerId}-{leaseUriString}-{leaseAuthKey}-{leaseDatabaseId}-{leaseContainerId}-{cacheId}";
    }

    // Overall unique identifier used to track dependency against change feed
    // processor instances on which the dependency is reliant
    internal string DependencyId
    {
        get
        {
            return _dependencyId + _changeFeedProcessorKey + _cacheKey;
        }
    }

    // Key under which this dependency is tracked in dependenciesOnKeys
    private string DependencyKey
    {
        get
        {
            return $"{_changeFeedProcessorKey} {_cacheKey}";
        }
    }

    // After the constructor is exited, the NCache handler will call **Initialize** to set up the required resources
    // and start monitoring for changes in the monitored collection.
    // Returns true once the dependency is registered; failures to create or start the change feed
    // processor propagate as exceptions.
    public override bool Initialize()
    {
        RegisterDependency(_changeFeedProcessorKey, DependencyKey, this);
        return true;
    }

    // In case the DependencyChanged event occurs, the **DependencyDispose** method is called to clean up
    // any resources. **DependencyDispose** is also called in case of explicit deletion of item due to
    // expiration, eviction or a remove command from the user
    protected override void DependencyDispose()
    {
        UnregisterDependency(_changeFeedProcessorKey, DependencyKey, this);
    }

    // Implementation of the IEquatable interface: two dependencies are equal when their
    // dependency id, change feed processor key and cache key all match (ordinal comparison).
    public bool Equals(CosmosDbNotificationDependency other)
    {
        if (ReferenceEquals(other, null))
        {
            return false;
        }

        return _dependencyId.Equals(other._dependencyId, StringComparison.Ordinal) &&
               _changeFeedProcessorKey.Equals(other._changeFeedProcessorKey, StringComparison.Ordinal) &&
               _cacheKey.Equals(other._cacheKey, StringComparison.Ordinal);
    }

    // Returns false for null and for objects of any other type
    public override bool Equals(object obj)
    {
        return Equals(obj as CosmosDbNotificationDependency);
    }

    // Hash over the same three values that Equals compares
    public override int GetHashCode()
    {
        string hashString = _changeFeedProcessorKey + _cacheKey + _dependencyId;
        return hashString.GetHashCode();
    }

    // Returns the value unchanged, or throws ArgumentNullException carrying paramName when the
    // value is null, empty or whitespace
    private static string RequireValue(string value, string paramName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentNullException(paramName);
        }

        return value;
    }

    // **RegisterDependency** registers the dependency with the static data structures so as to be able to track
    // when a non-required change feed processors can be stopped. The first dependency registered for a
    // processor key creates and starts the change feed processor.
    private static void RegisterDependency(
        string changeFeedProcessorKey,
        string dependencyKey,
        CosmosDbNotificationDependency dependency)
    {
        lock (lock_mutex)
        {
            HashSet<string> processorDependencies;
            if (!dependenciesOnProcessors.TryGetValue(changeFeedProcessorKey, out processorDependencies))
            {
                // Create and start the processor BEFORE touching the tracking dictionaries: if either
                // step throws, nothing has been recorded and a later registration can retry cleanly.
                IChangeFeedProcessor processor = dependency.CreateChangeFeedProcessor();
                processor.StartAsync().Wait();

                changefeedProcessors[changeFeedProcessorKey] = processor;
                processorDependencies = new HashSet<string>(StringComparer.Ordinal);
                dependenciesOnProcessors.Add(changeFeedProcessorKey, processorDependencies);
            }

            HashSet<CosmosDbNotificationDependency> keyDependencies;
            if (!dependenciesOnKeys.TryGetValue(dependencyKey, out keyDependencies))
            {
                keyDependencies = new HashSet<CosmosDbNotificationDependency>(comparer);
                dependenciesOnKeys.Add(dependencyKey, keyDependencies);
            }

            keyDependencies.Add(dependency);
            processorDependencies.Add(dependency.DependencyId);
        }
    }

    // In case the **DependencyDispose** method is called, the dependency is removed from list of registered dependencies
    // and any change feed processors that don't have any dependencies relying on them will be stopped.
    // Unknown keys are ignored so that disposing a dependency that was never (successfully)
    // initialized, or disposing it twice, is a no-op rather than a KeyNotFoundException.
    private static void UnregisterDependency(
        string changeFeedProcessorKey,
        string dependencyKey,
        CosmosDbNotificationDependency dependency)
    {
        lock (lock_mutex)
        {
            HashSet<CosmosDbNotificationDependency> keyDependencies;
            if (dependenciesOnKeys.TryGetValue(dependencyKey, out keyDependencies))
            {
                keyDependencies.Remove(dependency);
                if (keyDependencies.Count == 0)
                {
                    dependenciesOnKeys.Remove(dependencyKey);
                }
            }

            HashSet<string> processorDependencies;
            if (!dependenciesOnProcessors.TryGetValue(changeFeedProcessorKey, out processorDependencies))
            {
                return;
            }

            processorDependencies.Remove(dependency.DependencyId);
            if (processorDependencies.Count > 0)
            {
                return;
            }

            dependenciesOnProcessors.Remove(changeFeedProcessorKey);

            IChangeFeedProcessor processor;
            if (changefeedProcessors.TryGetValue(changeFeedProcessorKey, out processor))
            {
                // Forget the processor before stopping it so the tracking state stays consistent
                // even if StopAsync throws (the exception still propagates to the caller).
                changefeedProcessors.Remove(changeFeedProcessorKey);
                processor.StopAsync().Wait();
            }
        }
    }

    // In the **CreateChangeFeedProcessor** method, we build a change feed processor from the
    // connection settings captured by the constructor. The processor is built but not started.
    // NOTE(review): the two DocumentClient instances created here are IDisposable and are not
    // disposed anywhere in this class when the processor is stopped - confirm whether the
    // processor takes ownership of them.
    private IChangeFeedProcessor CreateChangeFeedProcessor()
    {
        string hostName = "host-" + Guid.NewGuid().ToString();

        DocumentCollectionInfo monitoredContainerInfo = new DocumentCollectionInfo
        {
            Uri = new Uri(_monitoredUriString),
            MasterKey = _monitoredAuthKey,
            DatabaseName = _monitoredDatabaseId,
            CollectionName = _monitoredContainerId
        };

        DocumentCollectionInfo leaseContainerInfo = new DocumentCollectionInfo
        {
            Uri = new Uri(_leaseUriString),
            MasterKey = _leaseAuthKey,
            DatabaseName = _leaseDatabaseId,
            CollectionName = _leaseContainerId
        };

        DocumentClient feedClient = CreateDocumentClient(_monitoredUriString, _monitoredAuthKey);
        DocumentClient leaseClient = CreateDocumentClient(_leaseUriString, _leaseAuthKey);

        return new ChangeFeedProcessorBuilder()
            .WithHostName(hostName)
            .WithFeedCollection(monitoredContainerInfo)
            .WithLeaseCollection(leaseContainerInfo)
            .WithFeedDocumentClient(feedClient)
            .WithLeaseDocumentClient(leaseClient)
            .WithObserverFactory(new CosmosDbChangFeedObserverFactory(_changeFeedProcessorKey))
            .WithProcessorOptions(new ChangeFeedProcessorOptions
            {
                // We create a separate lease document for each of the cache
                // server nodes so as to avoid missing on updates to db state
                // changes due to partitioning of data on different nodes
                LeasePrefix = $"NCache--{_cacheId}-{Guid.NewGuid().ToString()}",
                StartTime = DateTime.Now
            })
            .BuildAsync()
            .GetAwaiter()
            .GetResult();
    }

    // Creates a gateway-mode DocumentClient for the given endpoint and authorization key
    private static DocumentClient CreateDocumentClient(string serviceUri, string authKey)
    {
        return new DocumentClient(
            serviceEndpoint: new Uri(serviceUri),
            authKeyOrResourceToken: authKey,
            handler: new HttpClientHandler
            {
                // SECURITY NOTE(review): this callback accepts ANY server certificate, which
                // disables TLS validation. Presumably intended for the local Cosmos DB emulator's
                // self-signed certificate - confirm, and do not use against production endpoints.
                ServerCertificateCustomValidationCallback = (a, b, c, d) => true
            },
            connectionPolicy: new ConnectionPolicy
            {
                EnableEndpointDiscovery = false,
                ConnectionMode = ConnectionMode.Gateway,
                ConnectionProtocol = Protocol.Tcp
            });
    }
}
}