Skip to content

Commit c8ab867

Browse files
committed
Add connected and disconnected event method
1 parent 86d24d8 commit c8ab867

File tree

9 files changed

+154
-13
lines changed

9 files changed

+154
-13
lines changed

src/LibStored.Net/Protocol/ArqLayer.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public enum ArqEvent
2929
/// </summary>
3030
Retransmit,
3131
/// <summary>
32-
/// A connection has been establised.
32+
/// A connection has been established.
3333
/// </summary>
3434
Connected,
3535
}
@@ -95,6 +95,11 @@ public ArqLayer(int maxEncodeBufferSize = 0)
9595
/// </summary>
9696
public event EventHandler<ArqEventArgs>? EventOccurred;
9797

98+
/// <summary>
99+
/// Gets whether this ARQ layer has an established connection.
100+
/// </summary>
101+
public bool IsConnected => _connected;
102+
98103
/// <inheritdoc />
99104
public override void Reset()
100105
{
@@ -112,7 +117,7 @@ public override void Reset()
112117
_recvSeq = 0;
113118
_buffer.Clear();
114119

115-
// base.Disconnected();
120+
base.Disconnected();
116121
base.Reset();
117122
KeepAlive();
118123
}
@@ -202,7 +207,7 @@ public override void Decode(Span<byte> buffer)
202207
// This is an ack to our reset message.
203208
_connected = true;
204209
_recvSeq = NextSeq(0);
205-
// Libstored uses a method, here an event is used.
210+
base.Connected();
206211
Event(ArqEvent.Connected);
207212
}
208213
}
@@ -231,7 +236,7 @@ public override void Decode(Span<byte> buffer)
231236
Event(ArqEvent.Reconnect);
232237
}
233238

234-
// base.Disconnected()
239+
base.Disconnected();
235240
}
236241
}
237242
else if(headerSeq == _recvSeq)
@@ -331,6 +336,18 @@ public void KeepAlive()
331336
/// <returns>True when a message was sent.</returns>
332337
public bool Process() => Transmit();
333338

339+
/// <summary>
340+
/// Don't propagate the connected event.
341+
/// A reconnection is handled by this layer itself, via re-transmits or resets.
342+
/// </summary>
343+
public override void Connected() {}
344+
345+
/// <summary>
346+
/// Don't propagate the disconnected event.
347+
/// A reconnection is handled by this layer itself, via re-transmits or resets.
348+
/// </summary>
349+
public override void Disconnected() {}
350+
334351
/// <summary>
335352
/// Transmit the first message in the encode queue.
336353
/// </summary>

src/LibStored.Net/Protocol/Crc16Layer.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ public override void Reset()
112112
base.Reset();
113113
}
114114

115+
/// <inheritdoc />
116+
public override void Disconnected()
117+
{
118+
_crc = _init;
119+
base.Disconnected();
120+
}
121+
115122
/// <inheritdoc />
116123
public override int Mtu() => base.Mtu() switch
117124
{

src/LibStored.Net/Protocol/Crc32Layer.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,13 @@ public override void Reset()
128128
base.Reset();
129129
}
130130

131+
/// <inheritdoc />
132+
public override void Disconnected()
133+
{
134+
_crc = _init;
135+
base.Disconnected();
136+
}
137+
131138
/// <inheritdoc />
132139
public override int Mtu() => base.Mtu() switch
133140
{

src/LibStored.Net/Protocol/Crc8Layer.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ public override void Reset()
103103
base.Reset();
104104
}
105105

