Skip to content

Commit 6f77812

Browse files
authored
feat: emotes (#7)
1 parent 0f99c2b commit 6f77812

15 files changed

+1912
-68
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using Decentraland.Pulse;
2+
using Pulse.Peers;
3+
using Pulse.Peers.Simulation;
4+
5+
namespace Pulse.Messaging;
6+
7+
public class EmoteStartHandler(EmoteBoard emoteBoard, ITimeProvider timeProvider, ILogger<EmoteStartHandler> logger)
8+
: RuntimePacketHandlerBase<EmoteStartHandler>(logger), IMessageHandler
9+
{
10+
public void Handle(Dictionary<PeerIndex, PeerState> peers, PeerIndex from, ClientMessage message)
11+
{
12+
if (SkipFromUnauthorizedPeer(peers, from, message, out _))
13+
return;
14+
15+
EmoteStart emoteStart = message.EmoteStart;
16+
17+
uint? durationMs = emoteStart.HasDurationMs ? emoteStart.DurationMs : null;
18+
19+
emoteBoard.Start(from, emoteStart.EmoteId, timeProvider.MonotonicTime, durationMs);
20+
21+
logger.LogDebug("Peer {Peer} started emote {EmoteId}", from.Value, emoteStart.EmoteId);
22+
}
23+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using Decentraland.Pulse;
2+
using Pulse.Peers;
3+
using Pulse.Peers.Simulation;
4+
5+
namespace Pulse.Messaging;
6+
7+
public class EmoteStopHandler(EmoteBoard emoteBoard, ITimeProvider timeProvider, ILogger<EmoteStopHandler> logger)
8+
: RuntimePacketHandlerBase<EmoteStopHandler>(logger), IMessageHandler
9+
{
10+
public void Handle(Dictionary<PeerIndex, PeerState> peers, PeerIndex from, ClientMessage message)
11+
{
12+
if (SkipFromUnauthorizedPeer(peers, from, message, out _))
13+
return;
14+
15+
if (!emoteBoard.IsEmoting(from))
16+
{
17+
logger.LogWarning("Peer {Peer} sent EmoteStop but no emote is active", from.Value);
18+
return;
19+
}
20+
21+
logger.LogDebug("Peer {Peer} stopped emote", from.Value);
22+
23+
emoteBoard.Stop(from, timeProvider.MonotonicTime);
24+
}
25+
}

src/DCLPulse/Peers/PeerToPeerView.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,10 @@ public struct PeerToPeerView
3434
/// Compared against <see cref="Simulation.ProfileBoard" /> each tick to detect changes.
3535
/// </summary>
3636
public int LastSentProfileVersion;
37+
38+
/// <summary>
39+
/// The emote ID last sent to the observer for this subject, or null if idle.
40+
/// Compared against <see cref="Simulation.EmoteBoard" /> each tick to detect transitions.
41+
/// </summary>
42+
public string? LastSentEmoteId;
3743
}

src/DCLPulse/Peers/PeersManager.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public sealed class PeersManager : BackgroundService
4646
private readonly Dictionary<ClientMessage.MessageOneofCase, IMessageHandler> messageHandlers;
4747
private readonly ITransport transport;
4848
private readonly ProfileBoard profileBoard;
49+
private readonly EmoteBoard emoteBoard;
4950

5051
public PeersManager(
5152
MessagePipe messagePipe,
@@ -60,7 +61,8 @@ public PeersManager(
6061
ITimeProvider timeProvider,
6162
Dictionary<ClientMessage.MessageOneofCase, IMessageHandler> messageHandlers,
6263
ITransport transport,
63-
ProfileBoard profileBoard)
64+
ProfileBoard profileBoard,
65+
EmoteBoard emoteBoard)
6466
{
6567
this.messagePipe = messagePipe;
6668
this.logger = logger;
@@ -69,6 +71,7 @@ public PeersManager(
6971
this.messageHandlers = messageHandlers;
7072
this.transport = transport;
7173
this.profileBoard = profileBoard;
74+
this.emoteBoard = emoteBoard;
7275
this.peerStateFactory = peerStateFactory;
7376
this.areaOfInterest = areaOfInterest;
7477
this.snapshotBoard = snapshotBoard;
@@ -103,7 +106,7 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
103106
{
104107
var simulation = new PeerSimulation(
105108
areaOfInterest, snapshotBoard, spatialGrid, identityBoard,
106-
messagePipe, peerOptions.SimulationSteps, timeProvider, transport, profileBoard, peerSimulationLogger);
109+
messagePipe, peerOptions.SimulationSteps, timeProvider, transport, profileBoard, emoteBoard, peerSimulationLogger);
107110

108111
tasks[i + 1] = WorkerAsync(i, messageChannels[i].Reader,
109112
peerLifeCycleChannels[i].Reader, simulation, stoppingToken);
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using Decentraland.Pulse;
2+
3+
namespace Pulse.Peers.Simulation;
4+
5+
/// <summary>
6+
/// Shared emote state store indexed by <see cref="PeerIndex" />.
7+
/// Tracks which peer is currently emoting.
8+
/// <para />
9+
/// Writer: the handler on whichever worker owns the peer.
10+
/// Readers: simulation step on any worker, diffs against <see cref="PeerToPeerView.LastSentEmoteId" />.
11+
/// </summary>
12+
public class EmoteBoard(int maxPeers)
13+
{
14+
private readonly EmoteState?[] states = new EmoteState?[maxPeers];
15+
16+
public void Start(PeerIndex id, string emoteId, uint serverTick, uint? durationMs) =>
17+
Volatile.Write(ref states[(int)id.Value], new EmoteState(emoteId, serverTick, null, durationMs));
18+
19+
public void Stop(PeerIndex id, uint serverTick) =>
20+
// Maybe it could be useful to keep the start tick if its needed somehow in the future
21+
Volatile.Write(ref states[(int)id.Value], new EmoteState(null, 0, serverTick, StopReason: EmoteStopReason.Cancelled));
22+
23+
public EmoteState? Get(PeerIndex id) =>
24+
Volatile.Read(ref states[(int)id.Value]);
25+
26+
public bool IsEmoting(PeerIndex id) =>
27+
Volatile.Read(ref states[(int)id.Value])?.EmoteId != null;
28+
29+
public void TryComplete(PeerIndex id, uint now)
30+
{
31+
EmoteState? state = Volatile.Read(ref states[(int)id.Value]);
32+
33+
if (state?.EmoteId == null || state.DurationMs == null)
34+
return;
35+
36+
if (now - state.StartTick >= state.DurationMs.Value)
37+
Volatile.Write(ref states[(int)id.Value], new EmoteState(null, 0, now, StopReason: EmoteStopReason.Completed));
38+
}
39+
40+
public void Remove(PeerIndex id) =>
41+
Volatile.Write(ref states[(int)id.Value], null);
42+
}
43+
44+
public record EmoteState(string? EmoteId, uint StartTick, uint? StopTick, uint? DurationMs = null, EmoteStopReason? StopReason = null);

src/DCLPulse/Peers/Simulation/PeerSimulation.cs

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public sealed class PeerSimulation : IPeerSimulation
3131
private readonly ITimeProvider timeProvider;
3232
private readonly ITransport transport;
3333
private readonly ProfileBoard profileBoard;
34+
private readonly EmoteBoard emoteBoard;
3435
private readonly ILogger<PeerSimulation> logger;
3536

3637
/// <summary>
@@ -69,6 +70,7 @@ public PeerSimulation(
6970
ITimeProvider timeProvider,
7071
ITransport transport,
7172
ProfileBoard profileBoard,
73+
EmoteBoard emoteBoard,
7274
ILogger<PeerSimulation> logger)
7375
{
7476
this.areaOfInterest = areaOfInterest;
@@ -80,6 +82,7 @@ public PeerSimulation(
8082
this.timeProvider = timeProvider;
8183
this.transport = transport;
8284
this.profileBoard = profileBoard;
85+
this.emoteBoard = emoteBoard;
8386
this.logger = logger;
8487

8588
BaseTickMs = simulationSteps[0];
@@ -116,6 +119,7 @@ public void SimulateTick(Dictionary<PeerIndex, PeerState> peers, uint tickCounte
116119
spatialGrid.Remove(observerId);
117120
identityBoard.Remove(observerId);
118121
profileBoard.Remove(observerId);
122+
emoteBoard.Remove(observerId);
119123
observerViews.Remove(observerId);
120124
peersToBeRemoved.Add(observerId);
121125
logger.LogInformation("Peer {Peer} removed after disconnected", observerId);
@@ -126,6 +130,9 @@ public void SimulateTick(Dictionary<PeerIndex, PeerState> peers, uint tickCounte
126130
if (observerState.ConnectionState != PeerConnectionState.AUTHENTICATED)
127131
continue;
128132

133+
// Completes the emote in case no observer is near this peer
134+
emoteBoard.TryComplete(observerId, timeProvider.MonotonicTime);
135+
129136
if (!snapshotBoard.TryRead(observerId, out PeerSnapshot observerSnapshot))
130137
continue;
131138

@@ -236,9 +243,12 @@ private void ProcessVisibleSubjects(
236243
else
237244
SendDelta(view.LastSentSnapshot, ITransport.PacketMode.UNRELIABLE_SEQUENCED);
238245

246+
// Only announce on delta because PlayerJoined is considered as an announcement
239247
TryAnnounceProfile();
240248
}
241249

250+
SyncEmoteState();
251+
242252
view.LastSentSeq = subjectSnapshot.Seq;
243253
view.LastSentSnapshot = subjectSnapshot;
244254
view.LastSeenTick = tickCounter;
@@ -265,6 +275,46 @@ void TryAnnounceProfile()
265275
}
266276
}
267277

278+
void SyncEmoteState()
279+
{
280+
// If the emote completion has not been process by the peer's worker yet, try to complete it now
281+
emoteBoard.TryComplete(entry.Subject, timeProvider.MonotonicTime);
282+
283+
EmoteState? emoteState = emoteBoard.Get(entry.Subject);
284+
string? currentEmote = emoteState?.EmoteId;
285+
286+
if (currentEmote == view.LastSentEmoteId)
287+
return;
288+
289+
if (currentEmote != null)
290+
{
291+
messagePipe.Send(new OutgoingMessage(observerId, new ServerMessage
292+
{
293+
EmoteStarted = new EmoteStarted
294+
{
295+
SubjectId = entry.Subject.Value,
296+
ServerTick = emoteState!.StartTick,
297+
EmoteId = currentEmote,
298+
PlayerState = CreatePlayerState(subjectSnapshot),
299+
},
300+
}, ITransport.PacketMode.RELIABLE));
301+
}
302+
else if (emoteState is { StopTick: not null, StopReason: not null })
303+
{
304+
messagePipe.Send(new OutgoingMessage(observerId, new ServerMessage
305+
{
306+
EmoteStopped = new EmoteStopped
307+
{
308+
SubjectId = entry.Subject.Value,
309+
ServerTick = emoteState.StopTick.Value,
310+
Reason = emoteState.StopReason.Value,
311+
},
312+
}, ITransport.PacketMode.RELIABLE));
313+
}
314+
315+
view.LastSentEmoteId = currentEmote;
316+
}
317+
268318
bool SendDelta(PeerSnapshot baseline, ITransport.PacketMode packetMode)
269319
{
270320
if (baseline.Seq == subjectSnapshot.Seq)
@@ -313,7 +363,16 @@ private void SweepStaleViews(PeerIndex observerId, Dictionary<PeerIndex, PeerToP
313363
}
314364
}
315365

316-
private PlayerStateFull CreateFullState(PeerIndex subjectId, PeerSnapshot snapshot)
366+
private PlayerStateFull CreateFullState(PeerIndex subjectId, PeerSnapshot snapshot) =>
367+
new ()
368+
{
369+
SubjectId = subjectId.Value,
370+
Sequence = snapshot.Seq,
371+
ServerTick = snapshot.ServerTick,
372+
State = CreatePlayerState(snapshot),
373+
};
374+
375+
private static PlayerState CreatePlayerState(PeerSnapshot snapshot)
317376
{
318377
var state = new PlayerState
319378
{
@@ -333,12 +392,6 @@ private PlayerStateFull CreateFullState(PeerIndex subjectId, PeerSnapshot snapsh
333392
if (snapshot.HeadPitch.HasValue)
334393
state.HeadPitch = snapshot.HeadPitch.Value;
335394

336-
return new PlayerStateFull
337-
{
338-
SubjectId = subjectId.Value,
339-
Sequence = snapshot.Seq,
340-
ServerTick = snapshot.ServerTick,
341-
State = state,
342-
};
395+
return state;
343396
}
344397
}

src/DCLPulse/Program.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
builder.Services.AddSingleton<ResyncRequestHandler>();
3232
builder.Services.AddSingleton<HandshakeHandler>();
3333
builder.Services.AddSingleton<ProfileAnnouncementHandler>();
34+
builder.Services.AddSingleton<EmoteStartHandler>();
35+
builder.Services.AddSingleton<EmoteStopHandler>();
3436
builder.Services.AddSingleton(new AuthChainValidator(new NethereumPersonalSignVerifier()));
3537

3638
builder.Services.AddSingleton(sp => new Dictionary<ClientMessage.MessageOneofCase, IMessageHandler>
@@ -39,6 +41,8 @@
3941
{ ClientMessage.MessageOneofCase.Input, sp.GetRequiredService<PlayerStateInputHandler>() },
4042
{ ClientMessage.MessageOneofCase.Resync, sp.GetRequiredService<ResyncRequestHandler>() },
4143
{ ClientMessage.MessageOneofCase.ProfileAnnouncement, sp.GetRequiredService<ProfileAnnouncementHandler>() },
44+
{ ClientMessage.MessageOneofCase.EmoteStart, sp.GetRequiredService<EmoteStartHandler>() },
45+
{ ClientMessage.MessageOneofCase.EmoteStop, sp.GetRequiredService<EmoteStopHandler>() },
4246
});
4347

4448
builder.Services.AddSingleton<ProfileBoard>(sp =>
@@ -47,6 +51,12 @@
4751
return new ProfileBoard(transportOptions.MaxPeers);
4852
});
4953

54+
builder.Services.AddSingleton(sp =>
55+
{
56+
ENetTransportOptions transportOptions = sp.GetRequiredService<IOptions<ENetTransportOptions>>().Value;
57+
return new EmoteBoard(transportOptions.MaxPeers);
58+
});
59+
5060
// Simulation
5161
builder.Services.AddSingleton(sp =>
5262
{

src/DCLPulseTests/DrainPeerLifeCycleEventsTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public void SetUp()
4141
timeProvider,
4242
new Dictionary<ClientMessage.MessageOneofCase, IMessageHandler>(),
4343
Substitute.For<ITransport>(),
44-
new ProfileBoard(100));
44+
new ProfileBoard(100),
45+
new EmoteBoard(100));
4546

4647
lifeCycleChannel = Channel.CreateUnbounded<MessagePipe.PeerLifeCycleEvent>();
4748
peers = new Dictionary<PeerIndex, PeerState>();

0 commit comments

Comments
 (0)