Skip to content

Commit f538aee

Browse files
author
anouar.hassine
committed
Adding filter to GetSnapshot
1 parent f41ec7f commit f538aee

File tree

12 files changed

+68
-31
lines changed

12 files changed

+68
-31
lines changed

ReactiveXComponent/Connection/IXCPublisher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public interface IXCPublisher : IDisposable
1010
void SendEvent(string stateMachine, object message, string messageType, Visibility visibility = Visibility.Public);
1111
void SendEvent(string stateMachine, object message, Visibility visibility = Visibility.Public);
1212
void SendEvent(StateMachineRefHeader stateMachineRefHeader, object message, Visibility visibility = Visibility.Public);
13-
List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000);
14-
Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000);
13+
List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000);
14+
Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000);
1515
}
1616
}

ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag
7070
Send(message, routingKey, prop);
7171
}
7272

73-
public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000)
73+
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
7474
{
75-
return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout);
75+
return _rabbitMqSnapshotManager.GetSnapshot(stateMachine, filter, chunkSize, timeout);
7676
}
7777

78-
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000)
78+
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
7979
{
80-
return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout);
80+
return _rabbitMqSnapshotManager.GetSnapshotAsync(stateMachine, filter, chunkSize, timeout);
8181
}
8282

8383
#endregion

ReactiveXComponent/RabbitMq/RabbitMqSnapshotManager.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private static SnapshotResponse AggregateChunks(ConcurrentBag<SnapshotResponseCh
7373
return aggregatedResult;
7474
}
7575

76-
public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000)
76+
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
7777
{
7878
var guid = Guid.NewGuid();
7979
var requestId = guid.ToString();
@@ -119,7 +119,7 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
119119
SnapshotReceived += snapshotListenerOnMessageReceived;
120120

121121
SubscribeSnapshot(stateMachine, requestId);
122-
SendSnapshotRequest(stateMachine, requestId, chunkSize, _privateCommunicationIdentifier);
122+
SendSnapshotRequest(stateMachine, requestId, filter, chunkSize, _privateCommunicationIdentifier);
123123

124124
if (receivedSnapshotChunksInitialized.WaitOne(timeout))
125125
{
@@ -183,12 +183,12 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
183183
return result;
184184
}
185185

186-
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000)
186+
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
187187
{
188-
return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout));
188+
return Task.Run(() => GetSnapshot(stateMachine, filter, chunkSize, timeout));
189189
}
190190

191-
private void SendSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize, string privateCommunicationIdentifier = null)
191+
private void SendSnapshotRequest(string stateMachine, string replyTopic, string filter = null, int? chunkSize = null, string privateCommunicationIdentifier = null)
192192
{
193193
if (_xcConfiguration == null)
194194
return;
@@ -212,6 +212,7 @@ private void SendSnapshotRequest(string stateMachine, string replyTopic, int? ch
212212
CallerPrivateTopic = !string.IsNullOrEmpty(privateCommunicationIdentifier)
213213
? new List<string>{ privateCommunicationIdentifier}
214214
: null,
215+
Filter = filter,
215216
ChunkSize = chunkSize
216217
};
217218

ReactiveXComponent/ReactiveXComponent.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
<WorkingDirectory>$(MSBuildThisFileDirectory)$(OutputPath)</WorkingDirectory>
3535
</PropertyGroup>
3636
<ItemGroup>
37-
<InputAssemblies Include="$(WorkingDirectory)\ReactiveXComponent.dll" />
3837
<InputAssemblies Include="$(WorkingDirectory)\Newtonsoft.Json.dll" />
3938
<InputAssemblies Include="$(WorkingDirectory)\RabbitMQ.Client.dll" />
4039
<InputAssemblies Include="$(WorkingDirectory)\WebSocket4Net.dll" />

ReactiveXComponent/WebSocket/WebSocketPublisher.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ public void SendEvent(StateMachineRefHeader stateMachineRefHeader, object messag
6565
_webSocketClient.Send(webSocketRequest);
6666
}
6767

68-
public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize, int timeout = 10000)
68+
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
6969
{
70-
return _webSocketSnapshotManager.GetSnapshot(stateMachine, chunkSize, timeout);
70+
return _webSocketSnapshotManager.GetSnapshot(stateMachine, filter, chunkSize, timeout);
7171
}
7272

73-
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize, int timeout = 10000)
73+
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
7474
{
75-
return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, chunkSize, timeout);
75+
return _webSocketSnapshotManager.GetSnapshotAsync(stateMachine, filter, chunkSize, timeout);
7676
}
7777

7878
#endregion

ReactiveXComponent/WebSocket/WebSocketSnapshotManager.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public WebSocketSnapshotManager(string component, IWebSocketClient webSocketClie
4343
h => SnapshotReceived -= h);
4444
}
4545