106+
/// <inheritdoc />
107+
public override void Disconnected()
108+
{
109+
_crc = _init;
110+
base.Disconnected();
111+
}
112+
106113
/// <inheritdoc />
107114
public override int Mtu() => base.Mtu() switch
108115
{

src/LibStored.Net/Protocol/Protocol.cs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// SPDX-FileCopyrightText: 2025 Guus Kuiper
2-
//
2+
//
33
// SPDX-License-Identifier: MIT
44

55
namespace LibStored.Net.Protocol;
@@ -69,15 +69,52 @@ public ProtocolLayer() { }
6969
/// <param name="up">The upper protocol layer to wrap.</param>
7070
public void Wrap(ProtocolLayer up)
7171
{
72-
if (up._down is not null)
72+
// Disconnect our old upper layer.
73+
ProtocolLayer? oldUp = _up;
74+
if (oldUp is not null)
7375
{
74-
throw new InvalidOperationException("Cannot wrap a layer that already has a down layer.");
76+
oldUp._down = null;
77+
_up = null;
7578
}
7679

80+
// Inject ourselves below the given layer.
81+
ProtocolLayer? currentBottom = Bottom();
82+
ProtocolLayer? injectAbove = up._down;
83+
84+
if (injectAbove is not null)
85+
{
86+
currentBottom._down = injectAbove;
87+
injectAbove._up = currentBottom;
88+
currentBottom = injectAbove.Bottom();
89+
}
90+
91+
// Set out new upper layer
7792
up._down = this;
7893
_up = up;
94+
95+
// Invoke all notifications.
96+
oldUp?.Disconnected();
97+
98+
if (injectAbove is not null)
99+
{
100+
currentBottom.Connected();
101+
}
102+
else
103+
{
104+
up.Connected();
105+
}
79106
}
80107

108+
/// <summary>
109+
/// (Re)connected notification (bottom-up).
110+
/// </summary>
111+
public virtual void Connected() => _up?.Connected();
112+
113+
/// <summary>
114+
/// Disconnected notification (bottom-up).
115+
/// </summary>
116+
public virtual void Disconnected() => _up?.Disconnected();
117+
81118
private ProtocolLayer Bottom()
82119
{
83120
ProtocolLayer p = this;

src/LibStored.Net/Protocol/SegmentationLayer.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@ public override void Reset()
113113
/// <inheritdoc />
114114
public override int Mtu() => 0; // No MTU limit for segmentation layer
115115

116+
/// <inheritdoc />
117+
public override void Connected()
118+
{
119+
_encoded = 0;
120+
base.Connected();
121+
}
122+
123+
/// <inheritdoc />
124+
public override void Disconnected()
125+
{
126+
_decodeBuffer.Clear();
127+
base.Disconnected();
128+
}
129+
116130
private int LowerMtu()
117131
{
118132
int lowerMtu = base.Mtu();

src/LibStored.Net/Protocol/TerminalLayer.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,14 @@ public override void Reset()
133133
base.Reset();
134134
}
135135

136+
/// <inheritdoc />
137+
public override void Disconnected()
138+
{
139+
_data.Clear();
140+
_decodingMessage = false;
141+
base.Disconnected();
142+
}
143+
136144
/// <inheritdoc />
137145
public override int Mtu() => base.Mtu() switch
138146
{

src/LibStored.Net/Synchronization/SyncConnection.cs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// SPDX-FileCopyrightText: 2025 Guus Kuiper
2-
//
2+
//
33
// SPDX-License-Identifier: MIT
44

55
using System.Buffers;
@@ -18,14 +18,22 @@ public class StoreInfo
1818
/// Gets or sets the last known sequence number for the store.
1919
/// </summary>
2020
public Seq Seq { get; set; }
21+
2122
/// <summary>
2223
/// Gets or sets the outgoing ID for the store in the connection.
24+
/// The value 0 indicates not connected.
2325
/// </summary>
2426
public Id IdOut { get; set; }
27+
2528
/// <summary>
2629
/// Gets or sets a value indicating whether this store is a source in the connection.
2730
/// </summary>
2831
public bool Source { get; set; }
32+
33+
/// <summary>
34+
///
35+
/// </summary>
36+
public bool IsConnected => IdOut != 0;
2937
}
3038

3139
/// <summary>
@@ -133,7 +141,7 @@ public Seq Process(StoreJournal store)
133141
return 0;
134142
}
135143

136-
if (!store.HasChanged(info.Seq))
144+
if (!info.IsConnected || !store.HasChanged(info.Seq))
137145
{
138146
// No recent changes
139147
return 0;
@@ -209,6 +217,7 @@ public override void Decode(Span<byte> buffer)
209217
};
210218
_stores[journal] = info;
211219

220+
Debug.Assert(info.IsConnected);
212221
id = NextId();
213222
_idIn[id] = journal;
214223
EncodeId(id);
@@ -246,6 +255,7 @@ public override void Decode(Span<byte> buffer)
246255
info.Seq = seq;
247256
info.IdOut = welcomeId;
248257
Debug.Assert(info.Source);
258+
Debug.Assert(info.IsConnected);
249259

250260
break;
251261
}
@@ -300,7 +310,7 @@ public override void Decode(Span<byte> buffer)
300310
}
301311

