diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 00000000..31bb7740
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,3 @@
+# Auto detect text files and perform LF normalization
+* text=auto
+*.sh text eol=lf
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a300562b..d4dadfd7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -7,18 +7,13 @@ on:
workflow_dispatch:
env:
DOTNET_NOLOGO: true
+defaults:
+ run:
+ shell: pwsh
jobs:
build:
- name: ${{ matrix.name }}
- runs-on: ${{ matrix.os }}
- strategy:
- matrix:
- include:
- - os: windows-latest
- name: Windows
- #- os: ubuntu-latest
- # name: Linux
- fail-fast: false
+ name: Linux
+ runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v5.0.0
@@ -28,10 +23,17 @@ jobs:
global-json-file: global.json
- name: Build
run: dotnet build src --configuration Release
+ - name: Remove executables
+ run: |
+ Remove-Item binaries/MonitoringDemo/MonitoringDemo
+ Remove-Item binaries/Billing/Billing
+ Remove-Item binaries/ClientUI/ClientUI
+ Remove-Item binaries/PlatformLauncher/PlatformLauncher
+ Remove-Item binaries/Sales/Sales
+ Remove-Item binaries/Shipping/Shipping
- name: Publish artifacts
- if: matrix.name == 'Windows'
uses: actions/upload-artifact@v4.6.2
with:
name: binaries
- path: src/binaries/*
+ path: binaries/*
retention-days: 7
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 9cce9954..f9bf9894 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -3,9 +3,12 @@ on:
workflow_dispatch:
env:
DOTNET_NOLOGO: true
+defaults:
+ run:
+ shell: pwsh
jobs:
release:
- runs-on: windows-latest
+ runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v5.0.0
@@ -15,25 +18,14 @@ jobs:
global-json-file: global.json
- name: Build
run: dotnet build src --configuration Release
- - name: Install AzureSignTool
- run: dotnet tool install --global azuresigntool
- - name: Sign binaries
+ - name: Remove executables
run: |
- AzureSignTool sign `
- --file-digest sha256 `
- --timestamp-rfc3161 http://timestamp.digicert.com `
- --azure-key-vault-url https://particularcodesigning.vault.azure.net `
- --azure-key-vault-client-id ${{ secrets.AZURE_KEY_VAULT_CLIENT_ID }} `
- --azure-key-vault-tenant-id ${{ secrets.AZURE_KEY_VAULT_TENANT_ID }} `
- --azure-key-vault-client-secret ${{ secrets.AZURE_KEY_VAULT_CLIENT_SECRET }} `
- --azure-key-vault-certificate ${{ secrets.AZURE_KEY_VAULT_CERTIFICATE_NAME }} `
- src/binaries/MonitoringDemo.exe `
- src/binaries/Billing/Billing.exe `
- src/binaries/ClientUI/ClientUI.exe `
- src/binaries/Platform/Platform.exe `
- src/binaries/Sales/Sales.exe `
- src/binaries/Shipping/Shipping.exe
- shell: pwsh
+ Remove-Item binaries/MonitoringDemo/MonitoringDemo
+ Remove-Item binaries/Billing/Billing
+ Remove-Item binaries/ClientUI/ClientUI
+ Remove-Item binaries/PlatformLauncher/PlatformLauncher
+ Remove-Item binaries/Sales/Sales
+ Remove-Item binaries/Shipping/Shipping
- name: Setup AWS Credentials
uses: aws-actions/configure-aws-credentials@v4.1.0
with:
@@ -41,17 +33,16 @@ jobs:
aws-secret-access-key: ${{ secrets.AWS_SECRETKEY }}
aws-region: us-east-1
- name: Deploy to S3
- shell: pwsh
+ shell: bash
run: |
echo "Creating Particular.MonitoringDemo.zip archive"
- Compress-Archive -Path ./src/binaries/* -DestinationPath ./Particular.MonitoringDemo.zip
+ (cd binaries && zip -r "$OLDPWD/Particular.MonitoringDemo.zip" .)
echo "Uploading zip file to AWS"
aws s3 cp ./Particular.MonitoringDemo.zip s3://particular.downloads/MonitoringDemo/Particular.MonitoringDemo.zip --content-type application/zip --acl public-read
echo "Complete"
- name: Upload dependency file to AWS
- shell: pwsh
run: |
$dotnetPackages = dotnet list src/Platform package --include-transitive --format json | ConvertFrom-Json
$firstProject = $dotnetPackages.projects[0]
diff --git a/.gitignore b/.gitignore
index 3abcba8d..3f5466ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,7 @@
**/Platform/**/db
**/bin/
**/obj/
-src/binaries/**/*
+binaries/**/*
**/.vs/
MonitoringDemo.Sql/support/*.log
MonitoringDemo.Sql/transport/
@@ -12,4 +12,5 @@ MonitoringDemo.Sql/transport/
**/.diagnostics/*
.logs
-.idea
\ No newline at end of file
+.idea
+/otel/
diff --git a/README.md b/README.md
index b1aba7dd..27257dd1 100644
--- a/README.md
+++ b/README.md
@@ -4,24 +4,18 @@ Self-contained demo showing all of the monitoring components working together. T
- https://docs.particular.net/tutorials/monitoring/demo/
-# Prerequisites
-
-Running the demo requires .Net Framework 4.6.1 or newer.
+## Prerequisites
In order to run the downloaded sample you will need the following prerequisites.
-
-- Window operating system, the Particular Service Platform requires the Windows operating system
- - Desktop: Windows 8 or higher
- - Server: Windows Server 2016 or higher
-- Powershell 3.0 or higher
-- .NET Framework 4.6.1 (check version)
-# Running
+- .NET 8.0 or higher
+
+## Running
- Compile `src\MonitoringDemo.sln`
-- Execute `src\binaries\MonitoringDemo.exe`
+- Execute `src\binaries\launch.sh` or `src\binaries\launch.ps1`
-# Deploying
+## Deploying
1. Go to the [Release action page](https://github.com/Particular/MonitoringDemo/actions/workflows/release.yml).
2. Click the **Run workflow** button.
diff --git a/src/.editorconfig b/src/.editorconfig
new file mode 100644
index 00000000..84a8142f
--- /dev/null
+++ b/src/.editorconfig
@@ -0,0 +1,4 @@
+[*.{csproj,props,targets,xml}]
+indent_style = space
+indent_size = 2
+xml_space_inside_empty_tag = true
\ No newline at end of file
diff --git a/src/Billing/Billing.csproj b/src/Billing/Billing.csproj
index b0bb2f24..eb33ab31 100644
--- a/src/Billing/Billing.csproj
+++ b/src/Billing/Billing.csproj
@@ -5,8 +5,7 @@
Exe
enable
enable
- ..\binaries\Billing\
- failures.ico
+ ..\..\binaries\Billing\
@@ -15,11 +14,9 @@
-
-
-
-
-
+
+
+
\ No newline at end of file
diff --git a/src/Billing/OrderPlacedHandler.cs b/src/Billing/OrderPlacedHandler.cs
index 058bbcc4..b043d6de 100644
--- a/src/Billing/OrderPlacedHandler.cs
+++ b/src/Billing/OrderPlacedHandler.cs
@@ -1,18 +1,19 @@
using Messages;
+using Shared;
namespace Billing;
-public class OrderPlacedHandler(SimulationEffects simulationEffects) : IHandleMessages
+public class OrderPlacedHandler : IHandleMessages
{
public async Task Handle(OrderPlaced message, IMessageHandlerContext context)
{
- await simulationEffects.SimulatedMessageProcessing(context.CancellationToken);
-
var orderBilled = new OrderBilled
{
OrderId = message.OrderId
};
- await context.Publish(orderBilled);
+ var publishOptions = new PublishOptions();
+ publishOptions.SetMessageId(MessageIdHelper.GetHumanReadableMessageId());
+ await context.Publish(orderBilled, publishOptions);
}
}
\ No newline at end of file
diff --git a/src/Billing/Program.cs b/src/Billing/Program.cs
index e6fcaddc..30200765 100644
--- a/src/Billing/Program.cs
+++ b/src/Billing/Program.cs
@@ -1,78 +1,81 @@
-using System.Text.Json;
-using Billing;
+using System.Reflection;
+using System.Text.Json;
using Messages;
-using Microsoft.Extensions.DependencyInjection;
using Shared;
-Console.Title = "Failure rate (Billing)";
-Console.SetWindowSize(65, 15);
+var instancePostfix = args.FirstOrDefault();
-LoggingUtils.ConfigureLogging("Billing");
+var title = string.IsNullOrEmpty(instancePostfix) ? "Failure rate (Billing)" : $"Billing - {instancePostfix}";
+var instanceName = string.IsNullOrEmpty(instancePostfix) ? "billing" : $"billing-{instancePostfix}";
+var instanceId = DeterministicGuid.Create("Billing", instanceName);
+var prometheusPortString = args.Skip(1).FirstOrDefault();
-var endpointConfiguration = new EndpointConfiguration("Billing");
-endpointConfiguration.LimitMessageProcessingConcurrencyTo(4);
+var endpointControls = new ProcessingEndpointControls(() => PrepareEndpointConfiguration(instanceId, instanceName, prometheusPortString));
-var serializer = endpointConfiguration.UseSerialization();
-serializer.Options(new JsonSerializerOptions
+var ui = new UserInterface();
+endpointControls.BindSlowProcessingDial(ui, '5', 't');
+endpointControls.BindDatabaseFailuresDial(ui, '6', 'y');
+
+endpointControls.BindDatabaseDownToggle(ui, 'f');
+endpointControls.BindDelayedRetriesToggle(ui, 'g');
+endpointControls.BindAutoThrottleToggle(ui, 'h');
+
+endpointControls.BindFailureReceivingButton(ui, 'v');
+endpointControls.BindFailureProcessingButton(ui, 'b');
+endpointControls.BindFailureDispatchingButton(ui, 'n');
+
+if (prometheusPortString != null)
{
- TypeInfoResolverChain =
- {
- MessagesSerializationContext.Default
- }
-});
+ OpenTelemetryUtils.ConfigureOpenTelemetry("Billing", instanceId.ToString(), int.Parse(prometheusPortString));
+}
-endpointConfiguration.UseTransport();
+endpointControls.Start();
-endpointConfiguration.Recoverability()
- .Delayed(delayed => delayed.NumberOfRetries(0));
+ui.RunLoop(title);
-endpointConfiguration.AuditProcessedMessagesTo("audit");
-endpointConfiguration.SendHeartbeatTo("Particular.ServiceControl");
+await endpointControls.StopEndpoint();
-endpointConfiguration.UniquelyIdentifyRunningInstance()
- .UsingCustomIdentifier(new Guid("1C62248E-2681-45A4-B44D-5CF93584BAD6"))
- .UsingCustomDisplayName("original-instance");
+EndpointConfiguration PrepareEndpointConfiguration(Guid guid, string s, string? prometheusPortString1)
+{
+ var endpointConfiguration1 = new EndpointConfiguration("Billing");
+ endpointConfiguration1.LimitMessageProcessingConcurrencyTo(4);
-var metrics = endpointConfiguration.EnableMetrics();
-metrics.SendMetricDataToServiceControl(
- "Particular.Monitoring",
- TimeSpan.FromMilliseconds(500)
-);
+ var serializer = endpointConfiguration1.UseSerialization();
+ serializer.Options(new JsonSerializerOptions
+ {
+ TypeInfoResolverChain =
+ {
+ MessagesSerializationContext.Default
+ }
+ });
-var simulationEffects = new SimulationEffects();
-endpointConfiguration.RegisterComponents(cc => cc.AddSingleton(simulationEffects));
+ var transport = new LearningTransport
+ {
+ StorageDirectory = Path.Combine(Directory.GetParent(Assembly.GetExecutingAssembly().Location)!.Parent!.FullName, ".learningtransport"),
+ TransportTransactionMode = TransportTransactionMode.ReceiveOnly
+ };
+ endpointConfiguration1.UseTransport(transport);
-var endpointInstance = await Endpoint.Start(endpointConfiguration);
+ endpointConfiguration1.Recoverability()
+ .Delayed(delayed => delayed.NumberOfRetries(0));
-RunUserInterfaceLoop(simulationEffects);
+ endpointConfiguration1.AuditProcessedMessagesTo("audit");
+ endpointConfiguration1.SendHeartbeatTo("Particular.ServiceControl");
-await endpointInstance.Stop();
+ endpointConfiguration1.UniquelyIdentifyRunningInstance()
+ .UsingCustomIdentifier(guid)
+ .UsingCustomDisplayName(s);
-void RunUserInterfaceLoop(SimulationEffects state)
-{
- while (true)
- {
- Console.Clear();
- Console.WriteLine("Billing Endpoint");
- Console.WriteLine("Press F to increase the simulated failure rate");
- Console.WriteLine("Press S to decrease the simulated failure rate");
- Console.WriteLine("Press ESC to quit");
- Console.WriteLine();
+ var metrics = endpointConfiguration1.EnableMetrics();
+ metrics.SendMetricDataToServiceControl(
+ "Particular.Monitoring",
+ TimeSpan.FromMilliseconds(500)
+ );
- state.WriteState(Console.Out);
+ endpointConfiguration1.UsePersistence();
+ endpointConfiguration1.EnableOutbox();
- var input = Console.ReadKey(true);
+ endpointConfiguration1.EnableOpenTelemetry();
- switch (input.Key)
- {
- case ConsoleKey.F:
- state.IncreaseFailureRate();
- break;
- case ConsoleKey.S:
- state.DecreaseFailureRate();
- break;
- case ConsoleKey.Escape:
- return;
- }
- }
-}
+ return endpointConfiguration1;
+}
\ No newline at end of file
diff --git a/src/Billing/SimulationEffect.cs b/src/Billing/SimulationEffect.cs
deleted file mode 100644
index 199c518e..00000000
--- a/src/Billing/SimulationEffect.cs
+++ /dev/null
@@ -1,32 +0,0 @@
-namespace Billing;
-
-public class SimulationEffects
-{
- public void IncreaseFailureRate()
- {
- failureRate = Math.Min(1, failureRate + failureRateIncrement);
- }
-
- public void DecreaseFailureRate()
- {
- failureRate = Math.Max(0, failureRate - failureRateIncrement);
- }
-
- public void WriteState(TextWriter output)
- {
- output.WriteLine("Failure rate: {0:P0}", failureRate);
- }
-
- public async Task SimulatedMessageProcessing(CancellationToken cancellationToken = default)
- {
- await Task.Delay(200, cancellationToken);
-
- if (Random.Shared.NextDouble() < failureRate)
- {
- throw new Exception("BOOM! A failure occurred");
- }
- }
-
- double failureRate;
- const double failureRateIncrement = 0.1;
-}
\ No newline at end of file
diff --git a/src/Billing/failures.ico b/src/Billing/failures.ico
deleted file mode 100644
index d302303d..00000000
Binary files a/src/Billing/failures.ico and /dev/null differ
diff --git a/src/ClientUI/ClientUI.csproj b/src/ClientUI/ClientUI.csproj
index ef78e1f7..c29b83c3 100644
--- a/src/ClientUI/ClientUI.csproj
+++ b/src/ClientUI/ClientUI.csproj
@@ -5,8 +5,7 @@
Exe
enable
enable
- ..\binaries\ClientUI\
- traffic.ico
+ ..\..\binaries\ClientUI\
@@ -15,11 +14,8 @@
-
-
-
-
-
+
+
\ No newline at end of file
diff --git a/src/ClientUI/Program.cs b/src/ClientUI/Program.cs
index 06830cbf..595ec121 100644
--- a/src/ClientUI/Program.cs
+++ b/src/ClientUI/Program.cs
@@ -1,12 +1,15 @@
-using System.Text.Json;
+using System.Reflection;
+using System.Text.Json;
using ClientUI;
using Messages;
using Shared;
-Console.Title = "Load (ClientUI)";
-Console.SetWindowSize(65, 15);
+var instancePostfix = args.FirstOrDefault();
-LoggingUtils.ConfigureLogging("ClientUI");
+var title = string.IsNullOrEmpty(instancePostfix) ? "ClientUI" : $"ClientUI - {instancePostfix}";
+var instanceName = string.IsNullOrEmpty(instancePostfix) ? "clientui" : $"clientui-{instancePostfix}";
+var instanceId = DeterministicGuid.Create("ClientUI", instanceName);
+var prometheusPortString = args.Skip(1).FirstOrDefault();
var endpointConfiguration = new EndpointConfiguration("ClientUI");
@@ -19,14 +22,20 @@
}
});
-var transport = endpointConfiguration.UseTransport();
+var transport = new LearningTransport
+{
+ StorageDirectory = Path.Combine(Directory.GetParent(Assembly.GetExecutingAssembly().Location)!.Parent!.FullName, ".learningtransport")
+};
+var routing = endpointConfiguration.UseTransport(transport);
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.SendHeartbeatTo("Particular.ServiceControl");
endpointConfiguration.UniquelyIdentifyRunningInstance()
- .UsingCustomIdentifier(new Guid("EA3E7D1B-8171-4098-B160-1FEA975CCB2C"))
- .UsingCustomDisplayName("original-instance");
+ .UsingCustomIdentifier(instanceId)
+ .UsingCustomDisplayName(instanceName);
+
+endpointConfiguration.EnableOpenTelemetry();
var metrics = endpointConfiguration.EnableMetrics();
metrics.SendMetricDataToServiceControl(
@@ -34,44 +43,32 @@
TimeSpan.FromMilliseconds(500)
);
-var routing = transport.Routing();
routing.RouteToEndpoint(typeof(PlaceOrder), "Sales");
+if (prometheusPortString != null)
+{
+ OpenTelemetryUtils.ConfigureOpenTelemetry("ClientUI", instanceId.ToString(), int.Parse(prometheusPortString));
+}
+
var endpointInstance = await Endpoint.Start(endpointConfiguration);
var simulatedCustomers = new SimulatedCustomers(endpointInstance);
var cancellation = new CancellationTokenSource();
+
+var ui = new UserInterface();
+simulatedCustomers.BindSendingRateDial(ui, '-', '[');
+simulatedCustomers.BindDuplicateLikelihoodDial(ui, '=', ']');
+simulatedCustomers.BindManualModeToggle(ui, ';');
+simulatedCustomers.BindManualSendButton(ui, '/');
+simulatedCustomers.BindNoiseToggle(ui, '`');
+simulatedCustomers.BindBlackFridayToggle(ui, '\'');
+
var simulatedWork = simulatedCustomers.Run(cancellation.Token);
-RunUserInterfaceLoop(simulatedCustomers);
+ui.RunLoop(title);
cancellation.Cancel();
await simulatedWork;
-await endpointInstance.Stop();
-
-void RunUserInterfaceLoop(SimulatedCustomers simulatedCustomers)
-{
- while (true)
- {
- Console.Clear();
- Console.WriteLine("Simulating customers placing orders on a website");
- Console.WriteLine("Press T to toggle High/Low traffic mode");
- Console.WriteLine("Press ESC to quit");
- Console.WriteLine();
-
- simulatedCustomers.WriteState(Console.Out);
-
- var input = Console.ReadKey(true);
-
- switch (input.Key)
- {
- case ConsoleKey.T:
- simulatedCustomers.ToggleTrafficMode();
- break;
- case ConsoleKey.Escape:
- return;
- }
- }
-}
+await endpointInstance.Stop();
\ No newline at end of file
diff --git a/src/ClientUI/SimulatedCustomers.cs b/src/ClientUI/SimulatedCustomers.cs
index 368d73e8..94785ac1 100644
--- a/src/ClientUI/SimulatedCustomers.cs
+++ b/src/ClientUI/SimulatedCustomers.cs
@@ -1,21 +1,57 @@
using Messages;
+using Shared;
namespace ClientUI;
class SimulatedCustomers(IEndpointInstance endpointInstance)
{
- public void WriteState(TextWriter output)
+ public void BindSendingRateDial(UserInterface userInterface, char upKey, char downKey)
{
- var trafficMode = highTrafficMode ? "High" : "Low";
- output.WriteLine($"{trafficMode} traffic mode - sending {rate} orders / second");
+ userInterface.BindDial('B', upKey, downKey,
+ $"Press {upKey} to increase sending rate.{Environment.NewLine}Press {downKey} to decrease it.",
+ () => $"Sending rate: {rate}", x => rate = x + 1); //Rate is from 1 to 10
}
- public void ToggleTrafficMode()
+ public void BindDuplicateLikelihoodDial(UserInterface userInterface, char upKey, char downKey)
{
- highTrafficMode = !highTrafficMode;
- rate = highTrafficMode ? 8 : 1;
+ userInterface.BindDial('C', upKey, downKey,
+ $"Press {upKey} to increase duplicate message rate.{Environment.NewLine}Press {downKey} to decrease it.",
+ () => $"Duplicate rate: {duplicateLikelihood * 10}%", x => duplicateLikelihood = x);
}
+ public void BindManualModeToggle(UserInterface userInterface, char toggleKey)
+ {
+ userInterface.BindToggle('D', toggleKey, $"Press {toggleKey} to toggle manual send mode",
+ () => manualMode ? "Manual sending mode" : "Automatic sending mode",
+ () => manualMode = true, () =>
+ {
+ manualMode = false;
+ manualModeSemaphore.Release();
+ });
+ }
+
+ public void BindNoiseToggle(UserInterface userInterface, char toggleKey)
+ {
+ userInterface.BindToggle('E', toggleKey, $"Press {toggleKey} to toggle random noise",
+ () => enableRandomNoise ? "Random noise" : "No random noise",
+ () => enableRandomNoise = true, () => enableRandomNoise = false);
+ }
+
+ public void BindBlackFridayToggle(UserInterface userInterface, char toggleKey)
+ {
+ userInterface.BindToggle('F', toggleKey, $"Press {toggleKey} to toggle Black Friday mode",
+ () => blackFriday ? "Black Friday!" : "Business as usual",
+ () => blackFriday = true, () => blackFriday = false);
+ }
+
+ public void BindManualSendButton(UserInterface userInterface, char key)
+ {
+ userInterface.BindButton('G', key, $"Press {key} to send a message", null, () => manualModeSemaphore.Release());
+ }
+
+ private int EffectiveRate => Math.Max(blackFriday ? 32 : NoiseModifiedRate, 0);
+ private int NoiseModifiedRate => enableRandomNoise ? rate + noiseComponent : rate;
+
public async Task Run(CancellationToken cancellationToken = default)
{
nextReset = DateTime.UtcNow.AddSeconds(1);
@@ -27,15 +63,35 @@ public async Task Run(CancellationToken cancellationToken = default)
if (now > nextReset)
{
currentIntervalCount = 0;
+
+ var noiseIncrease = Random.Shared.Next(Math.Abs(noiseComponent) + 1) == 0;
+ if (noiseComponent == 0)
+ {
+ //Randomly go up or down
+ }
+ else if (noiseIncrease)
+ {
+ noiseComponent += Math.Sign(noiseComponent);
+ }
+ else
+ {
+ noiseComponent -= Math.Sign(noiseComponent);
+ }
+
nextReset = now.AddSeconds(1);
}
+ if (manualMode)
+ {
+ await manualModeSemaphore.WaitAsync();
+ }
+
await PlaceSingleOrder(cancellationToken);
currentIntervalCount++;
try
{
- if (currentIntervalCount >= rate)
+ if (currentIntervalCount >= EffectiveRate)
{
var delay = nextReset - DateTime.UtcNow;
if (delay > TimeSpan.Zero)
@@ -51,19 +107,54 @@ public async Task Run(CancellationToken cancellationToken = default)
}
}
- Task PlaceSingleOrder(CancellationToken cancellationToken)
+ async Task PlaceSingleOrder(CancellationToken cancellationToken)
{
var placeOrderCommand = new PlaceOrder
{
OrderId = Guid.NewGuid().ToString()
};
- return endpointInstance.Send(placeOrderCommand, cancellationToken);
+ var messageId = MessageIdHelper.GetHumanReadableMessageId();
+
+ await SendOneMessage(messageId, cancellationToken, placeOrderCommand);
+
+ if (manualMode)
+ {
+ Console.WriteLine($"Message {messageId} sent.");
+ }
+
+ if (Random.Shared.Next(10) < duplicateLikelihood)
+ {
+ //Send a duplicate
+ await SendOneMessage(messageId, cancellationToken, placeOrderCommand);
+
+ if (manualMode)
+ {
+ Console.WriteLine($"Duplicate message {messageId} sent.");
+ }
+ }
}
- bool highTrafficMode;
+ private async Task SendOneMessage(string messageId, CancellationToken cancellationToken, PlaceOrder placeOrderCommand)
+ {
+ var sendOptions = new SendOptions();
+
+ if (manualMode)
+ {
+ sendOptions.SetHeader("MonitoringDemo.ManualMode", "True");
+ }
+
+ sendOptions.SetMessageId(messageId);
+ await endpointInstance.Send(placeOrderCommand, sendOptions, cancellationToken);
+ }
DateTime nextReset;
int currentIntervalCount;
int rate = 1;
+ private int noiseComponent = 0;
+ private bool enableRandomNoise;
+ private int duplicateLikelihood;
+ private bool manualMode;
+ private bool blackFriday;
+ private SemaphoreSlim manualModeSemaphore = new SemaphoreSlim(0);
}
\ No newline at end of file
diff --git a/src/ClientUI/traffic.ico b/src/ClientUI/traffic.ico
deleted file mode 100644
index d5dc024e..00000000
Binary files a/src/ClientUI/traffic.ico and /dev/null differ
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 61546c64..2fd19a9c 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -8,8 +8,6 @@
true
low
all
- win-x64
- false
false
@@ -18,7 +16,7 @@
-
+
-
\ No newline at end of file
+
diff --git a/src/Messages/Messages.csproj b/src/Messages/Messages.csproj
index 72b74b03..285dd21e 100644
--- a/src/Messages/Messages.csproj
+++ b/src/Messages/Messages.csproj
@@ -9,7 +9,7 @@
-
+
diff --git a/src/MonitoringDemo.sln b/src/MonitoringDemo.sln
index 90aea022..b95e5d9b 100644
--- a/src/MonitoringDemo.sln
+++ b/src/MonitoringDemo.sln
@@ -1,71 +1,71 @@
-Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 17
-VisualStudioVersion = 17.9.34902.65
-MinimumVisualStudioVersion = 15.0.26730.12
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ClientUI", "ClientUI\ClientUI.csproj", "{D4DCF868-A625-4B0B-BB20-C150553A6548}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messages", "Messages\Messages.csproj", "{F6DE8266-1F56-4241-965C-2DDBB76CEC1E}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sales", "Sales\Sales.csproj", "{CD42E5DF-D4A7-4933-8017-1398B2E9560F}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Billing", "Billing\Billing.csproj", "{9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shipping", "Shipping\Shipping.csproj", "{41F0D809-8FA4-4139-9131-09441D69AFB1}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shared", "Shared\Shared.csproj", "{13B5577A-6635-4964-9B3C-7EA59E4978F4}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Platform", "Platform\Platform.csproj", "{D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}"
-EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MonitoringDemo", "MonitoringDemo\MonitoringDemo.csproj", "{55C64607-52E9-4E85-A547-50B191856A93}"
-EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7D29C905-CE3A-4D93-8271-7BA09CEE1631}"
- ProjectSection(SolutionItems) = preProject
- Directory.Build.props = Directory.Build.props
- EndProjectSection
-EndProject
-Global
- GlobalSection(SolutionConfigurationPlatforms) = preSolution
- Debug|Any CPU = Debug|Any CPU
- Release|Any CPU = Release|Any CPU
- EndGlobalSection
- GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {D4DCF868-A625-4B0B-BB20-C150553A6548}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {D4DCF868-A625-4B0B-BB20-C150553A6548}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {D4DCF868-A625-4B0B-BB20-C150553A6548}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {D4DCF868-A625-4B0B-BB20-C150553A6548}.Release|Any CPU.Build.0 = Release|Any CPU
- {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Release|Any CPU.Build.0 = Release|Any CPU
- {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Release|Any CPU.Build.0 = Release|Any CPU
- {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Release|Any CPU.Build.0 = Release|Any CPU
- {41F0D809-8FA4-4139-9131-09441D69AFB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {41F0D809-8FA4-4139-9131-09441D69AFB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {41F0D809-8FA4-4139-9131-09441D69AFB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {41F0D809-8FA4-4139-9131-09441D69AFB1}.Release|Any CPU.Build.0 = Release|Any CPU
- {13B5577A-6635-4964-9B3C-7EA59E4978F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {13B5577A-6635-4964-9B3C-7EA59E4978F4}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {13B5577A-6635-4964-9B3C-7EA59E4978F4}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {13B5577A-6635-4964-9B3C-7EA59E4978F4}.Release|Any CPU.Build.0 = Release|Any CPU
- {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Release|Any CPU.Build.0 = Release|Any CPU
- {55C64607-52E9-4E85-A547-50B191856A93}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {55C64607-52E9-4E85-A547-50B191856A93}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {55C64607-52E9-4E85-A547-50B191856A93}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {55C64607-52E9-4E85-A547-50B191856A93}.Release|Any CPU.Build.0 = Release|Any CPU
- EndGlobalSection
- GlobalSection(SolutionProperties) = preSolution
- HideSolutionNode = FALSE
- EndGlobalSection
- GlobalSection(ExtensibilityGlobals) = postSolution
- SolutionGuid = {081FC59E-04F4-4FB2-88A6-64A7C18BAA10}
- EndGlobalSection
-EndGlobal
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.9.34902.65
+MinimumVisualStudioVersion = 15.0.26730.12
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ClientUI", "ClientUI\ClientUI.csproj", "{D4DCF868-A625-4B0B-BB20-C150553A6548}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messages", "Messages\Messages.csproj", "{F6DE8266-1F56-4241-965C-2DDBB76CEC1E}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sales", "Sales\Sales.csproj", "{CD42E5DF-D4A7-4933-8017-1398B2E9560F}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Billing", "Billing\Billing.csproj", "{9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shipping", "Shipping\Shipping.csproj", "{41F0D809-8FA4-4139-9131-09441D69AFB1}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PlatformLauncher", "PlatformLauncher\PlatformLauncher.csproj", "{D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MonitoringDemo", "MonitoringDemo\MonitoringDemo.csproj", "{55C64607-52E9-4E85-A547-50B191856A93}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7D29C905-CE3A-4D93-8271-7BA09CEE1631}"
+ ProjectSection(SolutionItems) = preProject
+ Directory.Build.props = Directory.Build.props
+ EndProjectSection
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared", "Shared\Shared.csproj", "{5EB4D9AD-4AF7-4CED-9F69-36763A31FBE1}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {D4DCF868-A625-4B0B-BB20-C150553A6548}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D4DCF868-A625-4B0B-BB20-C150553A6548}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D4DCF868-A625-4B0B-BB20-C150553A6548}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D4DCF868-A625-4B0B-BB20-C150553A6548}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F6DE8266-1F56-4241-965C-2DDBB76CEC1E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {CD42E5DF-D4A7-4933-8017-1398B2E9560F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9BF02A43-6D9D-4B49-A06D-603A66C6BCB5}.Release|Any CPU.Build.0 = Release|Any CPU
+ {41F0D809-8FA4-4139-9131-09441D69AFB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {41F0D809-8FA4-4139-9131-09441D69AFB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {41F0D809-8FA4-4139-9131-09441D69AFB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {41F0D809-8FA4-4139-9131-09441D69AFB1}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D98DF31F-6B4B-42E1-BEE5-8CAB949638C2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {55C64607-52E9-4E85-A547-50B191856A93}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {55C64607-52E9-4E85-A547-50B191856A93}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {55C64607-52E9-4E85-A547-50B191856A93}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {55C64607-52E9-4E85-A547-50B191856A93}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5EB4D9AD-4AF7-4CED-9F69-36763A31FBE1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5EB4D9AD-4AF7-4CED-9F69-36763A31FBE1}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5EB4D9AD-4AF7-4CED-9F69-36763A31FBE1}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5EB4D9AD-4AF7-4CED-9F69-36763A31FBE1}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {081FC59E-04F4-4FB2-88A6-64A7C18BAA10}
+ EndGlobalSection
+EndGlobal
diff --git a/src/MonitoringDemo/ColoredConsole.cs b/src/MonitoringDemo/ColoredConsole.cs
deleted file mode 100644
index b85e9191..00000000
--- a/src/MonitoringDemo/ColoredConsole.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-namespace MonitoringDemo;
-
-static class ColoredConsole
-{
- public static IDisposable Use(ConsoleColor color)
- {
- var previousColor = Console.ForegroundColor;
- Console.ForegroundColor = color;
-
- return new Restorer(previousColor);
- }
-
- class Restorer(ConsoleColor previousColor) : IDisposable
- {
- public void Dispose()
- {
- Console.ForegroundColor = previousColor;
- }
- }
-}
\ No newline at end of file
diff --git a/src/MonitoringDemo/DemoLauncher.cs b/src/MonitoringDemo/DemoLauncher.cs
index 6716868b..b6bdd30f 100644
--- a/src/MonitoringDemo/DemoLauncher.cs
+++ b/src/MonitoringDemo/DemoLauncher.cs
@@ -1,93 +1,40 @@
-namespace MonitoringDemo;
-
-sealed class DemoLauncher : IDisposable
-{
- public DemoLauncher()
- {
- demoJob = new Job("Particular.MonitoringDemo");
-
- File.WriteAllText(@".\Marker.sln", string.Empty);
- }
-
- public void Dispose()
- {
- disposed = true;
-
- demoJob.Dispose();
-
- File.Delete(@".\Marker.sln");
-
- Console.WriteLine("Removing Transport Files");
- DirectoryEx.Delete(".learningtransport");
-
- Console.WriteLine("Deleting log folders");
- DirectoryEx.Delete(".logs");
-
- Console.WriteLine("Deleting db folders");
- DirectoryEx.ForceDeleteReadonly(".db");
- DirectoryEx.ForceDeleteReadonly(".audit-db");
- }
-
- public void Platform()
- {
- if (disposed)
- {
- return;
- }
-
- demoJob.AddProcess(Path.Combine("Platform", $"Platform.exe"));
- }
-
- public void Billing()
- {
- if (disposed)
- {
- return;
- }
-
- demoJob.AddProcess(Path.Combine("Billing", "Billing.exe"));
- }
-
- public void Shipping()
- {
- if (disposed)
- {
- return;
- }
-
- demoJob.AddProcess(Path.Combine("Shipping", "Shipping.exe"));
- }
-
- public void ScaleOutSales()
- {
- if (disposed)
- {
- return;
- }
-
- demoJob.AddProcess(Path.Combine("Sales", "Sales.exe"));
- }
-
- public void ScaleInSales()
- {
- if (disposed)
- {
- return;
- }
-
- demoJob.KillProcess(Path.Combine("Sales", "Sales.exe"));
- }
-
- public void ClientUI()
- {
- if (disposed)
- {
- return;
- }
-
- demoJob.AddProcess(Path.Combine("ClientUI", "ClientUI.exe"));
- }
-
- readonly Job demoJob;
- private bool disposed;
+namespace MonitoringDemo;
+
+sealed class DemoLauncher : IDisposable
+{
+ public DemoLauncher()
+ {
+ demoProcessGroup = new ProcessGroup("Particular.MonitoringDemo");
+ }
+
+ public void Dispose()
+ {
+ disposed = true;
+
+ demoProcessGroup.Dispose();
+
+ //Console.WriteLine("Removing Transport Files");
+ DirectoryEx.Delete(".learningtransport");
+
+ //Console.WriteLine("Deleting log folders");
+ DirectoryEx.Delete(".logs");
+
+ //Console.WriteLine("Deleting db folders");
+ DirectoryEx.ForceDeleteReadonly(".db");
+ DirectoryEx.ForceDeleteReadonly(".audit-db");
+ }
+
+ public ProcessHandle AddProcess(string name, string instanceId, int port)
+ {
+ if (disposed)
+ {
+ return ProcessHandle.Empty;
+ }
+
+ var path = Path.Combine("..", name, $"{name}.dll"); //TODO: Hard-coded convention
+ return demoProcessGroup.AddProcess(path, instanceId, port);
+ }
+
+ readonly ProcessGroup demoProcessGroup;
+ private bool disposed;
}
\ No newline at end of file
diff --git a/src/MonitoringDemo/DirectoryEx.cs b/src/MonitoringDemo/DirectoryEx.cs
index eeedacab..dff5d6a9 100644
--- a/src/MonitoringDemo/DirectoryEx.cs
+++ b/src/MonitoringDemo/DirectoryEx.cs
@@ -1,57 +1,57 @@
-namespace MonitoringDemo;
-
-static class DirectoryEx
-{
- public static void Delete(string directoryPath)
- {
- for (var i = 0; i < 3; i++)
- {
- try
- {
- Directory.Delete(directoryPath, true);
- return;
- }
- catch (DirectoryNotFoundException)
- {
- return;
- }
- catch (Exception)
- {
- // ignored
- Thread.Sleep(2000);
- }
- }
- }
-
- public static void ForceDeleteReadonly(string directoryPath)
- {
- for (var i = 0; i < 3; i++)
- {
- try
- {
- // necessary because ravendb creates some folders read-only
- var directory = new DirectoryInfo(directoryPath) { Attributes = FileAttributes.Normal };
-
- foreach (var info in directory.GetFileSystemInfos("*", SearchOption.AllDirectories))
- {
- if (info.Attributes != FileAttributes.Normal)
- {
- info.Attributes = FileAttributes.Normal;
- }
- }
-
- directory.Delete(true);
- return;
- }
- catch (DirectoryNotFoundException)
- {
- return;
- }
- catch (Exception)
- {
- // ignored
- Thread.Sleep(2000);
- }
- }
- }
+namespace MonitoringDemo;
+
+static class DirectoryEx
+{
+ public static void Delete(string directoryPath)
+ {
+ for (var i = 0; i < 3; i++)
+ {
+ try
+ {
+ Directory.Delete(directoryPath, true);
+ return;
+ }
+ catch (DirectoryNotFoundException)
+ {
+ return;
+ }
+ catch (Exception)
+ {
+ // ignored
+ Thread.Sleep(2000);
+ }
+ }
+ }
+
+ public static void ForceDeleteReadonly(string directoryPath)
+ {
+ for (var i = 0; i < 3; i++)
+ {
+ try
+ {
+ // necessary because ravendb creates some folders read-only
+ var directory = new DirectoryInfo(directoryPath) { Attributes = FileAttributes.Normal };
+
+ foreach (var info in directory.GetFileSystemInfos("*", SearchOption.AllDirectories))
+ {
+ if (info.Attributes != FileAttributes.Normal)
+ {
+ info.Attributes = FileAttributes.Normal;
+ }
+ }
+
+ directory.Delete(true);
+ return;
+ }
+ catch (DirectoryNotFoundException)
+ {
+ return;
+ }
+ catch (Exception)
+ {
+ // ignored
+ Thread.Sleep(2000);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/MonitoringDemo/IWidget.cs b/src/MonitoringDemo/IWidget.cs
new file mode 100644
index 00000000..fc9c7a7f
--- /dev/null
+++ b/src/MonitoringDemo/IWidget.cs
@@ -0,0 +1,6 @@
+namespace MonitoringDemo;
+
+public interface IWidget
+{
+ string ProcessInput(string line);
+}
\ No newline at end of file
diff --git a/src/MonitoringDemo/Job.cs b/src/MonitoringDemo/Job.cs
deleted file mode 100644
index 9ae5f42b..00000000
--- a/src/MonitoringDemo/Job.cs
+++ /dev/null
@@ -1,209 +0,0 @@
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-
-namespace MonitoringDemo;
-
-partial class Job : IDisposable
-{
- public Job(string jobName)
- {
- handle = CreateJobObject(nint.Zero, jobName);
-
- var info = new JOBOBJECT_BASIC_LIMIT_INFORMATION
- {
- LimitFlags = 0x2000
- };
-
- var extendedInfo = new JOBOBJECT_EXTENDED_LIMIT_INFORMATION
- {
- BasicLimitInformation = info
- };
-
- var length = Marshal.SizeOf(typeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION));
- var extendedInfoPtr = Marshal.AllocHGlobal(length);
- Marshal.StructureToPtr(extendedInfo, extendedInfoPtr, false);
-
- if (!SetInformationJobObject(handle, JobObjectInfoType.ExtendedLimitInformation, extendedInfoPtr, (uint)length))
- {
- throw new Exception($"Unable to set information. Error: {Marshal.GetLastWin32Error()}");
- }
- }
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- public bool AddProcess(string relativeExePath)
- {
- if (!processesByExec.TryGetValue(relativeExePath, out var processes))
- {
- processes = [];
- processesByExec[relativeExePath] = processes;
- }
-
- var processesCount = processes.Count;
- var instanceId = processesCount == 0 ? null : $"instance-{processesCount}";
-
- var process = StartProcess(relativeExePath, instanceId);
-
- if (process is null)
- {
- return false;
- }
-
- processes.Push(process);
-
- return AddProcess(process);
- }
-
- public void KillProcess(string relativeExePath)
- {
- if (!processesByExec.TryGetValue(relativeExePath, out var processes))
- {
- return;
- }
-
- while (processes.TryPop(out var victim))
- {
- try
- {
- victim.Kill(true);
- return;
- }
- catch (Exception)
- {
- //The process has died or has been killed by the user. Let's try to kill another one by doing at
- // least another iteration
- }
- finally
- {
- victim.Dispose();
- }
- }
- }
-
- bool AddProcess(Process process) => AddProcess(process.Handle);
-
- bool AddProcess(nint processHandle) => AssignProcessToJobObject(handle, processHandle);
-
- static Process? StartProcess(string relativeExePath, string? arguments = null)
- {
- var fullExePath = Path.GetFullPath(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, relativeExePath));
- var workingDirectory = Path.GetDirectoryName(fullExePath);
-
- var startInfo = new ProcessStartInfo(fullExePath)
- {
- WorkingDirectory = workingDirectory,
- UseShellExecute = true
- };
-
- if (arguments is not null)
- {
- startInfo.Arguments = arguments;
- }
-
- return Process.Start(startInfo);
- }
-
- void Dispose(bool disposing)
- {
- if (disposed)
- {
- return;
- }
-
- if (!disposing)
- {
- return;
- }
-
- CloseHandle(handle);
- handle = nint.Zero;
- processesByExec.Clear();
- disposed = true;
- }
-
- [LibraryImport("kernel32.dll", EntryPoint = "CreateJobObjectW", StringMarshalling = StringMarshalling.Utf16)]
- private static partial nint CreateJobObject(nint a, string lpName);
-
- [LibraryImport("kernel32.dll")]
- [return: MarshalAs(UnmanagedType.Bool)]
- private static partial bool SetInformationJobObject(nint hJob, JobObjectInfoType infoType, nint lpJobObjectInfo, uint cbJobObjectInfoLength);
-
- [LibraryImport("kernel32.dll", SetLastError = true)]
- [return: MarshalAs(UnmanagedType.Bool)]
- private static partial bool AssignProcessToJobObject(nint job, nint process);
-
- [LibraryImport("kernel32.dll", SetLastError = true)]
- [return: MarshalAs(UnmanagedType.Bool)]
- private static partial bool CloseHandle(nint hObject);
-
- readonly Dictionary> processesByExec = [];
-
- nint handle;
- bool disposed;
-}
-
-#region Helper classes
-
-[StructLayout(LayoutKind.Sequential)]
-#pragma warning disable PS0024 // A non-interface type should not be prefixed with I
-struct IO_COUNTERS
-#pragma warning restore PS0024 // A non-interface type should not be prefixed with I
-{
- public ulong ReadOperationCount;
- public ulong WriteOperationCount;
- public ulong OtherOperationCount;
- public ulong ReadTransferCount;
- public ulong WriteTransferCount;
- public ulong OtherTransferCount;
-}
-
-
-[StructLayout(LayoutKind.Sequential)]
-struct JOBOBJECT_BASIC_LIMIT_INFORMATION
-{
- public long PerProcessUserTimeLimit;
- public long PerJobUserTimeLimit;
- public uint LimitFlags;
- public nuint MinimumWorkingSetSize;
- public nuint MaximumWorkingSetSize;
- public uint ActiveProcessLimit;
- public nuint Affinity;
- public uint PriorityClass;
- public uint SchedulingClass;
-}
-
-[StructLayout(LayoutKind.Sequential)]
-struct SECURITY_ATTRIBUTES
-{
- public uint nLength;
- public nint lpSecurityDescriptor;
- public int bInheritHandle;
-}
-
-[StructLayout(LayoutKind.Sequential)]
-struct JOBOBJECT_EXTENDED_LIMIT_INFORMATION
-{
- public JOBOBJECT_BASIC_LIMIT_INFORMATION BasicLimitInformation;
- public IO_COUNTERS IoInfo;
- public nuint ProcessMemoryLimit;
- public nuint JobMemoryLimit;
- public nuint PeakProcessMemoryUsed;
- public nuint PeakJobMemoryUsed;
-}
-
-enum JobObjectInfoType
-{
- AssociateCompletionPortInformation = 7,
- BasicLimitInformation = 2,
- BasicUIRestrictions = 4,
- EndOfJobTimeInformation = 6,
- ExtendedLimitInformation = 9,
- SecurityLimitInformation = 5,
- GroupInformation = 11
-}
-
-#endregion
diff --git a/src/MonitoringDemo/KeyHelper.cs b/src/MonitoringDemo/KeyHelper.cs
new file mode 100644
index 00000000..8e7130d0
--- /dev/null
+++ b/src/MonitoringDemo/KeyHelper.cs
@@ -0,0 +1,55 @@
+using System.Text;
+using Terminal.Gui.Input;
+
+namespace MonitoringDemo;
+
+static class KeyHelper
+{
+ private static readonly string AllRecognizedKeys = "1234567890-=qwertyuiop[]asdfghjkl;'zxcvbnm,./";
+ private static readonly Rune[] Runes = AllRecognizedKeys.Select(x => (Rune)x).ToArray();
+ private static readonly Rune dollarRune = (Rune)'$';
+
+ public static bool IsRecognized(this Key k)
+ {
+ var rune = k.AsRune;
+ return Runes.Contains(rune);
+ }
+
+ public static bool IsPartOfControllerSequence(this Key k, out string? sequence)
+ {
+ var r = k.AsRune;
+ if (r.Value == 0)
+ {
+ sequence = null;
+ return false;
+ }
+ if (r == dollarRune)
+ {
+ //Begin a new sequence regardless
+ currentSequence = "$";
+ sequence = null;
+ return true;
+ }
+ if (currentSequence != null)
+ {
+ currentSequence += r.ToString();
+
+ if (currentSequence.Length == 4)
+ {
+ //Sequence is complete
+ sequence = currentSequence;
+ currentSequence = null;
+ }
+ else
+ {
+ sequence = null;
+ }
+ return true;
+ }
+
+ sequence = null;
+ return false;
+ }
+
+ private static string? currentSequence;
+}
\ No newline at end of file
diff --git a/src/MonitoringDemo/KeyboardController.cs b/src/MonitoringDemo/KeyboardController.cs
new file mode 100644
index 00000000..19ecea36
--- /dev/null
+++ b/src/MonitoringDemo/KeyboardController.cs
@@ -0,0 +1,11 @@
+namespace MonitoringDemo;
+
+public class KeyboardDriver
+{
+ //Controls a process via key bindings. Level controls are bound to up/down key combinations
+}
+
+public class ControllerDriver
+{
+ //Controls a process via a dedicated USB controller.
+}
\ No newline at end of file
diff --git a/src/MonitoringDemo/MonitoringDemo.bat b/src/MonitoringDemo/MonitoringDemo.bat
new file mode 100644
index 00000000..cb97b05f
--- /dev/null
+++ b/src/MonitoringDemo/MonitoringDemo.bat
@@ -0,0 +1 @@
+dotnet MonitoringDemo\MonitoringDemo.dll
\ No newline at end of file
diff --git a/src/MonitoringDemo/MonitoringDemo.csproj b/src/MonitoringDemo/MonitoringDemo.csproj
index 54c12dcb..5af58f0d 100644
--- a/src/MonitoringDemo/MonitoringDemo.csproj
+++ b/src/MonitoringDemo/MonitoringDemo.csproj
@@ -1,12 +1,24 @@
-
-
-
- net8.0
- Exe
- enable
- enable
- ..\binaries\
- true
-
-
+
+
+
+ net8.0
+ Exe
+ enable
+ enable
+ ..\..\binaries\MonitoringDemo
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/MonitoringDemo/MonitoringDemo.sh b/src/MonitoringDemo/MonitoringDemo.sh
new file mode 100755
index 00000000..d94a6e0a
--- /dev/null
+++ b/src/MonitoringDemo/MonitoringDemo.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+dotnet MonitoringDemo/MonitoringDemo.dll
\ No newline at end of file
diff --git a/src/MonitoringDemo/ProcessGroup.cs b/src/MonitoringDemo/ProcessGroup.cs
new file mode 100644
index 00000000..82925415
--- /dev/null
+++ b/src/MonitoringDemo/ProcessGroup.cs
@@ -0,0 +1,323 @@
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+using System.Runtime.Versioning;
+using System.Threading.Channels;
+
+namespace MonitoringDemo;
+
+///
+/// Provides process group management functionality across Windows, Linux, and macOS.
+/// Uses Windows Job Objects on Windows and Process Groups on Unix-like systems.
+///
+partial class ProcessGroup : IDisposable
+{
+ readonly Dictionary> processesByAssemblyPath = [];
+ bool disposed;
+
+ public ProcessGroup(string groupName)
+ {
+ if (OperatingSystem.IsWindows())
+ {
+ InitializeWindowsJob(groupName);
+ }
+ }
+
+ public ProcessHandle AddProcess(string relativeAssemblyPath, string instanceId, int port)
+ {
+ if (!processesByAssemblyPath.TryGetValue(relativeAssemblyPath, out var processes))
+ {
+ processes = [];
+ processesByAssemblyPath[relativeAssemblyPath] = processes;
+ }
+
+ var process = StartProcess(relativeAssemblyPath, instanceId, port.ToString());
+
+ if (process is null)
+ {
+ return ProcessHandle.Empty;
+ }
+
+ var outputChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
+ process.OutputDataReceived += (sender, args) => outputChannel.Writer.TryWrite(args.Data);
+ process.BeginOutputReadLine();
+
+ processes.Add(process.Id, process);
+
+ if (OperatingSystem.IsWindows())
+ {
+ AddProcessToWindowsJob(process);
+ }
+ else if (OperatingSystem.IsLinux() || OperatingSystem.IsMacOS())
+ {
+ AddProcessToUnixGroup(process);
+ }
+
+ return new ProcessHandle(outputChannel, input => process.StandardInput.WriteLine(input), () =>
+ {
+ KillProcess(relativeAssemblyPath, process.Id);
+ });
+ }
+
+ private void KillProcess(string relativeAssemblyPath, int id)
+ {
+ if (!processesByAssemblyPath.TryGetValue(relativeAssemblyPath, out var processes))
+ {
+ return;
+ }
+
+ if (!processes.TryGetValue(id, out var victim))
+ {
+ return;
+ }
+
+ try
+ {
+ if (OperatingSystem.IsLinux() || OperatingSystem.IsMacOS())
+ {
+ KillProcessGroupUnix(victim.Id);
+ }
+ else
+ {
+ victim.Kill(true);
+ }
+
+ processes.Remove(id);
+ }
+ catch (Exception)
+ {
+ // Process already terminated
+ }
+ finally
+ {
+ victim.Dispose();
+ }
+ }
+
+ #region Unix-specific implementations
+
+ [SupportedOSPlatform("linux")]
+ [SupportedOSPlatform("macos")]
+ private static bool AddProcessToUnixGroup(Process process)
+ {
+ try
+ {
+ // Set process group ID to child process ID (creating new group)
+ return setpgid(process.Id, process.Id) == 0;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ [SupportedOSPlatform("linux")]
+ [SupportedOSPlatform("macos")]
+ private void KillProcessGroupUnix(int processId)
+ {
+ // Send SIGTERM to the entire process group
+ kill(-processId, 15); // 15 is SIGTERM
+ }
+
+ // P/Invoke declarations for Unix systems
+ [LibraryImport("libc", SetLastError = true)]
+ [SupportedOSPlatform("linux")]
+ [SupportedOSPlatform("macos")]
+ private static partial int setpgid(int pid, int pgid);
+
+ [LibraryImport("libc", SetLastError = true)]
+ [SupportedOSPlatform("linux")]
+ [SupportedOSPlatform("macos")]
+ private static partial int kill(int pid, int sig);
+
+ #endregion
+
+ #region Windows-specific implementations
+
+ nint jobHandle;
+
+ [SupportedOSPlatform("windows")]
+ private void InitializeWindowsJob(string jobName)
+ {
+ jobHandle = CreateJobObject(nint.Zero, jobName);
+
+ var info = new JOBOBJECT_BASIC_LIMIT_INFORMATION
+ {
+ LimitFlags = 0x2000 // JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
+ };
+
+ var extendedInfo = new JOBOBJECT_EXTENDED_LIMIT_INFORMATION
+ {
+ BasicLimitInformation = info
+ };
+
+ var length = Marshal.SizeOf(typeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION));
+ var extendedInfoPtr = Marshal.AllocHGlobal(length);
+ Marshal.StructureToPtr(extendedInfo, extendedInfoPtr, false);
+
+ try
+ {
+ if (!SetInformationJobObject(jobHandle, JobObjectInfoType.ExtendedLimitInformation, extendedInfoPtr, (uint)length))
+ {
+ throw new Exception($"Unable to set information. Error: {Marshal.GetLastWin32Error()}");
+ }
+ }
+ finally
+ {
+ Marshal.FreeHGlobal(extendedInfoPtr);
+ }
+ }
+
+ [SupportedOSPlatform("windows")]
+ private bool AddProcessToWindowsJob(Process process) =>
+ AssignProcessToJobObject(jobHandle, process.Handle);
+
+ [SupportedOSPlatform("windows")]
+ private void DisposeWindowsJob()
+ {
+ if (jobHandle != nint.Zero)
+ {
+ CloseHandle(jobHandle);
+ jobHandle = nint.Zero;
+ }
+ }
+
+ // P/Invoke declarations for Windows
+ [LibraryImport("kernel32.dll", EntryPoint = "CreateJobObjectW", StringMarshalling = StringMarshalling.Utf16)]
+ [SupportedOSPlatform("windows")]
+ private static partial nint CreateJobObject(nint a, string lpName);
+
+ [LibraryImport("kernel32.dll")]
+ [SupportedOSPlatform("windows")]
+ [return: MarshalAs(UnmanagedType.Bool)]
+ private static partial bool SetInformationJobObject(nint hJob, JobObjectInfoType infoType, nint lpJobObjectInfo, uint cbJobObjectInfoLength);
+
+ [LibraryImport("kernel32.dll", SetLastError = true)]
+ [SupportedOSPlatform("windows")]
+ [return: MarshalAs(UnmanagedType.Bool)]
+ private static partial bool AssignProcessToJobObject(nint job, nint process);
+
+ [LibraryImport("kernel32.dll", SetLastError = true)]
+ [SupportedOSPlatform("windows")]
+ [return: MarshalAs(UnmanagedType.Bool)]
+ private static partial bool CloseHandle(nint hObject);
+
+ #endregion
+
+ private static Process? StartProcess(string relativeAssemblyPath, params string[] arguments)
+ {
+ var fullAssemblyPath = Path.GetFullPath(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, relativeAssemblyPath));
+ var workingDirectory = Path.GetDirectoryName(fullAssemblyPath);
+
+ var startInfo = new ProcessStartInfo("dotnet", fullAssemblyPath)
+ {
+ WorkingDirectory = workingDirectory,
+ UseShellExecute = false,
+ RedirectStandardInput = true,
+ RedirectStandardOutput = true
+ };
+
+ foreach (var a in arguments)
+ {
+ startInfo.Arguments += $" {a}";
+ }
+
+ return Process.Start(startInfo);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (disposed)
+ {
+ return;
+ }
+
+ if (disposing)
+ {
+ if (OperatingSystem.IsWindows())
+ {
+ DisposeWindowsJob();
+ }
+ else if (OperatingSystem.IsLinux() || OperatingSystem.IsMacOS())
+ {
+ // Clean up any remaining processes on Unix systems
+ foreach (var pid in processesByAssemblyPath.Values.SelectMany(x => x.Keys))
+ {
+ try
+ {
+ KillProcessGroupUnix(pid);
+ }
+ catch
+ {
+ // Process might already be terminated
+ }
+ }
+ }
+ else
+ {
+ throw new PlatformNotSupportedException("Process management is not supported on this platform.");
+ }
+
+ processesByAssemblyPath.Clear();
+ }
+
+ disposed = true;
+ }
+}
+
+#region Windows-specific structs and enums
+#pragma warning disable PS0024
+[StructLayout(LayoutKind.Sequential)]
+file struct IO_COUNTERS
+#pragma warning restore PS0024
+{
+ public ulong ReadOperationCount;
+ public ulong WriteOperationCount;
+ public ulong OtherOperationCount;
+ public ulong ReadTransferCount;
+ public ulong WriteTransferCount;
+ public ulong OtherTransferCount;
+}
+
+[StructLayout(LayoutKind.Sequential)]
+file struct JOBOBJECT_BASIC_LIMIT_INFORMATION
+{
+ public long PerProcessUserTimeLimit;
+ public long PerJobUserTimeLimit;
+ public uint LimitFlags;
+ public nuint MinimumWorkingSetSize;
+ public nuint MaximumWorkingSetSize;
+ public uint ActiveProcessLimit;
+ public nuint Affinity;
+ public uint PriorityClass;
+ public uint SchedulingClass;
+}
+
+[StructLayout(LayoutKind.Sequential)]
+file struct JOBOBJECT_EXTENDED_LIMIT_INFORMATION
+{
+ public JOBOBJECT_BASIC_LIMIT_INFORMATION BasicLimitInformation;
+ public IO_COUNTERS IoInfo;
+ public nuint ProcessMemoryLimit;
+ public nuint JobMemoryLimit;
+ public nuint PeakProcessMemoryUsed;
+ public nuint PeakJobMemoryUsed;
+}
+
+internal enum JobObjectInfoType
+{
+ AssociateCompletionPortInformation = 7,
+ BasicLimitInformation = 2,
+ BasicUIRestrictions = 4,
+ EndOfJobTimeInformation = 6,
+ ExtendedLimitInformation = 9,
+ SecurityLimitInformation = 5,
+ GroupInformation = 11
+}
+#pragma warning restore PS0024
+#endregion
\ No newline at end of file
diff --git a/src/MonitoringDemo/ProcessHandle.cs b/src/MonitoringDemo/ProcessHandle.cs
new file mode 100644
index 00000000..482edcd3
--- /dev/null
+++ b/src/MonitoringDemo/ProcessHandle.cs
@@ -0,0 +1,26 @@
+using System.Threading.Channels;
+
+namespace MonitoringDemo;
+
+sealed class ProcessHandle(Channel outputChannel, Action sendAction, Action closeAction)
+ : IDisposable
+{
+ public ChannelReader Reader { get; } = outputChannel.Reader;
+
+ public void Send(string value)
+ {
+ sendAction(value);
+ }
+
+ public IAsyncEnumerable ReadAllAsync(CancellationToken cancellationToken = default) {
+ return outputChannel.Reader.ReadAllAsync(cancellationToken);
+ }
+
+ public void Dispose()
+ {
+ outputChannel.Writer.TryComplete();
+ closeAction();
+ }
+
+ public static readonly ProcessHandle Empty = new(Channel.CreateBounded(0), _ => { }, () => { });
+}
\ No newline at end of file
diff --git a/src/MonitoringDemo/ProcessWindow.cs b/src/MonitoringDemo/ProcessWindow.cs
new file mode 100644
index 00000000..f8cf3fdd
--- /dev/null
+++ b/src/MonitoringDemo/ProcessWindow.cs
@@ -0,0 +1,413 @@
+using System.Collections.Concurrent;
+using System.Collections.ObjectModel;
+using System.Diagnostics;
+using System.Text;
+using System.Text.RegularExpressions;
+using Terminal.Gui.App;
+using Terminal.Gui.Input;
+using Terminal.Gui.ViewBase;
+using Terminal.Gui.Views;
+
+
+namespace MonitoringDemo;
+
+sealed partial class ProcessWindow : Window
+{
+ private const string Letters = "abcdefghijklmnopqrstuvwxyz";
+
+ private readonly string name;
+ private readonly bool singleInstance;
+ private readonly int basePort;
+ private readonly DemoLauncher launcher;
+ private readonly CancellationToken cancellationToken;
+
+ private readonly ConcurrentDictionary> linesPerInstance = new();
+ private readonly Dictionary recognizedKeys = new();
+ public ListView? InstanceView { get; }
+ public ListView LogView { get; }
+ private ObservableCollection Instances { get; } = new();
+
+ private string?[] PrometheusPorts = new string[10];
+
+ private Dictionary Processes { get; } = new();
+
+ [GeneratedRegex(@"Press (.) to")]
+ private static partial Regex PressKeyRegex();
+
+ [GeneratedRegex(@"!BeginWidget (\w+) (\w+)")]
+ private static partial Regex WidgetStartRegex();
+
+ [GeneratedRegex(@"!EndWidget (\w+)")]
+ private static partial Regex WidgetEndRegex();
+
+ [GeneratedRegex(@"!Widget (\w+) (\w+)")]
+ private static partial Regex WidgetUpdateRegex();
+
+ public ProcessWindow(string title, string name, bool singleInstance, int basePort, DemoLauncher launcher, CancellationToken cancellationToken)
+ {
+ this.name = name;
+ this.singleInstance = singleInstance;
+ this.basePort = basePort;
+ this.launcher = launcher;
+ this.cancellationToken = cancellationToken;
+
+ Title = title;
+ X = 0;
+ Y = 1;
+ Width = Dim.Fill();
+ Height = Dim.Fill();
+
+ if (!singleInstance)
+ {
+ InstanceView = new ListView
+ {
+ X = 0,
+ Y = 0,
+ Width = Dim.Fill(),
+ Height = Dim.Fill(),
+ Source = new ListWrapper(Instances)
+ };
+ InstanceView.SetSource(Instances);
+ InstanceView.SelectedItemChanged += InstanceView_SelectedItemChanged;
+ var instanceViewFrame = new FrameView
+ {
+ X = 0,
+ Y = 0,
+ Width = 15,
+ Height = Dim.Fill(),
+ Title = "Instances"
+ };
+ instanceViewFrame.Add(InstanceView);
+
+ Add(instanceViewFrame);
+ }
+
+ LogView = new ListView
+ {
+ X = 0,
+ Y = 0,
+ Width = Dim.Fill(),
+ Height = Dim.Fill(),
+ Source = new ListWrapper([])
+ };
+ LogView.AllowsMarking = false;
+ LogView.AllowsMultipleSelection = false;
+ var logViewFrame = new FrameView
+ {
+ X = InstanceView != null ? 15 : 0,
+ Y = 0,
+ Width = Dim.Fill(),
+ Height = Dim.Fill(),
+ Title = "Output"
+ };
+ logViewFrame.Add(LogView);
+
+ Add(logViewFrame);
+
+ AddCommand(Command.DeleteAll, () =>
+ {
+ var instance = Instances[SelectedInstance];
+ var lines = linesPerInstance.GetOrAdd(instance, _ => []);
+ lines.Clear();
+ LogView.SetSource(lines);
+ return true;
+ });
+ AddCommand(Command.HotKey, () =>
+ {
+ var instance = Instances[SelectedInstance];
+ Processes[instance].Send("?");
+ return true;
+ });
+ AddCommand(Command.Up, () =>
+ {
+ ScaleOut();
+ return true;
+ });
+ AddCommand(Command.Down, () =>
+ {
+ ScaleIn();
+ return true;
+ });
+
+ KeyBindings.Add(Key.C.WithCtrl, Command.DeleteAll);
+ KeyBindings.Add(Key.F1, Command.HotKey);
+
+ if (!singleInstance)
+ {
+ KeyBindings.Add(Key.F2, Command.Up);
+ KeyBindings.Add(Key.F3, Command.Down);
+ }
+
+ StartNewProcess(CancellationTokenSource.CreateLinkedTokenSource(cancellationToken));
+ }
+
+ private void ScaleIn()
+ {
+ var instance = Instances[SelectedInstance];
+ DoScaleIn(instance);
+ }
+
+ private void ScaleInLast()
+ {
+ var instance = Instances.LastOrDefault();
+ if (instance == null)
+ {
+ return;
+ }
+ DoScaleIn(instance);
+ }
+
+ private void DoScaleIn(string instance)
+ {
+ Debug.WriteLine($"Stopping instance {instance}");
+
+ Processes.Remove(instance, out var process);
+ process!.Dispose();
+ Instances.Remove(instance);
+ linesPerInstance.TryRemove(instance, out _);
+
+ FreePort(instance);
+ }
+
+ private void ScaleOut()
+ {
+ Debug.WriteLine($"Starting new instance.");
+
+ var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ StartNewProcess(cancellationTokenSource);
+ }
+
+ private void ScaleTo(int value)
+ {
+ var numberOfInstances = (value / 2) + 1; //Value is 0-9. Make it min one instance, max 5 instances
+ Debug.WriteLine($"Scaling to {numberOfInstances}.");
+ while (Instances.Count < numberOfInstances)
+ {
+ ScaleOut();
+ }
+ while (Instances.Count > numberOfInstances)
+ {
+ ScaleInLast();
+ }
+ }
+
+ int SelectedInstance => Math.Max(InstanceView?.SelectedItem ?? 0, 0);
+
+ private void InstanceView_SelectedItemChanged(object? sender, ListViewItemEventArgs args)
+ {
+ SelectInstance(Instances[args.Item]);
+ }
+
+ void StartNewProcess(CancellationTokenSource cancellationTokenSource)
+ {
+ string instanceId;
+ do
+ {
+ instanceId = new string(Enumerable.Range(0, 4).Select(x => Letters[Random.Shared.Next(Letters.Length)]).ToArray());
+ } while (Instances.Contains(instanceId));
+
+ var port = FindPort(instanceId);
+ if (port == null)
+ {
+ //No more free ports
+ return;
+ }
+
+ var process = new Process(launcher.AddProcess(name, instanceId, basePort + port.Value), cancellationTokenSource);
+ Processes[instanceId] = process;
+ Instances.Add(instanceId);
+
+ PrintOutput(instanceId, process, cancellationTokenSource.Token);
+
+ SelectInstance(instanceId);
+ }
+
+ int? FindPort(string instance)
+ {
+ for (var i = 0; i < PrometheusPorts.Length; i++)
+ {
+ if (PrometheusPorts[i] == null)
+ {
+ PrometheusPorts[i] = instance;
+ return i;
+ }
+ }
+ return null;
+ }
+
+ void FreePort(string instance)
+ {
+ for (var i = 0; i < PrometheusPorts.Length; i++)
+ {
+ if (PrometheusPorts[i] == instance)
+ {
+ PrometheusPorts[i] = null;
+ }
+ }
+ }
+
+ void SelectInstance(string instance)
+ {
+ LogView.SetSource(linesPerInstance.GetOrAdd(instance, _ => []));
+ LogView.MoveEnd();
+ }
+
+ void PrintOutput(string instance, Process process, CancellationToken cancellationToken)
+ {
+ var lines = linesPerInstance.GetOrAdd(instance, _ => []);
+
+ _ = Task.Run(async () =>
+ {
+ try
+ {
+ var activeWidgets = new Dictionary();
+ var activeWidgetPositions = new Dictionary();
+
+ await foreach (var output in process.ReadAllAsync(cancellationToken))
+ {
+ if (string.IsNullOrWhiteSpace(output))
+ {
+ continue;
+ }
+
+ Application.Invoke(() =>
+ {
+ var startWidgetMatch = WidgetStartRegex().Match(output);
+ if (startWidgetMatch.Success)
+ {
+ var widgetName = startWidgetMatch.Groups[1].Value;
+ var widgetId = startWidgetMatch.Groups[2].Value;
+
+ var widget = CreateWidget(widgetName);
+ if (widget != null)
+ {
+ activeWidgets[widgetId] = widget;
+ }
+ return;
+ }
+ var endWidgetMatch = WidgetEndRegex().Match(output);
+ if (endWidgetMatch.Success)
+ {
+ var widgetId = endWidgetMatch.Groups[1].Value;
+ activeWidgets.Remove(widgetId);
+ return;
+ }
+
+ var updateWidgetMatch = WidgetUpdateRegex().Match(output);
+ if (updateWidgetMatch.Success)
+ {
+ var widgetId = updateWidgetMatch.Groups[1].Value;
+ var widgetData = updateWidgetMatch.Groups[2].Value;
+
+ var widgetLine = activeWidgets[widgetId].ProcessInput(widgetData);
+
+ if (!activeWidgetPositions.TryGetValue(widgetId, out var position))
+ {
+ activeWidgetPositions[widgetId] = lines.Count;
+ lines.Add(widgetLine);
+
+ }
+ else
+ {
+ lines[position] = widgetLine;
+ }
+ LogView.MoveEnd(); // Scroll to end
+ return;
+ }
+
+ var pressKeyMatch = PressKeyRegex().Match(output);
+ if (pressKeyMatch.Success)
+ {
+ var groupValue = pressKeyMatch.Groups[1].Value[0];
+ var c = char.ToLowerInvariant(groupValue);
+ recognizedKeys[(Rune)c] = c;
+ }
+
+ lines.Add(output);
+ //Simple hacky way to not store all the data in the world
+ if (lines.Count > 100)
+ {
+ lines.RemoveAt(0);
+ }
+ LogView.MoveEnd(); // Scroll to end
+ });
+ }
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ // Ignore cancellation
+ }
+ });
+ }
+
+ private static IWidget? CreateWidget(string widgetName)
+ {
+ return widgetName == "Progress" ? new ProgressBarWidget() : null;
+ }
+
+ public void HandleSequence(string sequenceWithoutDollar)
+ {
+ if (sequenceWithoutDollar is ['A', _, ..])
+ {
+ if (!singleInstance)
+ {
+ //First dial is scale out
+ var scaleFactor = int.Parse(sequenceWithoutDollar.Substring(1, 1));
+ ScaleTo(scaleFactor);
+ }
+ }
+ else
+ {
+ foreach (var handle in Processes.Values)
+ {
+ handle.Send($"${sequenceWithoutDollar}");
+ }
+ }
+ }
+
+ public void HandleKey(Key e)
+ {
+ var instance = Instances[SelectedInstance];
+ var r = e.AsRune;
+ if (!recognizedKeys.TryGetValue(r, out var c))
+ {
+ return;
+ }
+
+ //If uppercase, send to all instances. If lowercase, send to selected instance
+ if (e.IsShift)
+ {
+ foreach (var handle in Processes.Values)
+ {
+ handle.Send(new string(c, 1));
+ }
+ }
+ else
+ {
+ Processes[instance].Send(new string(c, 1));
+ }
+
+ e.Handled = true;
+ }
+
+ sealed class Process(ProcessHandle handle, CancellationTokenSource cancellationTokenSource)
+ : IDisposable
+ {
+ public void Send(string value)
+ {
+ handle.Send(value);
+ }
+
+ public IAsyncEnumerable ReadAllAsync(CancellationToken cancellationToken = default)
+ {
+ return handle.ReadAllAsync(cancellationToken);
+ }
+
+ public void Dispose()
+ {
+ cancellationTokenSource.Cancel();
+ handle.Dispose();
+ cancellationTokenSource.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/MonitoringDemo/Program.cs b/src/MonitoringDemo/Program.cs
index 937cb738..5e57ef34 100644
--- a/src/MonitoringDemo/Program.cs
+++ b/src/MonitoringDemo/Program.cs
@@ -1,104 +1,141 @@
-using MonitoringDemo;
-
-CancellationTokenSource tokenSource = new();
-Console.Title = "MonitoringDemo";
-var syncEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
-
-Console.CancelKeyPress += (sender, eventArgs) =>
-{
- eventArgs.Cancel = true;
- tokenSource.Cancel();
- syncEvent.TrySetResult(true);
-};
-
-try
-{
- using var launcher = new DemoLauncher();
- Console.WriteLine("Starting the Particular Platform");
-
- launcher.Platform();
-
- using (ColoredConsole.Use(ConsoleColor.Yellow))
- {
- Console.WriteLine(
- "Once ServiceControl has finished starting a browser window will pop up showing the ServicePulse monitoring tab");
- }
-
- Console.WriteLine("Starting Demo Solution");
-
- if (!tokenSource.IsCancellationRequested)
- {
- Console.WriteLine("Starting Billing endpoint.");
- launcher.Billing();
-
- Console.WriteLine("Starting Sales endpoint.");
- launcher.ScaleOutSales();
-
- Console.WriteLine("Starting Shipping endpoint.");
- launcher.Shipping();
-
- Console.WriteLine("Starting ClientUI endpoint.");
- launcher.ClientUI();
-
- using (ColoredConsole.Use(ConsoleColor.Yellow))
- {
- ScaleSalesEndpointIfRequired(launcher, syncEvent);
-
- await syncEvent.Task;
-
- Console.WriteLine("Shutting down");
- }
- }
-}
-catch (Exception e)
-{
- using (ColoredConsole.Use(ConsoleColor.Red))
- {
- Console.WriteLine("Error starting setting up demo.");
- Console.WriteLine($"{e.Message}{Environment.NewLine}{e.StackTrace}");
- }
-}
-
-using (ColoredConsole.Use(ConsoleColor.Yellow))
-{
- Console.WriteLine("Done, press ENTER.");
- Console.ReadLine();
-}
-
-void ScaleSalesEndpointIfRequired(DemoLauncher launcher, TaskCompletionSource syncEvent)
-{
- _ = Task.Run(() =>
- {
- try
- {
- Console.WriteLine();
- Console.WriteLine("Press [up arrow] to scale out the Sales service or [down arrow] to scale in");
- Console.WriteLine("Press Ctrl+C stop Particular Monitoring Demo.");
- Console.WriteLine();
-
- while (!tokenSource.IsCancellationRequested)
- {
- var input = Console.ReadKey(true);
-
- switch (input.Key)
- {
- case ConsoleKey.DownArrow:
- launcher.ScaleInSales();
- break;
- case ConsoleKey.UpArrow:
- launcher.ScaleOutSales();
- break;
- }
- }
- }
- catch (OperationCanceledException)
- {
- // ignore
- }
- catch (Exception e)
- {
- // surface any other exception
- syncEvent.TrySetException(e);
- }
- });
-}
+using System.Diagnostics;
+using MonitoringDemo;
+using Terminal.Gui.App;
+using Terminal.Gui.Input;
+using Terminal.Gui.ViewBase;
+using Terminal.Gui.Views;
+
+CancellationTokenSource tokenSource = new();
+var cancellationToken = tokenSource.Token;
+
+Application.Init();
+
+using var launcher = new DemoLauncher();
+
+using var top = new Window();
+top.Title = "Particular Monitoring Demo";
+top.X = 0;
+top.Y = 1;
+top.Width = Dim.Fill();
+top.Height = Dim.Fill();
+
+var menuBarItems = new List();
+
+ProcessWindow[] windows = [];
+var platformWindow = CreateWindow("Platform", "PlatformLauncher", "_Platform", true, 10010, cancellationToken);
+var clientWindow = CreateWindow("ClientUI", "ClientUI", "_ClientUI", false, 10000, cancellationToken);
+var billingWindow = CreateWindow("Billing", "Billing", "_Billing", false, 10020, cancellationToken);
+var shippingWindow = CreateWindow("Shipping", "Shipping", "S_hipping", false, 10030, cancellationToken);
+var salesWindow = CreateWindow("Sales", "Sales", "_Sales", false, 10040, cancellationToken);
+
+windows = [
+ platformWindow,
+ clientWindow,
+ billingWindow,
+ shippingWindow,
+ salesWindow
+];
+
+var quitMenuBarItem = new MenuBarItemv2("_Quit");
+quitMenuBarItem.Accepting += (_, eventArgs) =>
+{
+ tokenSource.Cancel();
+ eventArgs.Handled = true;
+ Application.RequestStop();
+};
+menuBarItems.Add(quitMenuBarItem);
+
+top.Add(new MenuBarv2
+{
+ Menus = [.. menuBarItems]
+});
+
+foreach (var window in windows)
+{
+ top.Add(window);
+}
+
+foreach (var window in windows.Skip(1))
+{
+ window.Visible = false;
+}
+
+Application.KeyDown += ApplicationKeyDown;
+
+void ApplicationKeyDown(object? sender, Key e)
+{
+ if (e.IsCtrl)
+ {
+ //Do not forward ctrl
+ return;
+ }
+
+ if (e.IsPartOfControllerSequence(out var seq))
+ {
+ e.Handled = true;
+ if (seq != null)
+ {
+ Debug.WriteLine(seq);
+ if (seq[1] == '1')
+ {
+ //First controller is always wired to Client
+ clientWindow.HandleSequence(seq.Substring(2));
+ }
+ else
+ {
+ var visibleWindow = windows.FirstOrDefault(x => x.Focused != null);
+ visibleWindow?.HandleSequence(seq.Substring(2));
+ }
+ }
+ }
+ else
+ {
+ foreach (var processWindow in windows)
+ {
+ processWindow.HandleKey(e);
+ if (e.Handled)
+ {
+ break;
+ }
+ }
+ }
+}
+
+Application.Run(top);
+
+Application.Shutdown();
+return;
+
+static void SwitchWindow(IReadOnlyCollection windowsToHide, View windowToShow, View focusTarget)
+{
+ // Hide all other windows windows
+ foreach (var window in windowsToHide)
+ {
+ window.Visible = false;
+ }
+
+ windowToShow.Visible = true;
+ focusTarget.SetFocus();
+ windowToShow.SetNeedsDraw();
+}
+
+ProcessWindow CreateWindow(string title, string name, string menuItemText, bool singleInstance, int basePort, CancellationToken cancellationToken)
+{
+ var processWindow = new ProcessWindow(title, name, singleInstance, basePort, launcher, cancellationToken);
+ var windowsToHide = windows.Except([processWindow]).ToArray();
+
+ var menuBarItem = new MenuBarItemv2(menuItemText)
+ {
+ Id = name,
+ Title = menuItemText,
+ };
+ menuBarItem.Accepting += (_, eventArgs) =>
+ {
+ SwitchWindow(windowsToHide, processWindow, processWindow.LogView);
+ eventArgs.Handled = true;
+ };
+
+ menuBarItems.Add(menuBarItem);
+
+ return processWindow;
+}
diff --git a/src/MonitoringDemo/ProgressBarWidget.cs b/src/MonitoringDemo/ProgressBarWidget.cs
new file mode 100644
index 00000000..9286c179
--- /dev/null
+++ b/src/MonitoringDemo/ProgressBarWidget.cs
@@ -0,0 +1,13 @@
+namespace MonitoringDemo;
+
+public class ProgressBarWidget : IWidget
+{
+ public string ProcessInput(string line)
+ {
+ var progressPercent = int.Parse(line);
+ var barsFilled = progressPercent / 10;
+ var bars = new string('\u2588', barsFilled);
+ var spaces = new string(' ', 10 - barsFilled);
+ return $"[{bars}{spaces}] {progressPercent}%";
+ }
+}
\ No newline at end of file
diff --git a/src/Platform/Program.cs b/src/Platform/Program.cs
deleted file mode 100644
index b7b367f3..00000000
--- a/src/Platform/Program.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-using Particular;
-
-Console.Title = "Platform";
-try
-{
- await PlatformLauncher.Launch(showPlatformToolConsoleOutput: false, servicePulseDefaultRoute: "/monitoring");
-}
-catch (Exception e)
-{
- Console.WriteLine(e);
- Console.ReadLine();
-}
diff --git a/src/Platform/Platform.csproj b/src/PlatformLauncher/PlatformLauncher.csproj
similarity index 71%
rename from src/Platform/Platform.csproj
rename to src/PlatformLauncher/PlatformLauncher.csproj
index 49e8bb17..2a144997 100644
--- a/src/Platform/Platform.csproj
+++ b/src/PlatformLauncher/PlatformLauncher.csproj
@@ -1,17 +1,15 @@
-
-
-
- net8.0
- Exe
- enable
- enable
- ..\binaries\Platform\
-
-
-
-
-
-
-
-
-
+
+
+
+ net8.0
+ Exe
+ enable
+ enable
+ ..\..\binaries\PlatformLauncher\
+
+
+
+
+
+
+
diff --git a/src/PlatformLauncher/Program.cs b/src/PlatformLauncher/Program.cs
new file mode 100644
index 00000000..62bda151
--- /dev/null
+++ b/src/PlatformLauncher/Program.cs
@@ -0,0 +1,16 @@
+using Particular;
+using System.Reflection;
+
+Console.Title = "PlatformLauncher";
+
+var rootFolder = Path.Combine(Directory.GetParent(Assembly.GetExecutingAssembly().Location)!.Parent!.FullName);
+
+try
+{
+ await PlatformLauncher.Launch(showPlatformToolConsoleOutput: false, servicePulseDefaultRoute: "/monitoring", rootFolder);
+}
+catch (Exception e)
+{
+ Console.WriteLine(e);
+ Console.ReadLine();
+}
diff --git a/src/Sales/PlaceOrderHandler.cs b/src/Sales/PlaceOrderHandler.cs
index 5bad2144..dec04a74 100644
--- a/src/Sales/PlaceOrderHandler.cs
+++ b/src/Sales/PlaceOrderHandler.cs
@@ -1,19 +1,19 @@
using Messages;
+using Shared;
namespace Sales;
-public class PlaceOrderHandler(SimulationEffects simulationEffects) : IHandleMessages
+public class PlaceOrderHandler : IHandleMessages
{
public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
- // Simulate the time taken to process a message
- await simulationEffects.SimulateMessageProcessing(context.CancellationToken);
-
var orderPlaced = new OrderPlaced
{
OrderId = message.OrderId
};
- await context.Publish(orderPlaced);
+ var publishOptions = new PublishOptions();
+ publishOptions.SetMessageId(MessageIdHelper.GetHumanReadableMessageId());
+ await context.Publish(orderPlaced, publishOptions);
}
}
\ No newline at end of file
diff --git a/src/Sales/Program.cs b/src/Sales/Program.cs
index 5472c887..d487488d 100644
--- a/src/Sales/Program.cs
+++ b/src/Sales/Program.cs
@@ -1,105 +1,79 @@
-using System.Security.Cryptography;
-using System.Text;
+using System.Reflection;
using System.Text.Json;
using Messages;
-using Microsoft.Extensions.DependencyInjection;
-using Sales;
using Shared;
-Console.SetWindowSize(65, 15);
+var instancePostfix = args.FirstOrDefault();
+var title = string.IsNullOrEmpty(instancePostfix) ? "Processing (Sales)" : $"Sales - {instancePostfix}";
+var instanceName = string.IsNullOrEmpty(instancePostfix) ? "sales" : $"sales-{instancePostfix}";
+var prometheusPortString = args.Skip(1).FirstOrDefault();
-LoggingUtils.ConfigureLogging("Sales");
+var instanceId = DeterministicGuid.Create("Sales", instanceName);
-var instanceName = args.FirstOrDefault();
+var endpointControls = new ProcessingEndpointControls(() => PrepareEndpointConfiguration(instanceId, instanceName, prometheusPortString));
-if (string.IsNullOrEmpty(instanceName))
-{
- Console.Title = "Processing (Sales)";
+var ui = new UserInterface();
+endpointControls.BindSlowProcessingDial(ui, '2', 'w');
+endpointControls.BindDatabaseFailuresDial(ui, '3', 'e');
- instanceName = "original-instance";
-}
-else
+endpointControls.BindDatabaseDownToggle(ui, 'a');
+endpointControls.BindDelayedRetriesToggle(ui, 's');
+endpointControls.BindAutoThrottleToggle(ui, 'd');
+
+endpointControls.BindFailureReceivingButton(ui, 'z');
+endpointControls.BindFailureProcessingButton(ui, 'x');
+endpointControls.BindFailureDispatchingButton(ui, 'c');
+
+if (prometheusPortString != null)
{
- Console.Title = $"Sales - {instanceName}";
+ OpenTelemetryUtils.ConfigureOpenTelemetry("Sales", instanceId.ToString(), int.Parse(prometheusPortString));
}
-var instanceId = DeterministicGuid.Create("Sales", instanceName);
+endpointControls.Start();
-var endpointConfiguration = new EndpointConfiguration("Sales");
-endpointConfiguration.LimitMessageProcessingConcurrencyTo(4);
+ui.RunLoop(title);
-var serializer = endpointConfiguration.UseSerialization();
-serializer.Options(new JsonSerializerOptions
+await endpointControls.StopEndpoint();
+
+EndpointConfiguration PrepareEndpointConfiguration(Guid guid, string displayName, string? prometheusPortString1)
{
- TypeInfoResolverChain =
+ var endpointConfiguration1 = new EndpointConfiguration("Sales");
+ endpointConfiguration1.LimitMessageProcessingConcurrencyTo(4);
+
+ var serializer = endpointConfiguration1.UseSerialization();
+ serializer.Options(new JsonSerializerOptions
+ {
+ TypeInfoResolverChain =
{
MessagesSerializationContext.Default
}
-});
-
-endpointConfiguration.UseTransport();
-
-endpointConfiguration.AuditProcessedMessagesTo("audit");
-endpointConfiguration.SendHeartbeatTo("Particular.ServiceControl");
-
-endpointConfiguration.UniquelyIdentifyRunningInstance()
- .UsingCustomDisplayName(instanceName)
- .UsingCustomIdentifier(instanceId);
+ });
-var metrics = endpointConfiguration.EnableMetrics();
-metrics.SendMetricDataToServiceControl(
- "Particular.Monitoring",
- TimeSpan.FromMilliseconds(500)
-);
-
-var simulationEffects = new SimulationEffects();
-endpointConfiguration.RegisterComponents(cc => cc.AddSingleton(simulationEffects));
-
-var endpointInstance = await Endpoint.Start(endpointConfiguration);
+ var transport = new LearningTransport
+ {
+ StorageDirectory = Path.Combine(Directory.GetParent(Assembly.GetExecutingAssembly().Location)!.Parent!.FullName, ".learningtransport"),
+ TransportTransactionMode = TransportTransactionMode.ReceiveOnly
+ };
+ endpointConfiguration1.UseTransport(transport);
-RunUserInterfaceLoop(simulationEffects, instanceName);
+ endpointConfiguration1.AuditProcessedMessagesTo("audit");
+ endpointConfiguration1.SendHeartbeatTo("Particular.ServiceControl");
-await endpointInstance.Stop();
+ endpointConfiguration1.UniquelyIdentifyRunningInstance()
+ .UsingCustomIdentifier(guid)
+ .UsingCustomDisplayName(displayName);
-void RunUserInterfaceLoop(SimulationEffects state, string instanceName)
-{
- while (true)
- {
- Console.Clear();
- Console.WriteLine($"Sales Endpoint - {instanceName}");
- Console.WriteLine("Press F to process messages faster");
- Console.WriteLine("Press S to process messages slower");
+ var metrics = endpointConfiguration1.EnableMetrics();
- Console.WriteLine("Press ESC to quit");
- Console.WriteLine();
+ metrics.SendMetricDataToServiceControl(
+ "Particular.Monitoring",
+ TimeSpan.FromMilliseconds(500)
+ );
- state.WriteState(Console.Out);
+ endpointConfiguration1.UsePersistence();
+ endpointConfiguration1.EnableOutbox();
- var input = Console.ReadKey(true);
+ endpointConfiguration1.EnableOpenTelemetry();
- switch (input.Key)
- {
- case ConsoleKey.F:
- state.ProcessMessagesFaster();
- break;
- case ConsoleKey.S:
- state.ProcessMessagesSlower();
- break;
- case ConsoleKey.Escape:
- return;
- }
- }
-}
-
-static class DeterministicGuid
-{
- public static Guid Create(params object[] data)
- {
- // use MD5 hash to get a 16-byte hash of the string
- using var provider = MD5.Create();
- var inputBytes = Encoding.Default.GetBytes(string.Concat(data));
- var hashBytes = provider.ComputeHash(inputBytes);
- // generate a guid from the hash:
- return new Guid(hashBytes);
- }
+ return endpointConfiguration1;
}
\ No newline at end of file
diff --git a/src/Sales/Sales.csproj b/src/Sales/Sales.csproj
index 8d6caaa1..32277bba 100644
--- a/src/Sales/Sales.csproj
+++ b/src/Sales/Sales.csproj
@@ -5,8 +5,7 @@
Exe
enable
enable
- ..\binaries\Sales\
- processing-time-alternate.ico
+ ..\..\binaries\Sales\
@@ -15,11 +14,9 @@
-
-
-
-
-
+
+
+
\ No newline at end of file
diff --git a/src/Sales/SimulationEffects.cs b/src/Sales/SimulationEffects.cs
deleted file mode 100644
index 4dfea0cc..00000000
--- a/src/Sales/SimulationEffects.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-namespace Sales;
-
-public class SimulationEffects
-{
- public void WriteState(TextWriter output)
- {
- output.WriteLine("Base time to handle each order: {0} seconds", baseProcessingTime.TotalSeconds);
- }
-
- public Task SimulateMessageProcessing(CancellationToken cancellationToken = default)
- {
- return Task.Delay(baseProcessingTime, cancellationToken);
- }
-
- public void ProcessMessagesFaster()
- {
- if (baseProcessingTime > TimeSpan.Zero)
- {
- baseProcessingTime -= increment;
- }
- }
-
- public void ProcessMessagesSlower()
- {
- baseProcessingTime += increment;
- }
-
- TimeSpan baseProcessingTime = TimeSpan.FromMilliseconds(1300);
- TimeSpan increment = TimeSpan.FromMilliseconds(100);
-}
\ No newline at end of file
diff --git a/src/Sales/processing-time-alternate.ico b/src/Sales/processing-time-alternate.ico
deleted file mode 100644
index 776266f2..00000000
Binary files a/src/Sales/processing-time-alternate.ico and /dev/null differ
diff --git a/src/Shared/ButtonControl.cs b/src/Shared/ButtonControl.cs
new file mode 100644
index 00000000..3045119b
--- /dev/null
+++ b/src/Shared/ButtonControl.cs
@@ -0,0 +1,49 @@
+namespace Shared;
+
+class ButtonControl : IControl
+{
+ private readonly char inputId;
+ private readonly char buttonKey;
+ private readonly string helpMessage;
+ private readonly string? pressedMessage;
+ private readonly Action pressedAction;
+
+ public ButtonControl(char inputId, char buttonKey, string helpMessage, string? pressedMessage, Action pressedAction)
+ {
+ this.inputId = inputId;
+ this.buttonKey = buttonKey;
+ this.helpMessage = helpMessage;
+ this.pressedMessage = pressedMessage;
+ this.pressedAction = pressedAction;
+ }
+
+ public bool Match(string input)
+ {
+ if (input[0] == buttonKey)
+ {
+ pressedAction();
+ return true;
+ }
+
+ if (input[0] == '$' && input.Length >= 2 && input[1] == inputId)
+ {
+ pressedAction();
+ return true;
+ }
+
+ return false;
+ }
+
+ public void Help(TextWriter textWriter)
+ {
+ textWriter.WriteLine(helpMessage);
+ }
+
+ public void ReportState(TextWriter textWriter)
+ {
+ if (pressedMessage != null)
+ {
+ textWriter.WriteLine(pressedMessage);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/DatabaseDownSimulationBehavior.cs b/src/Shared/DatabaseDownSimulationBehavior.cs
new file mode 100644
index 00000000..8fbb1c81
--- /dev/null
+++ b/src/Shared/DatabaseDownSimulationBehavior.cs
@@ -0,0 +1,32 @@
+using NServiceBus.Pipeline;
+
+namespace Shared;
+
+public class DatabaseDownSimulationBehavior : Behavior
+{
+ private bool databaseDown;
+
+ public override Task Invoke(IInvokeHandlerContext context, Func next)
+ {
+ if (databaseDown)
+ {
+ throw new Exception("Simulated");
+ }
+ return next();
+ }
+
+ public string ReportState()
+ {
+ return databaseDown ? "Database down" : "Database up";
+ }
+
+ public void Down()
+ {
+ databaseDown = true;
+ }
+
+ public void Up()
+ {
+ databaseDown = false;
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/DatabaseFailureSimulationBehavior.cs b/src/Shared/DatabaseFailureSimulationBehavior.cs
new file mode 100644
index 00000000..cadbfc00
--- /dev/null
+++ b/src/Shared/DatabaseFailureSimulationBehavior.cs
@@ -0,0 +1,27 @@
+using NServiceBus.Pipeline;
+
+namespace Shared;
+
+public class DatabaseFailureSimulationBehavior : Behavior
+{
+ private int failureLevel = 0;
+
+ public override Task Invoke(IInvokeHandlerContext context, Func next)
+ {
+ if (Random.Shared.Next(10) < failureLevel)
+ {
+ throw new Exception("Simulated");
+ }
+ return next();
+ }
+
+ public void SetFailureLevel(int randomFailureLevel)
+ {
+ failureLevel = randomFailureLevel;
+ }
+
+ public string ReportState()
+ {
+ return $"Likelihood of random database failure: {failureLevel * 10}%";
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/DeterministicGuid.cs b/src/Shared/DeterministicGuid.cs
new file mode 100644
index 00000000..1be178d5
--- /dev/null
+++ b/src/Shared/DeterministicGuid.cs
@@ -0,0 +1,15 @@
+using System.Security.Cryptography;
+using System.Text;
+
+namespace Shared;
+
+public static class DeterministicGuid
+{
+ public static Guid Create(params object[] data)
+ {
+ var inputBytes = Encoding.Default.GetBytes(string.Concat(data));
+ var hashBytes = MD5.HashData(inputBytes);
+
+ return new Guid(hashBytes);
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/DialControl.cs b/src/Shared/DialControl.cs
new file mode 100644
index 00000000..d4523b0b
--- /dev/null
+++ b/src/Shared/DialControl.cs
@@ -0,0 +1,69 @@
+namespace Shared;
+
+class DialControl : IControl
+{
+ private int value;
+ private readonly char inputId;
+ private readonly char upKey;
+ private readonly char downKey;
+ private readonly string helpMessage;
+ private readonly Func getState;
+ private readonly Action setAction;
+
+ public DialControl(char inputId, char upKey, char downKey, string helpMessage, Func getState,
+ Action setAction)
+ {
+ this.inputId = inputId;
+ this.upKey = upKey;
+ this.downKey = downKey;
+ this.helpMessage = helpMessage;
+ this.getState = getState;
+ this.setAction = setAction;
+ }
+
+ public bool Match(string input)
+ {
+ if (input[0] == upKey)
+ {
+ //Increase
+ if (value < 9)
+ {
+ value++;
+ }
+
+ setAction(value);
+ return true;
+ }
+
+ if (input[0] == downKey)
+ {
+ //Decrease
+ if (value > 0)
+ {
+ value--;
+ }
+
+ setAction(value);
+ return true;
+ }
+
+ if (input[0] == '$' && input.Length >= 3 && input[1] == inputId)
+ {
+ value = int.Parse(input[2].ToString());
+ setAction(value);
+ return true;
+ }
+
+ return false;
+ }
+
+ public void Help(TextWriter textWriter)
+ {
+ textWriter.WriteLine(helpMessage);
+ }
+
+ public void ReportState(TextWriter textWriter)
+ {
+ textWriter.WriteLine(getState());
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/DispatchingProgressBehavior.cs b/src/Shared/DispatchingProgressBehavior.cs
new file mode 100644
index 00000000..e76ce37c
--- /dev/null
+++ b/src/Shared/DispatchingProgressBehavior.cs
@@ -0,0 +1,25 @@
+using NServiceBus.Pipeline;
+using NServiceBus.Transport;
+
+namespace Shared;
+
+public class DispatchingProgressBehavior : Behavior
+{
+ private FailureSimulator failureSimulator = new();
+
+ public override async Task Invoke(IBatchDispatchContext context, Func next)
+ {
+ await next().ConfigureAwait(false);
+
+ var incomingMessage = context.Extensions.Get();
+ if (incomingMessage.Headers.ContainsKey("MonitoringDemo.ManualMode"))
+ {
+ await failureSimulator.RunInteractive($"Dispatching outgoing messages {incomingMessage.MessageId}...", context.CancellationToken);
+ }
+ }
+
+ public void Failure()
+ {
+ failureSimulator.Trigger();
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/FailureSimulator.cs b/src/Shared/FailureSimulator.cs
new file mode 100644
index 00000000..a4d9deb4
--- /dev/null
+++ b/src/Shared/FailureSimulator.cs
@@ -0,0 +1,30 @@
+namespace Shared;
+
+public class FailureSimulator
+{
+ private bool failureTriggered = false;
+
+#pragma warning disable PS0003
+ public async Task RunInteractive(string taskDescription, CancellationToken cancellationToken)
+#pragma warning restore PS0003
+ {
+ using var progressBar = new ProgressBar(taskDescription);
+
+ for (var i = 0; i <= 100; i++)
+ {
+ if (failureTriggered)
+ {
+ failureTriggered = false;
+ throw new Exception("Simulated failure");
+ }
+ progressBar.Update(i);
+ await Task.Delay(25, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ public void Trigger()
+ {
+ //TODO: Use Interlocked
+ failureTriggered = true;
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/IControl.cs b/src/Shared/IControl.cs
new file mode 100644
index 00000000..5a50207d
--- /dev/null
+++ b/src/Shared/IControl.cs
@@ -0,0 +1,10 @@
+namespace Shared;
+
+public interface IControl
+{
+ bool Match(string input);
+
+ void Help(TextWriter textWriter);
+
+ void ReportState(TextWriter textWriter);
+}
\ No newline at end of file
diff --git a/src/Shared/LoggingUtils.cs b/src/Shared/LoggingUtils.cs
deleted file mode 100644
index 42be799f..00000000
--- a/src/Shared/LoggingUtils.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-
-using System.Reflection;
-using NServiceBus.Extensions.Logging;
-using NServiceBus.Logging;
-using Serilog;
-using Serilog.Extensions.Logging;
-
-namespace Shared;
-
-public static class LoggingUtils
-{
- public static void ConfigureLogging(string endpointName)
- {
- var logsFolder = GetLogLocation();
-
- if (logsFolder is null)
- {
- return;
- }
-
- var logPath = Path.Combine(logsFolder, $"{endpointName}.txt");
-
- Log.Logger = new LoggerConfiguration()
- .WriteTo.File(logPath)
- .CreateLogger();
-
- LogManager.UseFactory(new ExtensionsLoggerFactory(new SerilogLoggerFactory()));
- }
-
- static string? GetLogLocation()
- {
- var assemblyPath = new Uri(Assembly.GetExecutingAssembly().Location).LocalPath;
- var assemblyFolder = Path.GetDirectoryName(assemblyPath);
-
- if (string.IsNullOrEmpty(assemblyFolder))
- {
- return null;
- }
-
- var workingDir = new DirectoryInfo(assemblyFolder);
- var logLocation = FindLogFolder(workingDir);
- return (logLocation ?? workingDir).FullName;
- }
-
- static DirectoryInfo? FindLogFolder(DirectoryInfo? currentDir)
- {
- if (currentDir is null)
- {
- return null;
- }
-
- var logsFolders = currentDir.GetDirectories("logs", SearchOption.TopDirectoryOnly);
-
- return logsFolders.FirstOrDefault() ?? FindLogFolder(currentDir.Parent);
- }
-}
\ No newline at end of file
diff --git a/src/Shared/MessageIdHelper.cs b/src/Shared/MessageIdHelper.cs
new file mode 100644
index 00000000..84d11b94
--- /dev/null
+++ b/src/Shared/MessageIdHelper.cs
@@ -0,0 +1,12 @@
+namespace Shared;
+
+public static class MessageIdHelper
+{
+ private const string Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ public static string GetHumanReadableMessageId()
+ {
+ var messageId = new string([.. Enumerable.Range(0, 4).Select(x => Letters[Random.Shared.Next(Letters.Length)])]);
+ return messageId;
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/OpenTelemetryUtils.cs b/src/Shared/OpenTelemetryUtils.cs
new file mode 100644
index 00000000..b1bac821
--- /dev/null
+++ b/src/Shared/OpenTelemetryUtils.cs
@@ -0,0 +1,29 @@
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
+
+namespace Shared;
+
+public static class OpenTelemetryUtils
+{
+ public static IDisposable ConfigureOpenTelemetry(string name, string id, int port)
+ {
+ var attributes = new Dictionary
+ {
+ ["service.name"] = name,
+ ["service.instance.id"] = id,
+ };
+
+ var resourceBuilder = ResourceBuilder.CreateDefault().AddAttributes(attributes);
+
+ var meterProviderBuilder = Sdk.CreateMeterProviderBuilder()
+ .SetResourceBuilder(resourceBuilder)
+ .AddMeter("NServiceBus.Core*");
+
+ meterProviderBuilder.AddPrometheusHttpListener(options => options.UriPrefixes = [$"http://127.0.0.1:{port}"]);
+
+ var meterProvider = meterProviderBuilder.Build();
+
+ return meterProvider;
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/ProcessingEndpointControls.cs b/src/Shared/ProcessingEndpointControls.cs
new file mode 100644
index 00000000..b6cb3837
--- /dev/null
+++ b/src/Shared/ProcessingEndpointControls.cs
@@ -0,0 +1,176 @@
+namespace Shared;
+
+public class ProcessingEndpointControls(Func endpointConfigProvider)
+{
+ private IEndpointInstance? runningEndpoint;
+
+ private bool delayedRetries;
+ private bool autoThrottle;
+
+ private readonly RetrievingMessageProgressBehavior retrievingMessageProgressBehavior = new RetrievingMessageProgressBehavior();
+ private readonly ProcessingMessageProgressBehavior processingMessageProgressBehavior = new ProcessingMessageProgressBehavior();
+ private readonly DispatchingProgressBehavior dispatchingMessageProgressBehavior = new DispatchingProgressBehavior();
+ private readonly SlowProcessingSimulationBehavior slowProcessingSimulationBehavior = new SlowProcessingSimulationBehavior();
+ private readonly DatabaseFailureSimulationBehavior databaseFailureSimulationBehavior = new DatabaseFailureSimulationBehavior();
+ private readonly DatabaseDownSimulationBehavior databaseDownSimulationBehavior = new DatabaseDownSimulationBehavior();
+ private CancellationTokenSource? stopTokenSource;
+ private readonly SemaphoreSlim restartSemaphore = new SemaphoreSlim(1);
+ private Task? restartTask;
+
+ void Register(EndpointConfiguration endpointConfiguration)
+ {
+ endpointConfiguration.Pipeline.Register(retrievingMessageProgressBehavior, "Shows progress of retrieving messages");
+ endpointConfiguration.Pipeline.Register(processingMessageProgressBehavior, "Shows progress of processing messages");
+ endpointConfiguration.Pipeline.Register(dispatchingMessageProgressBehavior, "Shows progress of dispatching messages");
+ endpointConfiguration.Pipeline.Register(slowProcessingSimulationBehavior, "Simulates slow processing");
+ endpointConfiguration.Pipeline.Register(databaseFailureSimulationBehavior, "Simulates faulty database");
+ endpointConfiguration.Pipeline.Register(databaseDownSimulationBehavior, "Simulates down database");
+ endpointConfiguration.Pipeline.Register(new PropagateManualModeBehavior(), "Propagates manual mode settings");
+ }
+
+ public void Start()
+ {
+ stopTokenSource = new CancellationTokenSource();
+ restartTask = Task.Run(async () =>
+ {
+ var stopToken = stopTokenSource.Token;
+ while (!stopToken.IsCancellationRequested)
+ {
+ try
+ {
+ await restartSemaphore.WaitAsync(stopToken);
+ //await Task.Delay(5000);
+ await RestartEndpoint();
+ }
+#pragma warning disable PS0019
+ catch (Exception e)
+#pragma warning restore PS0019
+ {
+ Console.WriteLine(e);
+ }
+ }
+ });
+ }
+
+#pragma warning disable PS0018
+ async Task RestartEndpoint()
+#pragma warning restore PS0018
+ {
+ if (runningEndpoint != null)
+ {
+ await runningEndpoint.Stop();
+ }
+
+ var config = endpointConfigProvider();
+
+ if (!delayedRetries)
+ {
+ config.Recoverability().Delayed(settings => settings.NumberOfRetries(0));
+ }
+
+ if (autoThrottle)
+ {
+ var rateLimitSettings = new RateLimitSettings
+ {
+ };
+ config.Recoverability().OnConsecutiveFailures(5, rateLimitSettings);
+ }
+
+ Register(config);
+
+ runningEndpoint = await Endpoint.Start(config);
+ }
+
+#pragma warning disable PS0018
+ public async Task StopEndpoint()
+#pragma warning restore PS0018
+ {
+ stopTokenSource?.Cancel();
+ if (restartTask != null)
+ {
+ await restartTask;
+ }
+ if (runningEndpoint != null)
+ {
+ await runningEndpoint.Stop();
+ }
+ }
+
+ public void BindSlowProcessingDial(UserInterface userInterface, char upKey, char downKey)
+ {
+ userInterface.BindDial(
+ 'B', upKey, downKey,
+ $"Press {upKey} to increase processing delay.{Environment.NewLine}Press {downKey} to increase it.",
+ () => slowProcessingSimulationBehavior.ReportState(),
+ x => slowProcessingSimulationBehavior.SetProcessingDelay(x));
+ }
+
+ public void BindDatabaseFailuresDial(UserInterface userInterface, char upKey, char downKey)
+ {
+ userInterface.BindDial(
+ 'C', upKey, downKey, $"Press {upKey} to increase database failure rate.{Environment.NewLine}Press {downKey} to decrease it.",
+ () => databaseFailureSimulationBehavior.ReportState(),
+ x => databaseFailureSimulationBehavior.SetFailureLevel(x));
+ }
+
+ public void BindDatabaseDownToggle(UserInterface userInterface, char toggleKey)
+ {
+ userInterface.BindToggle('D', toggleKey, $"Press {toggleKey} to toggle database down simulation.",
+ () => databaseDownSimulationBehavior.ReportState(),
+ () => databaseDownSimulationBehavior.Down(),
+ () => databaseDownSimulationBehavior.Up());
+ }
+
+ public void BindDelayedRetriesToggle(UserInterface userInterface, char toggleKey)
+ {
+ userInterface.BindToggle('E', toggleKey, $"Press {toggleKey} to toggle delayed retries.",
+ () => delayedRetries ? "Delayed retries enabled" : "Delayed retries disabled",
+ () =>
+ {
+ delayedRetries = true;
+ restartSemaphore.Release();
+ },
+ () =>
+ {
+ delayedRetries = false;
+ restartSemaphore.Release();
+ });
+ }
+
+ public void BindAutoThrottleToggle(UserInterface userInterface, char toggleKey)
+ {
+ userInterface.BindToggle('F', toggleKey, $"Press {toggleKey} to toggle auto throttle.",
+ () => autoThrottle ? "Auto throttle enabled" : "Auto throttle disabled",
+ () =>
+ {
+ autoThrottle = true;
+ restartSemaphore.Release();
+ },
+ () =>
+ {
+ autoThrottle = false;
+ restartSemaphore.Release();
+ });
+ }
+
+ public void BindFailureReceivingButton(UserInterface userInterface, char key)
+ {
+ userInterface.BindButton('G', key, $"Press {key} to trigger failure while receiving a message",
+ null,
+ () => retrievingMessageProgressBehavior.Failure());
+ }
+
+ public void BindFailureProcessingButton(UserInterface userInterface, char key)
+ {
+ userInterface.BindButton('H', key, $"Press {key} to trigger failure while processing a message",
+ null,
+ () => processingMessageProgressBehavior.Failure());
+ }
+
+ public void BindFailureDispatchingButton(UserInterface userInterface, char key)
+ {
+ userInterface.BindButton('I', key, $"Press {key} to trigger failure while dispatching follow-up messages",
+ null,
+ () => dispatchingMessageProgressBehavior.Failure());
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/ProcessingMessageProgressBehavior.cs b/src/Shared/ProcessingMessageProgressBehavior.cs
new file mode 100644
index 00000000..853a327e
--- /dev/null
+++ b/src/Shared/ProcessingMessageProgressBehavior.cs
@@ -0,0 +1,23 @@
+using NServiceBus.Pipeline;
+
+namespace Shared;
+
+public class ProcessingMessageProgressBehavior : Behavior
+{
+ private FailureSimulator failureSimulator = new();
+
+ public override async Task Invoke(IIncomingLogicalMessageContext context, Func next)
+ {
+ if (context.Headers.ContainsKey("MonitoringDemo.ManualMode"))
+ {
+ await failureSimulator.RunInteractive($"Processing message {context.MessageId}...", context.CancellationToken);
+ }
+
+ await next().ConfigureAwait(false);
+ }
+
+ public void Failure()
+ {
+ failureSimulator.Trigger();
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/ProgressBar.cs b/src/Shared/ProgressBar.cs
new file mode 100644
index 00000000..602c8eb0
--- /dev/null
+++ b/src/Shared/ProgressBar.cs
@@ -0,0 +1,37 @@
+namespace Shared;
+
+public class ProgressBar : IDisposable
+{
+ private readonly string description;
+ private readonly string widgetId = Guid.NewGuid().ToString("N");
+
+ public ProgressBar(string description)
+ {
+ this.description = description;
+ if (Console.IsOutputRedirected)
+ {
+ Console.WriteLine(description);
+ Console.WriteLine($"!BeginWidget Progress {widgetId}");
+ }
+ }
+
+ public void Update(int percent)
+ {
+ if (Console.IsOutputRedirected)
+ {
+ Console.WriteLine($"!Widget {widgetId} {percent}");
+ }
+ else if (percent % 25 == 0)
+ {
+ Console.WriteLine($"{description}: {percent}%");
+ }
+ }
+
+ public void Dispose()
+ {
+ if (Console.IsOutputRedirected)
+ {
+ Console.WriteLine($"!EndWidget {widgetId}");
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/PropagateManualModeBehavior.cs b/src/Shared/PropagateManualModeBehavior.cs
new file mode 100644
index 00000000..d9aac46a
--- /dev/null
+++ b/src/Shared/PropagateManualModeBehavior.cs
@@ -0,0 +1,16 @@
+using NServiceBus.Pipeline;
+
+namespace Shared;
+
+public class PropagateManualModeBehavior : Behavior
+{
+ public override Task Invoke(IOutgoingLogicalMessageContext context, Func next)
+ {
+ if (context.TryGetIncomingPhysicalMessage(out var incomingMessage)
+ && incomingMessage.Headers.ContainsKey("MonitoringDemo.ManualMode"))
+ {
+ context.Headers["MonitoringDemo.ManualMode"] = "True";
+ }
+ return next();
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/RetrievingMessageProgressBehavior.cs b/src/Shared/RetrievingMessageProgressBehavior.cs
new file mode 100644
index 00000000..360e0f63
--- /dev/null
+++ b/src/Shared/RetrievingMessageProgressBehavior.cs
@@ -0,0 +1,23 @@
+using NServiceBus.Pipeline;
+
+namespace Shared;
+
+public class RetrievingMessageProgressBehavior : Behavior
+{
+ private FailureSimulator failureSimulator = new();
+
+ public override async Task Invoke(ITransportReceiveContext context, Func next)
+ {
+ if (context.Message.Headers.ContainsKey("MonitoringDemo.ManualMode"))
+ {
+ await failureSimulator.RunInteractive($"Retrieving message {context.Message.MessageId}...", context.CancellationToken);
+ }
+
+ await next().ConfigureAwait(false);
+ }
+
+ public void Failure()
+ {
+ failureSimulator.Trigger();
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/Shared.csproj b/src/Shared/Shared.csproj
index da29cc5c..e7b6aaf4 100644
--- a/src/Shared/Shared.csproj
+++ b/src/Shared/Shared.csproj
@@ -7,11 +7,8 @@
-
-
-
-
-
+
+
diff --git a/src/Shared/SlowProcessingSimulationBehavior.cs b/src/Shared/SlowProcessingSimulationBehavior.cs
new file mode 100644
index 00000000..e0cd1088
--- /dev/null
+++ b/src/Shared/SlowProcessingSimulationBehavior.cs
@@ -0,0 +1,28 @@
+using NServiceBus.Pipeline;
+
+namespace Shared;
+
+public class SlowProcessingSimulationBehavior : Behavior
+{
+ readonly int baseProcessingTime = 1000;
+ readonly int increment = 100;
+ private int delayLevel = 0;
+
+ public override async Task Invoke(IInvokeHandlerContext context, Func next)
+ {
+ await Task.Delay(Delay, context.CancellationToken);
+ await next().ConfigureAwait(false);
+ }
+
+ private TimeSpan Delay => TimeSpan.FromMilliseconds(baseProcessingTime + delayLevel * increment);
+
+ public void SetProcessingDelay(int delayLevel)
+ {
+ this.delayLevel = delayLevel;
+ }
+
+ public string ReportState()
+ {
+ return $"Time to process each message: {Delay.TotalSeconds} seconds";
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/ToggleControl.cs b/src/Shared/ToggleControl.cs
new file mode 100644
index 00000000..836658b8
--- /dev/null
+++ b/src/Shared/ToggleControl.cs
@@ -0,0 +1,67 @@
+namespace Shared;
+
+class ToggleControl : IControl
+{
+ private readonly char inputId;
+ private readonly char toggleKey;
+ private readonly string helpMessage;
+ private readonly Func getState;
+ private readonly Action enableAction;
+ private readonly Action disableAction;
+ private bool enabled;
+
+ public ToggleControl(char inputId, char toggleKey, string helpMessage, Func getState, Action enableAction,
+ Action disableAction)
+ {
+ this.inputId = inputId;
+ this.toggleKey = toggleKey;
+ this.helpMessage = helpMessage;
+ this.getState = getState;
+ this.enableAction = enableAction;
+ this.disableAction = disableAction;
+ }
+
+ public bool Match(string input)
+ {
+ if (input[0] == toggleKey)
+ {
+ enabled = !enabled;
+ if (enabled)
+ {
+ enableAction();
+ }
+ else
+ {
+ disableAction();
+ }
+ return true;
+ }
+
+ if (input[0] == '$' && input.Length >= 3 && input[1] == inputId)
+ {
+ var value = int.Parse(input[2].ToString());
+ enabled = value == 1;
+ if (enabled)
+ {
+ enableAction();
+ }
+ else
+ {
+ disableAction();
+ }
+ return true;
+ }
+
+ return false;
+ }
+
+ public void Help(TextWriter textWriter)
+ {
+ textWriter.WriteLine(helpMessage);
+ }
+
+ public void ReportState(TextWriter textWriter)
+ {
+ textWriter.WriteLine(getState());
+ }
+}
\ No newline at end of file
diff --git a/src/Shared/UserInterface.cs b/src/Shared/UserInterface.cs
new file mode 100644
index 00000000..86381714
--- /dev/null
+++ b/src/Shared/UserInterface.cs
@@ -0,0 +1,85 @@
+namespace Shared;
+
+public class UserInterface
+{
+ List controls = [];
+
+ public void BindDial(char inputId, char upKey, char downKey, string helpMessage, Func getState, Action action)
+ {
+ controls.Add(new DialControl(inputId, upKey, downKey, helpMessage, getState, action));
+ }
+
+ public void BindToggle(char inputId, char toggleKey, string helpMessage, Func getState, Action enableAction, Action disableAction)
+ {
+ controls.Add(new ToggleControl(inputId, toggleKey, helpMessage, getState, enableAction, disableAction));
+ }
+
+ public void BindButton(char inputId, char buttonKey, string helpMessage, string? pressedMessage, Action pressedAction)
+ {
+ controls.Add(new ButtonControl(inputId, buttonKey, helpMessage, pressedMessage, pressedAction));
+ }
+
+
+#pragma warning disable PS0018
+ public void RunLoop(string title)
+#pragma warning restore PS0018
+ {
+ if (!Console.IsInputRedirected)
+ {
+ Console.Title = title;
+ }
+
+ PrintControls();
+
+ while (true)
+ {
+ var input = ReadKeyOrLine();
+ if (string.IsNullOrWhiteSpace(input))
+ {
+ return;
+ }
+
+ if (input == "?")
+ {
+ foreach (var ctrl in controls)
+ {
+ ctrl.Help(Console.Out);
+ }
+ }
+
+ var matchedControl = controls.FirstOrDefault(x => x.Match(input));
+ if (matchedControl != null)
+ {
+ matchedControl.ReportState(Console.Out);
+ }
+ }
+ }
+
+ private static string? ReadKeyOrLine()
+ {
+ if (Console.IsInputRedirected)
+ {
+ return Console.ReadLine();
+ }
+
+ var key = Console.ReadKey(true);
+ return new string(key.KeyChar, 1);
+ }
+
+ private void PrintControls()
+ {
+ foreach (var ctrl in controls)
+ {
+ ctrl.Help(Console.Out);
+ }
+ Console.WriteLine("Page F2/F3 to scale out/in instances.");
+ if (!Console.IsInputRedirected)
+ {
+ Console.WriteLine("Press ? for help");
+ }
+ else
+ {
+ Console.WriteLine("Press F1 for help");
+ }
+ }
+}
diff --git a/src/Shipping/OrderBilledHandler.cs b/src/Shipping/OrderBilledHandler.cs
index 4e8f17d6..0a001419 100644
--- a/src/Shipping/OrderBilledHandler.cs
+++ b/src/Shipping/OrderBilledHandler.cs
@@ -2,10 +2,10 @@
namespace Shipping;
-public class OrderBilledHandler(SimulationEffects simulationEffects) : IHandleMessages
+public class OrderBilledHandler : IHandleMessages
{
public Task Handle(OrderBilled message, IMessageHandlerContext context)
{
- return simulationEffects.SimulateOrderBilledMessageProcessing(context.CancellationToken);
+ return Task.CompletedTask;
}
}
\ No newline at end of file
diff --git a/src/Shipping/OrderPlacedHandler.cs b/src/Shipping/OrderPlacedHandler.cs
index dc65d2f7..60f55861 100644
--- a/src/Shipping/OrderPlacedHandler.cs
+++ b/src/Shipping/OrderPlacedHandler.cs
@@ -2,10 +2,10 @@
namespace Shipping;
-public class OrderPlacedHandler(SimulationEffects simulationEffects) : IHandleMessages
+public class OrderPlacedHandler : IHandleMessages
{
public Task Handle(OrderPlaced message, IMessageHandlerContext context)
{
- return simulationEffects.SimulateOrderPlacedMessageProcessing(context.CancellationToken);
+ return Task.CompletedTask;
}
}
\ No newline at end of file
diff --git a/src/Shipping/Program.cs b/src/Shipping/Program.cs
index a8ac51ff..2041264c 100644
--- a/src/Shipping/Program.cs
+++ b/src/Shipping/Program.cs
@@ -1,79 +1,72 @@
-using System.Text.Json;
+using System.Reflection;
+using System.Text.Json;
using Messages;
-using Microsoft.Extensions.DependencyInjection;
using Shared;
-using Shipping;
-Console.Title = "Processing (Shipping)";
-Console.SetWindowSize(65, 15);
+var instancePostfix = args.FirstOrDefault();
-LoggingUtils.ConfigureLogging("Shipping");
+var title = string.IsNullOrEmpty(instancePostfix) ? "Processing (Shipping)" : $"Shipping - {instancePostfix}";
+var instanceName = string.IsNullOrEmpty(instancePostfix) ? "shipping" : $"shipping-{instancePostfix}";
+var instanceId = DeterministicGuid.Create("Shipping", instanceName);
+var prometheusPortString = args.Skip(1).FirstOrDefault();
-var endpointConfiguration = new EndpointConfiguration("Shipping");
-endpointConfiguration.LimitMessageProcessingConcurrencyTo(4);
+var endpointControls = new ProcessingEndpointControls(() => PrepareEndpointConfiguration(instanceId, instanceName, prometheusPortString));
-var serializer = endpointConfiguration.UseSerialization();
-serializer.Options(new JsonSerializerOptions
-{
- TypeInfoResolverChain =
- {
- MessagesSerializationContext.Default
- }
-});
+var ui = new UserInterface();
+endpointControls.BindSlowProcessingDial(ui, '8', 'i');
+endpointControls.BindDatabaseFailuresDial(ui, '9', 'o');
-endpointConfiguration.UseTransport();
+endpointControls.BindDatabaseDownToggle(ui, 'j');
+endpointControls.BindDelayedRetriesToggle(ui, 'k');
+endpointControls.BindAutoThrottleToggle(ui, 'l');
-endpointConfiguration.AuditProcessedMessagesTo("audit");
-endpointConfiguration.SendHeartbeatTo("Particular.ServiceControl");
+endpointControls.BindFailureReceivingButton(ui, 'm');
+endpointControls.BindFailureProcessingButton(ui, ',');
+endpointControls.BindFailureDispatchingButton(ui, '.');
-endpointConfiguration.UniquelyIdentifyRunningInstance()
- .UsingCustomIdentifier(new Guid("BB8A8BAF-4187-455E-AAD2-211CD43267CB"))
- .UsingCustomDisplayName("original-instance");
+if (prometheusPortString != null)
+{
+ OpenTelemetryUtils.ConfigureOpenTelemetry("Shipping", instanceId.ToString(), int.Parse(prometheusPortString));
+}
+endpointControls.Start();
+ui.RunLoop(title);
-var metrics = endpointConfiguration.EnableMetrics();
-metrics.SendMetricDataToServiceControl(
- "Particular.Monitoring",
- TimeSpan.FromMilliseconds(500)
-);
+await endpointControls.StopEndpoint();
-var simulationEffects = new SimulationEffects();
-endpointConfiguration.RegisterComponents(cc => cc.AddSingleton(simulationEffects));
+EndpointConfiguration PrepareEndpointConfiguration(Guid guid, string s, string? prometheusPortString1)
+{
+ var endpointConfiguration1 = new EndpointConfiguration("Shipping");
+ endpointConfiguration1.LimitMessageProcessingConcurrencyTo(4);
-var endpointInstance = await Endpoint.Start(endpointConfiguration);
+ var serializer = endpointConfiguration1.UseSerialization();
+ serializer.Options(new JsonSerializerOptions
+ {
+ TypeInfoResolverChain =
+ {
+ MessagesSerializationContext.Default
+ }
+ });
-RunUserInterfaceLoop(simulationEffects);
+ var transport = new LearningTransport
+ {
+ StorageDirectory = Path.Combine(Directory.GetParent(Assembly.GetExecutingAssembly().Location)!.Parent!.FullName, ".learningtransport")
+ };
+ endpointConfiguration1.UseTransport(transport);
-await endpointInstance.Stop();
+ endpointConfiguration1.AuditProcessedMessagesTo("audit");
+ endpointConfiguration1.SendHeartbeatTo("Particular.ServiceControl");
-void RunUserInterfaceLoop(SimulationEffects state)
-{
- while (true)
- {
- Console.Clear();
- Console.WriteLine("Shipping Endpoint");
- Console.WriteLine("Press D to toggle resource degradation simulation");
- Console.WriteLine("Press F to process OrderBilled events faster");
- Console.WriteLine("Press S to process OrderBilled events slower");
- Console.WriteLine("Press ESC to quit");
- Console.WriteLine();
+ endpointConfiguration1.UniquelyIdentifyRunningInstance()
+ .UsingCustomIdentifier(guid)
+ .UsingCustomDisplayName(s);
- state.WriteState(Console.Out);
+ var metrics = endpointConfiguration1.EnableMetrics();
+ metrics.SendMetricDataToServiceControl(
+ "Particular.Monitoring",
+ TimeSpan.FromMilliseconds(500)
+ );
- var input = Console.ReadKey(true);
+ endpointConfiguration1.EnableOpenTelemetry();
- switch (input.Key)
- {
- case ConsoleKey.D:
- state.ToggleDegradationSimulation();
- break;
- case ConsoleKey.F:
- state.ProcessMessagesFaster();
- break;
- case ConsoleKey.S:
- state.ProcessMessagesSlower();
- break;
- case ConsoleKey.Escape:
- return;
- }
- }
-}
+ return endpointConfiguration1;
+}
\ No newline at end of file
diff --git a/src/Shipping/Shipping.csproj b/src/Shipping/Shipping.csproj
index 39b65477..09689433 100644
--- a/src/Shipping/Shipping.csproj
+++ b/src/Shipping/Shipping.csproj
@@ -5,8 +5,7 @@
Exe
enable
enable
- ..\binaries\Shipping\
- processing-time.ico
+ ..\..\binaries\Shipping\
@@ -15,11 +14,8 @@
-
-
-
-
-
+
+
\ No newline at end of file
diff --git a/src/Shipping/SimulationEffects.cs b/src/Shipping/SimulationEffects.cs
deleted file mode 100644
index 5ecc7f25..00000000
--- a/src/Shipping/SimulationEffects.cs
+++ /dev/null
@@ -1,58 +0,0 @@
-namespace Shipping;
-
-public class SimulationEffects
-{
- public void WriteState(TextWriter output)
- {
- output.WriteLine("Base time to handle each OrderBilled event: {0} seconds", baseProcessingTime.TotalSeconds);
-
- output.Write("Simulated degrading resource: ");
- output.WriteLine(degradingResourceSimulationStarted.HasValue ? "ON" : "OFF");
- }
-
- public Task SimulateOrderBilledMessageProcessing(CancellationToken cancellationToken = default)
- {
- return Task.Delay(baseProcessingTime, cancellationToken);
- }
-
- public void ProcessMessagesFaster()
- {
- if (baseProcessingTime > TimeSpan.Zero)
- {
- baseProcessingTime -= increment;
- }
- }
-
- public void ProcessMessagesSlower()
- {
- baseProcessingTime += increment;
- }
-
- public Task SimulateOrderPlacedMessageProcessing(CancellationToken cancellationToken = default)
- {
- var delay = TimeSpan.FromMilliseconds(200) + Degradation();
- return Task.Delay(delay, cancellationToken);
- }
-
- public void ToggleDegradationSimulation()
- {
- degradingResourceSimulationStarted = degradingResourceSimulationStarted.HasValue ? default(DateTime?) : DateTime.UtcNow;
- }
-
- TimeSpan Degradation()
- {
- var timeSinceDegradationStarted = DateTime.UtcNow - (degradingResourceSimulationStarted ?? DateTime.MaxValue);
- if (timeSinceDegradationStarted < TimeSpan.Zero)
- {
- return TimeSpan.Zero;
- }
-
- return new TimeSpan(timeSinceDegradationStarted.Ticks / degradationRate);
- }
-
- TimeSpan baseProcessingTime = TimeSpan.FromMilliseconds(700);
- TimeSpan increment = TimeSpan.FromMilliseconds(100);
-
- DateTime? degradingResourceSimulationStarted;
- const int degradationRate = 5;
-}
\ No newline at end of file
diff --git a/src/Shipping/processing-time.ico b/src/Shipping/processing-time.ico
deleted file mode 100644
index afd6abae..00000000
Binary files a/src/Shipping/processing-time.ico and /dev/null differ