46-
public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize = null, int timeout = 10000)
46+
public List<MessageEventArgs> GetSnapshot(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
4747
{
4848
var replyTopic = Guid.NewGuid().ToString();
4949

@@ -74,7 +74,7 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
7474
using (_snapshotStream.Subscribe(observer))
7575
{
7676
SendWebSocketSnapshotSubscriptionResquest(replyTopic);
77-
SendWebSocketSnapshotRequest(stateMachine, replyTopic, chunkSize);
77+
SendWebSocketSnapshotRequest(stateMachine, replyTopic, filter, chunkSize);
7878
lockEvent.WaitOne(timeout);
7979
}
8080

@@ -84,12 +84,12 @@ public List<MessageEventArgs> GetSnapshot(string stateMachine, int? chunkSize =
8484
return result;
8585
}
8686

87-
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, int? chunkSize = null, int timeout = 10000)
87+
public Task<List<MessageEventArgs>> GetSnapshotAsync(string stateMachine, string filter = null, int? chunkSize = null, int timeout = 10000)
8888
{
89-
return Task.Run(() => GetSnapshot(stateMachine, chunkSize, timeout));
89+
return Task.Run(() => GetSnapshot(stateMachine, filter, chunkSize, timeout));
9090
}
9191

92-
private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, int? chunkSize)
92+
private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic, string filter = null, int? chunkSize = null)
9393
{
9494
if (!_webSocketClient.IsOpen) return;
9595

@@ -101,7 +101,7 @@ private void SendWebSocketSnapshotRequest(string stateMachine, string replyTopic
101101
ComponentCode = componentCode,
102102
StateMachineCode = stateMachineCode
103103
};
104-
var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize);
104+
var snapshotMessage = new WebSocketSnapshotMessage(stateMachineCode, componentCode, replyTopic, _privateCommunicationIdentifier, chunkSize, filter);
105105
var webSocketRequest = WebSocketMessageHelper.SerializeRequest(
106106
WebSocketCommand.Snapshot,
107107
inputHeader,

ReactiveXComponent/WebSocket/WebSocketSnapshotMessage.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ public class WebSocketSnapshotMessage
1010
public string ReplyTopic { get; set; }
1111
public string[] CallerPrivateTopic { get; set; }
1212
public int? ChunkSize { get; set; }
13+
public string Filter { get; set; }
1314

1415
public WebSocketSnapshotMessage() { }
1516

16-
public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, string replyTopic, string callerPrivateTopic, int? chunkSize)
17+
public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, string replyTopic, string callerPrivateTopic, int? chunkSize, string filter = null)
1718
{
1819
StateMachineCode = stateMachineCode;
1920
ComponentCode = componentCode;
@@ -23,6 +24,7 @@ public WebSocketSnapshotMessage(long stateMachineCode, long componentCode, strin
2324
CallerPrivateTopic = new []{ callerPrivateTopic };
2425
}
2526
ChunkSize = chunkSize;
27+
Filter = filter;
2628
}
2729
}
2830
}

ReactiveXComponentTest/Configuration/ConfigurationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void GetBusDetailsTest()
8383
Check.That(busDetails.SslServerName).IsEqualTo("XComponent RMq");
8484
Check.That(busDetails.SslCertificatePath).IsEqualTo("some_cert_path");
8585
Check.That(busDetails.SslCertificatePassphrase).IsEqualTo("some_cert_pass");
86-
Check.That(busDetails.SslProtocol).IsEqualTo(SslProtocols.Default);
86+
Check.That(busDetails.SslProtocol).IsEqualTo(SslProtocols.Tls12);
8787
Check.That(busDetails.SslAllowUntrustedServerCertificate).IsTrue();
8888
}
8989

ReactiveXComponentTest/RabbitMq/RabbitMqTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ public void SnapshotTest(int instancesCount, int chunkSize)
403403
List<MessageEventArgs> snapshotInstances = null;
404404
Task.Run(async () =>
405405
{
406-
snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize);
406+
snapshotInstances = await publisher.GetSnapshotAsync(StateMachineA, chunkSize: chunkSize);
407407
}).GetAwaiter().OnCompleted(() =>
408408
{
409409
snapshotReceivedEvent.Set();

ReactiveXComponentTest/RabbitMqTestApi.xcApi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<threading />
44
<serialization>Binary</serialization>
55
<communication>
6-
<bus name="rabbitmq" host="127.0.0.1" virtualHost="myVirtualHost" port="5671" user="guest" password="guest" type="RABBIT_MQ" sslEnabled="True" sslServerName="XComponent RMq" sslCertPath="some_cert_path" sslCertPassphrase="some_cert_pass" sslProtocol="Default" sslAllowUntrustedServerCertificate="True" />
6+
<bus name="rabbitmq" host="127.0.0.1" virtualHost="myVirtualHost" port="5671" user="guest" password="guest" type="RABBIT_MQ" sslEnabled="True" sslServerName="XComponent RMq" sslCertPath="some_cert_path" sslCertPassphrase="some_cert_pass" sslProtocol="Tls12" sslAllowUntrustedServerCertificate="True" />
77
</communication>
88
<clientAPICommunication>
99
<publish componentCode="-69981087" stateMachineCode="-829536631" eventType="UPDATE" topicType="output" communicationType="BUS" stateCode="0" eventCode="9" event="XComponent.HelloWorld.UserObject.SayHello" communication="rabbitmq">

0 commit comments

Comments
 (0)