1
+ using System ;
2
+ using System . IO ;
3
+ using System . IO . Compression ;
4
+ using SpacetimeDB . ClientApi ;
5
+
6
+ namespace SpacetimeDB
7
+ {
8
+ internal class CompressionHelpers
9
+ {
10
+ /// <summary>
11
+ /// Compression algorithms supported for data processing.
12
+ /// Used to specify the compression method for serializing and deserializing messages
13
+ /// between the client and SpacetimeDB server. The selected algorithm determines
14
+ /// how data such as query updates and server messages are compressed or decompressed.
15
+ /// </summary>
16
+ internal enum CompressionAlgos : byte
17
+ {
18
+ None = 0 ,
19
+ Brotli = 1 ,
20
+ Gzip = 2 ,
21
+ }
22
+
23
+ /// <summary>
24
+ /// Creates a <see cref="BrotliStream"/> for decompressing the provided stream.
25
+ /// </summary>
26
+ /// <param name="stream">The input stream containing Brotli-compressed data.</param>
27
+ /// <returns>A <see cref="BrotliStream"/> set to decompression mode.</returns>
28
+ internal static BrotliStream BrotliReader ( Stream stream )
29
+ {
30
+ return new BrotliStream ( stream , CompressionMode . Decompress ) ;
31
+ }
32
+
33
+ /// <summary>
34
+ /// Creates a <see cref="GZipStream"/> for decompressing the provided stream.
35
+ /// </summary>
36
+ /// <param name="stream">The input stream containing GZip-compressed data.</param>
37
+ /// <returns>A <see cref="GZipStream"/> set to decompression mode.</returns>
38
+ internal static GZipStream GzipReader ( Stream stream )
39
+ {
40
+ return new GZipStream ( stream , CompressionMode . Decompress ) ;
41
+ }
42
+
43
+ /// <summary>
44
+ /// Decompresses and decodes a serialized <see cref="ServerMessage"/> from a byte array,
45
+ /// automatically handling the specified compression algorithm (None, Brotli, or Gzip).
46
+ /// Ensures efficient decompression by reading the entire stream at once to avoid
47
+ /// performance issues with certain stream implementations.
48
+ /// Throws <see cref="InvalidOperationException"/> if an unknown compression type is encountered.
49
+ /// </summary>
50
+ /// <param name="bytes">The compressed and encoded server message as a byte array.</param>
51
+ /// <returns>The deserialized <see cref="ServerMessage"/> object.</returns>
52
+ internal static ServerMessage DecompressDecodeMessage ( byte [ ] bytes )
53
+ {
54
+ using var stream = new MemoryStream ( bytes ) ;
55
+
56
+ // The stream will never be empty. It will at least contain the compression algo.
57
+ var compression = ( CompressionAlgos ) stream . ReadByte ( ) ;
58
+ // Conditionally decompress and decode.
59
+ Stream decompressedStream = compression switch
60
+ {
61
+ CompressionAlgos . None => stream ,
62
+ CompressionAlgos . Brotli => BrotliReader ( stream ) ,
63
+ CompressionAlgos . Gzip => GzipReader ( stream ) ,
64
+ _ => throw new InvalidOperationException ( "Unknown compression type" ) ,
65
+ } ;
66
+
67
+ // TODO: consider pooling these.
68
+ // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array
69
+ // PER BYTE READ. You have to do it all at once to avoid that problem.
70
+ MemoryStream memoryStream = new MemoryStream ( ) ;
71
+ decompressedStream . CopyTo ( memoryStream ) ;
72
+ memoryStream . Seek ( 0 , SeekOrigin . Begin ) ;
73
+ return new ServerMessage . BSATN ( ) . Read ( new BinaryReader ( memoryStream ) ) ;
74
+ }
75
+
76
+
77
+ /// <summary>
78
+ /// Decompresses and decodes a <see cref="CompressableQueryUpdate"/> into a <see cref="QueryUpdate"/> object,
79
+ /// automatically handling uncompressed, Brotli, or Gzip-encoded data. Ensures efficient decompression by
80
+ /// reading the entire stream at once to avoid performance issues with certain stream implementations.
81
+ /// Throws <see cref="InvalidOperationException"/> if the compression type is unrecognized.
82
+ /// </summary>
83
+ /// <param name="update">The compressed or uncompressed query update.</param>
84
+ /// <returns>The deserialized <see cref="QueryUpdate"/> object.</returns>
85
+ internal static QueryUpdate DecompressDecodeQueryUpdate ( CompressableQueryUpdate update )
86
+ {
87
+ Stream decompressedStream ;
88
+
89
+ switch ( update )
90
+ {
91
+ case CompressableQueryUpdate . Uncompressed ( var qu ) :
92
+ return qu ;
93
+
94
+ case CompressableQueryUpdate . Brotli ( var bytes ) :
95
+ decompressedStream = CompressionHelpers . BrotliReader ( new MemoryStream ( bytes . ToArray ( ) ) ) ;
96
+ break ;
97
+
98
+ case CompressableQueryUpdate . Gzip ( var bytes ) :
99
+ decompressedStream = CompressionHelpers . GzipReader ( new MemoryStream ( bytes . ToArray ( ) ) ) ;
100
+ break ;
101
+
102
+ default :
103
+ throw new InvalidOperationException ( ) ;
104
+ }
105
+
106
+ // TODO: consider pooling these.
107
+ // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array
108
+ // PER BYTE READ. You have to do it all at once to avoid that problem.
109
+ MemoryStream memoryStream = new MemoryStream ( ) ;
110
+ decompressedStream . CopyTo ( memoryStream ) ;
111
+ memoryStream . Seek ( 0 , SeekOrigin . Begin ) ;
112
+ return new QueryUpdate . BSATN ( ) . Read ( new BinaryReader ( memoryStream ) ) ;
113
+ }
114
+
115
+ /// <summary>
116
+ /// Prepare to read a BsatnRowList.
117
+ ///
118
+ /// This could return an IEnumerable, but we return the reader and row count directly to avoid an allocation.
119
+ /// It is legitimate to repeatedly call <c>IStructuralReadWrite.Read<T></c> <c>rowCount</c> times on the resulting
120
+ /// BinaryReader:
121
+ /// Our decoding infrastructure guarantees that reading a value consumes the correct number of bytes
122
+ /// from the BinaryReader. (This is easy because BSATN doesn't have padding.)
123
+ ///
124
+ /// Previously here we were using LINQ to do what we're now doing with a custsom reader.
125
+ ///
126
+ /// Why are we no longer using LINQ?
127
+ ///
128
+ /// The calls in question, namely `Skip().Take()`, were fast under the Mono runtime,
129
+ /// but *much* slower when compiled AOT with IL2CPP.
130
+ /// Apparently Mono's JIT is smart enough to optimize away these LINQ ops,
131
+ /// resulting in a linear scan of the `BsatnRowList`.
132
+ /// Unfortunately IL2CPP could not, resulting in a quadratic scan.
133
+ /// See: https://github.com/clockworklabs/com.clockworklabs.spacetimedbsdk/pull/306
134
+ /// </summary>
135
+ /// <param name="list"></param>
136
+ /// <returns>A reader for the rows of the list and a count of rows.</returns>
137
+ internal static ( BinaryReader reader , int rowCount ) ParseRowList ( BsatnRowList list ) =>
138
+ (
139
+ new BinaryReader ( new ListStream ( list . RowsData ) ) ,
140
+ list . SizeHint switch
141
+ {
142
+ RowSizeHint . FixedSize ( var size ) => list . RowsData . Count / size ,
143
+ RowSizeHint . RowOffsets ( var offsets ) => offsets . Count ,
144
+ _ => throw new NotImplementedException ( )
145
+ }
146
+ ) ;
147
+ }
148
+ }
0 commit comments