Skip to content

Commit bba8e5c

Browse files
authored
Fix a bug about chunk loader (#36)
1 parent e24858c commit bba8e5c

File tree

18 files changed

+305
-125
lines changed

18 files changed

+305
-125
lines changed

src/MineCase.Gateway/Network/ClientSession.cs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Microsoft.Extensions.ObjectPool;
2+
using MineCase.Buffers;
23
using MineCase.Protocol;
34
using MineCase.Server.Network;
45
using Orleans;
@@ -23,12 +24,14 @@ class ClientSession : IDisposable
2324
private readonly OutcomingPacketObserver _outcomingPacketObserver;
2425
private readonly ActionBlock<UncompressedPacket> _outcomingPacketDispatcher;
2526
private readonly ObjectPool<UncompressedPacket> _uncompressedPacketObjectPool;
27+
private readonly IBufferPool<byte> _bufferPool;
2628

27-
public ClientSession(TcpClient tcpClient, IGrainFactory grainFactory, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
29+
public ClientSession(TcpClient tcpClient, IGrainFactory grainFactory, IBufferPool<byte> bufferPool, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
2830
{
2931
_sessionId = Guid.NewGuid();
3032
_tcpClient = tcpClient;
3133
_grainFactory = grainFactory;
34+
_bufferPool = bufferPool;
3235
_uncompressedPacketObjectPool = uncompressedPacketObjectPool;
3336
_outcomingPacketObserver = new OutcomingPacketObserver(this);
3437
_outcomingPacketDispatcher = new ActionBlock<UncompressedPacket>(SendOutcomingPacket);
@@ -57,35 +60,43 @@ public async Task Startup(CancellationToken cancellationToken)
5760

5861
private void OnClosed()
5962
{
60-
_outcomingPacketDispatcher.Complete();
61-
_tcpClient.Client.Shutdown(SocketShutdown.Send);
63+
_outcomingPacketDispatcher.Post(null);
6264
}
6365

6466
private async Task DispatchIncomingPacket()
6567
{
66-
var packet = _uncompressedPacketObjectPool.Get();
67-
try
68+
using (var bufferScope = _bufferPool.CreateScope())
6869
{
69-
if (_useCompression)
70+
var packet = _uncompressedPacketObjectPool.Get();
71+
try
7072
{
71-
var compressedPacket = await CompressedPacket.DeserializeAsync(_remoteStream, null);
72-
packet = PacketCompress.Decompress(ref compressedPacket);
73+
if (_useCompression)
74+
{
75+
var compressedPacket = await CompressedPacket.DeserializeAsync(_remoteStream, null);
76+
packet = PacketCompress.Decompress(ref compressedPacket);
77+
}
78+
else
79+
{
80+
packet = await UncompressedPacket.DeserializeAsync(_remoteStream, bufferScope, packet);
81+
}
82+
await DispatchIncomingPacket(packet);
7383
}
74-
else
84+
finally
7585
{
76-
packet = await UncompressedPacket.DeserializeAsync(_remoteStream, packet);
86+
_uncompressedPacketObjectPool.Return(packet);
7787
}
78-
await DispatchIncomingPacket(packet);
79-
}
80-
finally
81-
{
82-
_uncompressedPacketObjectPool.Return(packet);
8388
}
8489
}
8590

8691
private async Task SendOutcomingPacket(UncompressedPacket packet)
8792
{
88-
if (_useCompression)
93+
// Close
94+
if(packet == null)
95+
{
96+
_tcpClient.Client.Shutdown(SocketShutdown.Send);
97+
_outcomingPacketDispatcher.Complete();
98+
}
99+
else if (_useCompression)
89100
{
90101
var newPacket = PacketCompress.Compress(ref packet);
91102
await newPacket.SerializeAsync(_remoteStream);

src/MineCase.Gateway/Network/ConnectionRouter.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Microsoft.Extensions.Logging;
22
using Microsoft.Extensions.ObjectPool;
3+
using MineCase.Buffers;
34
using MineCase.Protocol;
45
using MineCase.Server.Settings;
56
using Orleans;
@@ -18,12 +19,14 @@ class ConnectionRouter
1819
private readonly TcpListener _listener;
1920
private readonly IGrainFactory _grainFactory;
2021
private readonly ILogger _logger;
22+
private readonly IBufferPool<byte> _bufferPool;
2123
private readonly ObjectPool<UncompressedPacket> _uncompressedPacketObjectPool;
2224

23-
public ConnectionRouter(IGrainFactory grainFactory, ILoggerFactory loggerFactory, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
25+
public ConnectionRouter(IGrainFactory grainFactory, ILoggerFactory loggerFactory, IBufferPool<byte> bufferPool, ObjectPool<UncompressedPacket> uncompressedPacketObjectPool)
2426
{
2527
_grainFactory = grainFactory;
2628
_logger = loggerFactory.CreateLogger<ConnectionRouter>();
29+
_bufferPool = bufferPool;
2730
_uncompressedPacketObjectPool = uncompressedPacketObjectPool;
2831
_listener = new TcpListener(new IPEndPoint(IPAddress.Any, 25565));
2932
}
@@ -44,7 +47,7 @@ private async void DispatchIncomingClient(TcpClient tcpClient, CancellationToken
4447
try
4548
{
4649
_logger.LogInformation($"Incoming connection from {tcpClient.Client.RemoteEndPoint}.");
47-
using (var session = new ClientSession(tcpClient, _grainFactory, _uncompressedPacketObjectPool))
50+
using (var session = new ClientSession(tcpClient, _grainFactory, _bufferPool, _uncompressedPacketObjectPool))
4851
{
4952
await session.Startup(cancellationToken);
5053
}

src/MineCase.Gateway/Program.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
using MineCase.Gateway.Network;
99
using Microsoft.Extensions.ObjectPool;
1010
using MineCase.Protocol;
11+
using MineCase.Buffers;
12+
using System.Buffers;
1113

1214
namespace MineCase.Gateway
1315
{
@@ -55,6 +57,7 @@ private static void ConfigureObjectPools(IServiceCollection services)
5557
var provider = s.GetRequiredService<ObjectPoolProvider>();
5658
return provider.Create<UncompressedPacket>();
5759
});
60+
services.AddSingleton<IBufferPool<byte>>(s => new BufferPool<byte>(ArrayPool<byte>.Shared));
5861
}
5962

6063
private static IConfiguration LoadConfiguration()
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Text;
6+
7+
namespace MineCase.Buffers
8+
{
9+
public interface IBufferPool<T>
10+
{
11+
IBufferPoolScope<T> CreateScope();
12+
}
13+
14+
public interface IBufferPoolScope<T> : IDisposable
15+
{
16+
ArraySegment<T> Rent(int length);
17+
}
18+
19+
public class BufferPool<T> : IBufferPool<T>
20+
{
21+
private readonly ArrayPool<T> _arrayPool;
22+
private readonly ConcurrentBag<BufferPoolScope> _scopes = new ConcurrentBag<BufferPoolScope>();
23+
24+
public BufferPool(ArrayPool<T> arrayPool)
25+
{
26+
_arrayPool = arrayPool;
27+
}
28+
29+
public IBufferPoolScope<T> CreateScope()
30+
{
31+
if (!_scopes.TryTake(out var scope))
32+
scope = new BufferPoolScope(this, _arrayPool);
33+
return scope;
34+
}
35+
36+
private void Return(BufferPoolScope scope)
37+
{
38+
_scopes.Add(scope);
39+
}
40+
41+
private class BufferPoolScope : IBufferPoolScope<T>
42+
{
43+
private readonly BufferPool<T> _bufferPool;
44+
private readonly ArrayPool<T> _arrayPool;
45+
private readonly ConcurrentBag<T[]> _rents = new ConcurrentBag<T[]>();
46+
47+
public BufferPoolScope(BufferPool<T> bufferPool, ArrayPool<T> arrayPool)
48+
{
49+
_bufferPool = bufferPool;
50+
_arrayPool = arrayPool;
51+
}
52+
53+
public void Dispose()
54+
{
55+
while (_rents.TryTake(out var rent))
56+
_arrayPool.Return(rent);
57+
_bufferPool.Return(this);
58+
}
59+
60+
public ArraySegment<T> Rent(int length)
61+
{
62+
var rent = _arrayPool.Rent(length);
63+
_rents.Add(rent);
64+
return new ArraySegment<T>(rent, 0, length);
65+
}
66+
}
67+
}
68+
}

src/MineCase.Protocol/Protocol/Packet.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Runtime.ExceptionServices;
55
using System.Text;
66
using System.Threading.Tasks;
7+
using MineCase.Buffers;
78
using MineCase.Serialization;
89
using Orleans.Concurrency;
910

@@ -19,11 +20,11 @@ public class UncompressedPacket
1920
public uint PacketId;
2021

2122
[SerializeAs(DataType.ByteArray)]
22-
public byte[] Data;
23+
public ArraySegment<byte> Data;
2324

2425
public async Task SerializeAsync(Stream stream)
2526
{
26-
Length = (uint)Data.Length + PacketId.SizeOfVarInt();
27+
Length = (uint)Data.Count + PacketId.SizeOfVarInt();
2728

2829
using (var bw = new BinaryWriter(stream, Encoding.UTF8, true))
2930
{
@@ -32,10 +33,10 @@ public async Task SerializeAsync(Stream stream)
3233
bw.Flush();
3334
}
3435

35-
await stream.WriteAsync(Data, 0, Data.Length);
36+
await stream.WriteAsync(Data.Array, Data.Offset, Data.Count);
3637
}
3738

38-
public static async Task<UncompressedPacket> DeserializeAsync(Stream stream, UncompressedPacket packet = null)
39+
public static async Task<UncompressedPacket> DeserializeAsync(Stream stream, IBufferPoolScope<byte> bufferPool, UncompressedPacket packet = null)
3940
{
4041
packet = packet ?? new UncompressedPacket();
4142
int packetIdLen;
@@ -45,8 +46,8 @@ public static async Task<UncompressedPacket> DeserializeAsync(Stream stream, Unc
4546
packet.PacketId = br.ReadAsVarInt(out packetIdLen);
4647
}
4748

48-
packet.Data = new byte[packet.Length - packetIdLen];
49-
await stream.ReadExactAsync(packet.Data, 0, packet.Data.Length);
49+
packet.Data = bufferPool.Rent((int)(packet.Length - packetIdLen));
50+
await stream.ReadExactAsync(packet.Data.Array, packet.Data.Offset, packet.Data.Count);
5051
return packet;
5152
}
5253
}

src/MineCase.Protocol/Protocol/PacketCompress.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public static class PacketCompress
1111
{
1212
public static UncompressedPacket Decompress(ref CompressedPacket packet)
1313
{
14+
throw new NotImplementedException();
15+
/*
1416
var targetPacket = default(UncompressedPacket);
1517
using (var br = new BinaryReader(new DeflateStream(new MemoryStream(packet.CompressedData), CompressionMode.Decompress)))
1618
{
@@ -19,11 +21,13 @@ public static UncompressedPacket Decompress(ref CompressedPacket packet)
1921
}
2022
2123
targetPacket.Length = packet.DataLength;
22-
return targetPacket;
24+
return targetPacket;*/
2325
}
2426

2527
public static CompressedPacket Compress(ref UncompressedPacket packet)
2628
{
29+
throw new NotImplementedException();
30+
/*
2731
var targetPacket = default(CompressedPacket);
2832
using (var stream = new MemoryStream())
2933
using (var bw = new BinaryWriter(new DeflateStream(stream, CompressionMode.Compress)))
@@ -36,7 +40,7 @@ public static CompressedPacket Compress(ref UncompressedPacket packet)
3640
targetPacket.CompressedData = stream.ToArray();
3741
}
3842
39-
return targetPacket;
43+
return targetPacket;*/
4044
}
4145
}
4246
}

src/MineCase.Server.Grains/Game/ChunkSenderGrain.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public override Task OnActivateAsync()
1919
return base.OnActivateAsync();
2020
}
2121

22-
public Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink> clients, IReadOnlyCollection<IUser> users)
22+
public Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink> clients, IReadOnlyCollection<IUserChunkLoader> loaders)
2323
{
2424
var stream = GetStreamProvider(StreamProviders.JobsProvider).GetStream<SendChunkJob>(_jobWorkerId, StreamProviders.Namespaces.ChunkSender);
2525
stream.OnNextAsync(new SendChunkJob
@@ -28,7 +28,7 @@ public Task PostChunk(int x, int z, IReadOnlyCollection<IClientboundPacketSink>
2828
ChunkX = x,
2929
ChunkZ = z,
3030
Clients = clients,
31-
Users = users
31+
Loaders = loaders
3232
}).Ignore();
3333
return Task.CompletedTask;
3434
}

src/MineCase.Server.Grains/Game/ChunkSenderJobWorker.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public sealed class SendChunkJob
2424

2525
public IReadOnlyCollection<IClientboundPacketSink> Clients { get; set; }
2626

27-
public IReadOnlyCollection<IUser> Users { get; set; }
27+
public IReadOnlyCollection<IUserChunkLoader> Loaders { get; set; }
2828
}
2929

3030
internal interface IChunkSenderJobWorker : IGrainWithGuidKey
@@ -53,8 +53,8 @@ private async Task OnNextAsync(SendChunkJob job, StreamSequenceToken token)
5353

5454
var generator = new ClientPlayPacketGenerator(new BroadcastPacketSink(job.Clients, _packetPackager));
5555
await generator.ChunkData(Dimension.Overworld, job.ChunkX, job.ChunkZ, await chunkColumn.GetState());
56-
foreach (var user in job.Users)
57-
user.OnChunkSent(job.ChunkX, job.ChunkZ).Ignore();
56+
foreach (var loader in job.Loaders)
57+
loader.OnChunkSent(job.ChunkX, job.ChunkZ).Ignore();
5858
}
5959

6060
private class BroadcastPacketSink : IPacketSink

src/MineCase.Server.Grains/MineCase.Server.Grains.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="Autofac" Version="4.6.1" />
1212
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
13+
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="1.2.2" />
1314
<PackageReference Include="Microsoft.Orleans.Core" Version="2.0.0-preview2-20170724" />
1415
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004" PrivateAssets="All" />
1516
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />

src/MineCase.Server.Grains/Network/ClientboundPacketSinkGrain.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public Task SendPacket(uint packetId, Immutable<byte[]> data)
5252
var packet = new UncompressedPacket
5353
{
5454
PacketId = packetId,
55-
Data = data.Value
55+
Data = new ArraySegment<byte>(data.Value)
5656
};
5757
_subsManager.Notify(n => n.ReceivePacket(packet));
5858
return Task.CompletedTask;

0 commit comments

Comments
 (0)