302312
StoreInfo info = _stores[value];
303-
if (info.Source && info.IdOut == 0)
313+
if (info.Source && info.IsConnected)
304314
{
305315
HelloAgain(value);
306316
}
@@ -323,7 +333,7 @@ public override void Decode(Span<byte> buffer)
323333
break;
324334
}
325335

326-
if (info.Source && info.IdOut == 0)
336+
if (info.Source && info.IsConnected)
327337
{
328338
HelloAgain(journal);
329339
}
@@ -349,6 +359,13 @@ public override void Decode(Span<byte> buffer)
349359
/// <returns>True if the store is being synchronized; otherwise, false.</returns>
350360
public bool IsSynchronizing(StoreJournal journal) => _stores.ContainsKey(journal);
351361

362+
/// <summary>
363+
/// Returns if the given store is currently connected over this connection.
364+
/// </summary>
365+
/// <param name="journal"></param>
366+
/// <returns></returns>
367+
public bool IsConnected(StoreJournal journal) => _stores.TryGetValue(journal, out StoreInfo? info) && info.IsConnected;
368+
352369
/// <summary>
353370
/// Resets the connection, drops all non-source stores, and sends Hello messages for sources.
354371
/// </summary>
@@ -361,6 +378,20 @@ public override void Reset()
361378
HelloAgain();
362379
}
363380

381+
/// <inheritdoc />
382+
public override void Connected()
383+
{
384+
HelloAgain();
385+
base.Connected();
386+
}
387+
388+
/// <inheritdoc />
389+
public override void Disconnected()
390+
{
391+
DropNonSources();
392+
base.Disconnected();
393+
}
394+
364395
/// <summary>
365396
/// Encode a Bye and drop all stores.
366397
/// </summary>
@@ -372,7 +403,7 @@ protected void SendBye()
372403
}
373404

374405
/// <summary>
375-
///
406+
///
376407
/// </summary>
377408
/// <param name="hash"></param>
378409
protected void SendBye(string hash)
@@ -572,6 +603,7 @@ private void HelloAgain(StoreJournal store)
572603
Id id = _idIn.FirstOrDefault(x => x.Value == store).Key;
573604

574605
Debug.Assert(id > 0);
606+
Debug.Assert(!info.IsConnected);
575607

576608
EncodeCmd(SyncConnection.Hello);
577609
store.EncodeHash(this, false);

src/LibStored.Net/Synchronization/Synchronizer.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// SPDX-FileCopyrightText: 2025 Guus Kuiper
2-
//
2+
//
33
// SPDX-License-Identifier: MIT
44

55
namespace LibStored.Net.Synchronization;
@@ -149,6 +149,18 @@ public bool IsSynchronizing(StoreJournal journal, SyncConnection notOverConnecti
149149
return false;
150150
}
151151

152+
/// <summary>
153+
/// Returns if the given store is currently connected over the given connection.
154+
/// </summary>
155+
/// <param name="journal"></param>
156+
/// <param name="connection"></param>
157+
/// <returns></returns>
158+
public bool IsConnected(StoreJournal journal, SyncConnection connection)
159+
{
160+
SyncConnection? c = _connections.Contains(connection) ? connection : null;
161+
return c is not null && c.IsConnected(journal);
162+
}
163+
152164
internal StoreJournal? ToJournal(string hash)
153165
{
154166
if (string.IsNullOrEmpty(hash))

0 commit comments

Comments
 (0)