Skip to content

Commit d701854

Browse files
AlexEndrisDmitryLukyanov
authored andcommitted
CSHARP-1991: Add support for OP_COMPRESSED.
1 parent 8ae8fbe commit d701854

File tree

58 files changed

+2232
-472
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+2232
-472
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/* Copyright 2013-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
namespace MongoDB.Driver.Core.Compression
17+
{
18+
/// <summary>
19+
/// Represents the compressor id.
20+
/// </summary>
21+
public enum CompressorId
22+
{
23+
/// <summary>
24+
/// No compression.
25+
/// </summary>
26+
noop = 0,
27+
///// <summary>
28+
///// Compression using snappy algorithm. NOT SUPPORTED YET.
29+
///// </summary>
30+
//snappy = 1,
31+
/// <summary>
32+
/// Compression using zlib algorithm.
33+
/// </summary>
34+
zlib = 2
35+
}
36+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/* Copyright 2013-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
namespace MongoDB.Driver.Core.Compression
17+
{
18+
/// <summary>
19+
/// Represents a compressor.
20+
/// </summary>
21+
public interface ICompressor
22+
{
23+
/// <summary>
24+
/// Gets the name of the compressor.
25+
/// </summary>
26+
string Name { get; }
27+
28+
/// <summary>
29+
/// Gets the id of the compressor
30+
/// </summary>
31+
CompressorId Id { get; }
32+
33+
/// <summary>
34+
/// Compresses the specified byte array with a given offset.
35+
/// </summary>
36+
/// <param name="bytesToCompress">Bytes to compress.</param>
37+
/// <param name="offset">Offset of the bytes.</param>
38+
byte[] Compress(byte[] bytesToCompress, int offset);
39+
40+
/// <summary>
41+
/// Decompresses the specified byte array.
42+
/// </summary>
43+
/// <param name="bytesToDecompress">Bytes to decompress.</param>
44+
byte[] Decompress(byte[] bytesToDecompress);
45+
}
46+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/* Copyright 2013-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Collections.Generic;
17+
using System.Diagnostics.CodeAnalysis;
18+
using System.IO;
19+
using System.Text;
20+
using MongoDB.Bson.IO;
21+
using MongoDB.Driver.Core.WireProtocol.Messages;
22+
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
23+
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders.BinaryEncoders;
24+
25+
namespace MongoDB.Driver.Core.Compression
26+
{
27+
internal sealed class MessageCompressionHelper
28+
{
29+
private const int MessageHeaderLength = 16;
30+
private readonly IDictionary<CompressorId, ICompressor> _compressorDictionary;
31+
private readonly MessageEncoderSettings _messageEncoderSettings;
32+
33+
public MessageCompressionHelper(IDictionary<CompressorId, ICompressor> compressorDictionary, MessageEncoderSettings messageEncoderSettings)
34+
{
35+
_compressorDictionary = compressorDictionary;
36+
_messageEncoderSettings = messageEncoderSettings;
37+
}
38+
39+
public bool IsCompressedMessage(Stream stream)
40+
{
41+
var bsonStream = GetBsonStream(stream);
42+
43+
var position = stream.Position;
44+
45+
// Skip header to opcode
46+
bsonStream.ReadInt32();
47+
bsonStream.ReadInt32();
48+
bsonStream.ReadInt32();
49+
var isCompressed = bsonStream.ReadInt32() == (int)Opcode.Compressed;
50+
51+
stream.Position = position;
52+
53+
return isCompressed;
54+
}
55+
56+
public Stream UncompressMessage(Stream stream)
57+
{
58+
var bsonStream = GetBsonStream(stream);
59+
return ReadCompressedStream(bsonStream);
60+
}
61+
62+
public ResponseMessage ReadCompressedResponseMessage(Stream stream, IMessageEncoderSelector encoderSelector)
63+
{
64+
var bsonStream = GetBsonStream(stream);
65+
using (var uncompressedStream = ReadCompressedStream(bsonStream))
66+
{
67+
return (ResponseMessage) ReadMessage(uncompressedStream, encoderSelector);
68+
}
69+
}
70+
71+
private static BsonStream GetBsonStream(Stream stream)
72+
{
73+
return stream as BsonStream ?? new BsonStreamAdapter(stream);
74+
}
75+
76+
[SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
77+
private ByteBufferStream ReadCompressedStream(BsonStream stream)
78+
{
79+
var compressedMessageLength = stream.ReadInt32();
80+
var requestId = stream.ReadInt32();
81+
var responseTo = stream.ReadInt32();
82+
stream.ReadInt32(); //OP_COMPRESSED
83+
84+
var originalOpCode = (Opcode) stream.ReadInt32();
85+
var originalMessageSize = stream.ReadInt32();
86+
var compressorId = (CompressorId) stream.ReadByte();
87+
var compressor = GetCompressor(compressorId);
88+
89+
var compressedBytes = stream.ReadBytes(compressedMessageLength - (int) stream.Position);
90+
91+
var uncompressedBytes = compressor.Decompress(compressedBytes);
92+
93+
using (var memStream = new MemoryStream())
94+
{
95+
using (var writer = new BinaryWriter(memStream, Encoding.UTF8, true))
96+
{
97+
writer.Write(originalMessageSize + MessageHeaderLength);
98+
writer.Write(requestId);
99+
writer.Write(responseTo);
100+
writer.Write((int)originalOpCode);
101+
102+
writer.Write(uncompressedBytes);
103+
}
104+
105+
var buffer = new ByteArrayBuffer(memStream.ToArray());
106+
buffer.MakeReadOnly();
107+
return new ByteBufferStream(buffer);
108+
}
109+
}
110+
111+
private ICompressor GetCompressor(CompressorId compressorId)
112+
{
113+
ICompressor compressor;
114+
115+
if (_compressorDictionary.TryGetValue(compressorId, out compressor))
116+
return compressor;
117+
118+
throw new MongoClientException($"Unsupported compressor with identifier {(int)compressorId}");
119+
}
120+
121+
private MongoDBMessage ReadMessage(ByteBufferStream stream, IMessageEncoderSelector encoderSelector)
122+
{
123+
var encoderFactory = new BinaryMessageEncoderFactory(stream, _messageEncoderSettings);
124+
var encoder = encoderSelector.GetEncoder(encoderFactory);
125+
return encoder.ReadMessage();
126+
}
127+
}
128+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/* Copyright 2013-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Diagnostics.CodeAnalysis;
17+
using System.IO;
18+
using SharpCompress.Compressors;
19+
using SharpCompress.Compressors.Deflate;
20+
21+
namespace MongoDB.Driver.Core.Compression
22+
{
23+
/// <summary>
24+
/// Compressors using zlib algorithm
25+
/// </summary>
26+
public sealed class ZlibCompressor : ICompressor
27+
{
28+
private readonly CompressionLevel _compressionLevel;
29+
30+
/// <inheritdoc />
31+
public string Name => "zlib";
32+
33+
/// <inheritdoc />
34+
public CompressorId Id => CompressorId.zlib;
35+
36+
/// <summary>
37+
/// Initializes a new instance of the <see cref="ZlibCompressor" /> class.
38+
/// </summary>
39+
/// <param name="compressionLevel">The compression level.</param>
40+
public ZlibCompressor(int compressionLevel)
41+
{
42+
_compressionLevel = GetCompressionLevel(compressionLevel);
43+
}
44+
45+
/// <inheritdoc />
46+
[SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
47+
public byte[] Compress(byte[] bytesToCompress, int offset)
48+
{
49+
using (var memoryStream = new MemoryStream())
50+
{
51+
using (var zlibStream = new ZlibStream(memoryStream, CompressionMode.Compress, _compressionLevel))
52+
{
53+
zlibStream.Write(bytesToCompress, offset, bytesToCompress.Length - offset);
54+
}
55+
56+
return memoryStream.ToArray();
57+
}
58+
}
59+
60+
/// <inheritdoc />
61+
[SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
62+
public byte[] Decompress(byte[] bytesToDecompress)
63+
{
64+
using (var memoryStream = new MemoryStream())
65+
{
66+
using (var zlibStream = new ZlibStream(memoryStream, CompressionMode.Decompress))
67+
{
68+
zlibStream.Write(bytesToDecompress, 0, bytesToDecompress.Length);
69+
}
70+
71+
return memoryStream.ToArray();
72+
}
73+
}
74+
75+
private static CompressionLevel GetCompressionLevel(int compressionLevel)
76+
{
77+
if (compressionLevel < 0)
78+
return CompressionLevel.Default;
79+
80+
if (compressionLevel > 9)
81+
return CompressionLevel.BestCompression;
82+
83+
return (CompressionLevel) compressionLevel;
84+
}
85+
}
86+
}

src/MongoDB.Driver.Core/Core/Configuration/ClusterBuilderExtensions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ public static ClusterBuilder ConfigureWithConnectionString(this ClusterBuilder b
125125
{
126126
builder = builder.ConfigureConnection(s => s.With(maxLifeTime: connectionString.MaxLifeTime.Value));
127127
}
128+
if (connectionString.Compressors != null)
129+
{
130+
builder = builder.ConfigureConnection(s => s.With(compressors: Optional.Enumerable(connectionString.Compressors)));
131+
}
128132

129133
// Connection Pool
130134
if (connectionString.MaxPoolSize != null)

src/MongoDB.Driver.Core/Core/Configuration/ConnectionSettings.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,26 @@ public class ConnectionSettings
3737
private readonly IReadOnlyList<IAuthenticator> _authenticators;
3838
private readonly TimeSpan _maxIdleTime;
3939
private readonly TimeSpan _maxLifeTime;
40+
private readonly IEnumerable<MongoCompressor> _compressors;
4041

4142
// constructors
4243
/// <summary>
4344
/// Initializes a new instance of the <see cref="ConnectionSettings" /> class.
4445
/// </summary>
4546
/// <param name="authenticators">The authenticators.</param>
47+
/// <param name="compressors">The compressors</param>
4648
/// <param name="maxIdleTime">The maximum idle time.</param>
4749
/// <param name="maxLifeTime">The maximum life time.</param>
4850
/// <param name="applicationName">The application name.</param>
4951
public ConnectionSettings(
5052
Optional<IEnumerable<IAuthenticator>> authenticators = default(Optional<IEnumerable<IAuthenticator>>),
53+
Optional<IEnumerable<MongoCompressor>> compressors = default(Optional<IEnumerable<MongoCompressor>>),
5154
Optional<TimeSpan> maxIdleTime = default(Optional<TimeSpan>),
5255
Optional<TimeSpan> maxLifeTime = default(Optional<TimeSpan>),
5356
Optional<string> applicationName = default(Optional<string>))
5457
{
5558
_authenticators = Ensure.IsNotNull(authenticators.WithDefault(__noAuthenticators), "authenticators").ToList();
59+
_compressors = Ensure.IsNotNull(compressors.WithDefault(Enumerable.Empty<MongoCompressor>()), nameof(compressors)).ToList();
5660
_maxIdleTime = Ensure.IsGreaterThanZero(maxIdleTime.WithDefault(TimeSpan.FromMinutes(10)), "maxIdleTime");
5761
_maxLifeTime = Ensure.IsGreaterThanZero(maxLifeTime.WithDefault(TimeSpan.FromMinutes(30)), "maxLifeTime");
5862
_applicationName = ApplicationNameHelper.EnsureApplicationNameIsValid(applicationName.WithDefault(null), nameof(applicationName));
@@ -103,23 +107,34 @@ public TimeSpan MaxLifeTime
103107
get { return _maxLifeTime; }
104108
}
105109

110+
/// <summary>
111+
/// The compressors
112+
/// </summary>
113+
public IEnumerable<MongoCompressor> Compressors
114+
{
115+
get { return _compressors; }
116+
}
117+
106118
// methods
107119
/// <summary>
108120
/// Returns a new ConnectionSettings instance with some settings changed.
109121
/// </summary>
110122
/// <param name="authenticators">The authenticators.</param>
123+
/// <param name="compressors">The compressors.</param>
111124
/// <param name="maxIdleTime">The maximum idle time.</param>
112125
/// <param name="maxLifeTime">The maximum life time.</param>
113126
/// <param name="applicationName">The application name.</param>
114127
/// <returns>A new ConnectionSettings instance.</returns>
115128
public ConnectionSettings With(
116129
Optional<IEnumerable<IAuthenticator>> authenticators = default(Optional<IEnumerable<IAuthenticator>>),
130+
Optional<IEnumerable<MongoCompressor>> compressors = default(Optional<IEnumerable<MongoCompressor>>),
117131
Optional<TimeSpan> maxIdleTime = default(Optional<TimeSpan>),
118132
Optional<TimeSpan> maxLifeTime = default(Optional<TimeSpan>),
119133
Optional<string> applicationName = default(Optional<string>))
120134
{
121135
return new ConnectionSettings(
122136
authenticators: Optional.Enumerable(authenticators.WithDefault(_authenticators)),
137+
compressors: Optional.Enumerable(compressors.WithDefault(_compressors)),
123138
maxIdleTime: maxIdleTime.WithDefault(_maxIdleTime),
124139
maxLifeTime: maxLifeTime.WithDefault(_maxLifeTime),
125140
applicationName: applicationName.WithDefault(_applicationName));

0 commit comments

Comments
 (0)