11using System . Net ;
2+ using System . Net . Http . Headers ;
23using System . Reflection ;
34using System . Security . Cryptography . X509Certificates ;
5+ using System . Text ;
6+ using DotNext ;
7+ using DotNext . Net ;
8+ using DotNext . Net . Cluster . Discovery . HyParView ;
49using DotNext . Net . Cluster . Discovery . HyParView . Http ;
5- using Microsoft . Extensions . Logging . Console ;
6- using SslOptions = DotNext . Net . Security . SslOptions ;
10+ using DotNext . Net . Cluster . Messaging . Gossip ;
11+ using DotNext . Net . Http ;
12+ using HyParViewPeer ;
13+ using Microsoft . AspNetCore . Connections ;
14+ using Microsoft . Extensions . Options ;
715
816int port ;
917int ? contactNodePort = null ;
3745if ( contactNodePort . HasValue )
3846 configuration . Add ( "contactNode" , $ "https://localhost:{ contactNodePort . GetValueOrDefault ( ) } ") ;
3947
40- await new HostBuilder ( ) . ConfigureWebHost ( webHost =>
48+ var builder = WebApplication . CreateSlimBuilder ( ) ;
49+ builder . Configuration . AddInMemoryCollection ( configuration ) ;
50+
51+ // web server
52+ builder . WebHost . ConfigureKestrel ( options =>
4153{
42- webHost . UseKestrel ( options =>
43- {
44- options . ListenLocalhost ( port , static listener => listener . UseHttps ( LoadCertificate ( ) ) ) ;
45- } )
46- . UseStartup < HyParViewPeer . Startup > ( ) ;
47- } )
48- . ConfigureLogging ( static builder => builder . AddConsole ( ) . SetMinimumLevel ( LogLevel . Error ) )
49- . ConfigureAppConfiguration ( builder => builder . AddInMemoryCollection ( configuration ) )
50- . JoinMesh ( )
51- . Build ( )
52- . RunAsync ( ) ;
54+ options . ListenLocalhost ( port , static listener => listener . UseHttps ( LoadCertificate ( ) ) ) ;
55+ } ) ;
56+
57+ // services
58+ builder . Services
59+ . AddSingleton < RumorSpreadingManager > ( static sp => new RumorSpreadingManager ( EndPointFormatter . UriEndPointComparer ) )
60+ . AddSingleton < IPeerLifetime , HyParViewPeerLifetime > ( )
61+ . AddSingleton < IHttpMessageHandlerFactory , HyParViewClientHandlerFactory > ( ) ;
62+
63+ // misc
64+ builder . Logging . AddConsole ( ) . SetMinimumLevel ( LogLevel . Debug ) ;
65+ builder . JoinMesh ( ) ;
66+
67+ await using var app = builder . Build ( ) ;
68+
69+ // endpoints
70+ app . UseHyParViewProtocolHandler ( ) . UseRouting ( ) . UseEndpoints ( static endpoints =>
71+ {
72+ endpoints . MapGet ( RumorSender . RumorResource , SendRumourAsync ) ;
73+ endpoints . MapGet ( RumorSender . NeighborsResource , PrintNeighborsAsync ) ;
74+ endpoints . MapPost ( RumorSender . BroadcastResource , BroadcastAsync ) ;
75+ } ) ;
76+
77+ await app . RunAsync ( ) ;
5378
5479static X509Certificate2 LoadCertificate ( )
5580{
@@ -58,4 +83,101 @@ static X509Certificate2 LoadCertificate()
5883 rawCertificate ? . CopyTo ( ms ) ;
5984 ms . Seek ( 0 , SeekOrigin . Begin ) ;
6085 return new X509Certificate2 ( ms . ToArray ( ) , "1234" ) ;
86+ }
87+
88+ static ( Uri , RumorTimestamp ) PrepareMessageId ( IServiceProvider sp )
89+ {
90+ var config = sp . GetRequiredService < IOptions < HttpPeerConfiguration > > ( ) . Value ;
91+ var manager = sp . GetRequiredService < RumorSpreadingManager > ( ) ;
92+ return ( config . LocalNode ! , manager . Tick ( ) ) ;
93+ }
94+
95+ static Task BroadcastAsync ( HttpContext context )
96+ {
97+ var senderAddress = RumorSender . ParseSenderAddress ( context . Request ) ;
98+ var senderId = RumorSender . ParseRumorId ( context . Request ) ;
99+
100+ var spreadingManager = context . RequestServices . GetRequiredService < RumorSpreadingManager > ( ) ;
101+ if ( ! spreadingManager . CheckOrder ( new UriEndPoint ( senderAddress ) , senderId ) )
102+ return Task . CompletedTask ;
103+
104+ Console . WriteLine ( $ "Spreading rumor from { senderAddress } with sequence number = { senderId } ") ;
105+
106+ return context . RequestServices
107+ . GetRequiredService < PeerController > ( )
108+ . EnqueueBroadcastAsync ( controller => new RumorSender ( ( IPeerMesh < HttpPeerClient > ) controller , senderAddress , senderId ) )
109+ . AsTask ( ) ;
110+ }
111+
112+ static Task SendRumourAsync ( HttpContext context )
113+ {
114+ var ( sender , id ) = PrepareMessageId ( context . RequestServices ) ;
115+
116+ return context . RequestServices
117+ . GetRequiredService < PeerController > ( )
118+ . EnqueueBroadcastAsync ( controller => new RumorSender ( ( IPeerMesh < HttpPeerClient > ) controller , sender , id ) )
119+ . AsTask ( ) ;
120+ }
121+
122+ static Task PrintNeighborsAsync ( HttpContext context )
123+ {
124+ var mesh = context . RequestServices . GetRequiredService < IPeerMesh < HttpPeerClient > > ( ) ;
125+ var sb = new StringBuilder ( ) ;
126+
127+ foreach ( var peer in mesh . Peers )
128+ sb . AppendLine ( peer . ToString ( ) ) ;
129+
130+ return context . Response . WriteAsync ( sb . ToString ( ) , context . RequestAborted ) ;
131+ }
132+
133+ file sealed class RumorSender : Disposable , IRumorSender
134+ {
135+ internal const string SenderAddressHeader = "X-Sender-Address" ;
136+ internal const string SenderIdHeader = "X-Rumor-ID" ;
137+
138+ internal const string RumorResource = "/rumor" ;
139+ internal const string BroadcastResource = "/broadcast" ;
140+ internal const string NeighborsResource = "/neighbors" ;
141+
142+ private readonly IPeerMesh < HttpPeerClient > mesh ;
143+ private readonly Uri senderAddress ;
144+ private readonly RumorTimestamp senderId ;
145+
146+ internal RumorSender ( IPeerMesh < HttpPeerClient > mesh , Uri sender , RumorTimestamp id )
147+ {
148+ this . mesh = mesh ;
149+ senderAddress = sender ;
150+ senderId = id ;
151+ }
152+
153+ private async Task SendAsync ( HttpPeerClient client , CancellationToken token )
154+ {
155+ using var request = new HttpRequestMessage ( HttpMethod . Post , BroadcastResource ) ;
156+ AddSenderAddress ( request . Headers , senderAddress ) ;
157+ AddRumorId ( request . Headers , senderId ) ;
158+ using var response = await client . SendAsync ( request , token ) ;
159+ response . EnsureSuccessStatusCode ( ) ;
160+ }
161+
162+ Task IRumorSender . SendAsync ( EndPoint peer , CancellationToken token )
163+ {
164+ var client = mesh . TryGetPeer ( peer ) ;
165+ return client is not null && ! EndPointFormatter . UriEndPointComparer . Equals ( new UriEndPoint ( senderAddress ) , peer )
166+ ? SendAsync ( client , token )
167+ : Task . CompletedTask ;
168+ }
169+
170+ public new ValueTask DisposeAsync ( ) => base . DisposeAsync ( ) ;
171+
172+ private static void AddSenderAddress ( HttpRequestHeaders headers , Uri address )
173+ => headers . Add ( SenderAddressHeader , address . ToString ( ) ) ;
174+
175+ internal static Uri ParseSenderAddress ( HttpRequest request )
176+ => new ( request . Headers [ SenderAddressHeader ] ! , UriKind . Absolute ) ;
177+
178+ private static void AddRumorId ( HttpRequestHeaders headers , in RumorTimestamp id )
179+ => headers . Add ( SenderIdHeader , id . ToString ( ) ) ;
180+
181+ internal static RumorTimestamp ParseRumorId ( HttpRequest request )
182+ => RumorTimestamp . TryParse ( request . Headers [ SenderIdHeader ] , out var result ) ? result : throw new FormatException ( "Invalid rumor ID" ) ;
61183}
0 commit comments