1+ using System ;
2+ using System . Globalization ;
3+ using System . Net . Sockets ;
4+ using System . Text ;
5+
6+ namespace QuestDB
7+ {
8+ public class LineTcpSender : IDisposable
9+ {
10+ private static readonly long EpochTicks = new DateTime ( 1970 , 1 , 1 ) . Ticks ;
11+ private readonly TcpClient _client ;
12+ private readonly bool _closeClient ;
13+ private readonly byte [ ] _sendBuffer ;
14+ private int _position ;
15+ private bool _hasMetric ;
16+ private bool _quoted ;
17+ private bool _noFields = true ;
18+
19+ public LineTcpSender ( TcpClient client , int bufferSize = 4096 )
20+ {
21+ _client = client ;
22+ _sendBuffer = new byte [ bufferSize ] ;
23+ }
24+
25+ public LineTcpSender ( String address , int port , int bufferSize = 4096 ) : this ( new TcpClient ( address , port ) , bufferSize )
26+ {
27+ _closeClient = true ;
28+ }
29+
30+ public LineTcpSender Metric ( ReadOnlySpan < char > name )
31+ {
32+ if ( _hasMetric )
33+ {
34+ throw new InvalidOperationException ( "duplicate metric" ) ;
35+ }
36+
37+ _quoted = false ;
38+ _hasMetric = true ;
39+ EncodeUtf8 ( name ) ;
40+ return this ;
41+ }
42+
43+ public LineTcpSender Tag ( ReadOnlySpan < char > tag , ReadOnlySpan < char > value ) {
44+ if ( _hasMetric && _noFields ) {
45+ Put ( ',' ) . EncodeUtf8 ( tag ) . Put ( '=' ) . EncodeUtf8 ( value ) ;
46+ return this ;
47+ }
48+ throw new InvalidOperationException ( "metric expected" ) ;
49+ }
50+
51+ private LineTcpSender Field ( ReadOnlySpan < char > name ) {
52+ if ( _hasMetric ) {
53+ if ( _noFields ) {
54+ Put ( ' ' ) ;
55+ _noFields = false ;
56+ } else {
57+ Put ( ',' ) ;
58+ }
59+
60+ return EncodeUtf8 ( name ) . Put ( '=' ) ;
61+ }
62+ throw new InvalidOperationException ( "metric expected" ) ;
63+ }
64+
65+ public LineTcpSender Field ( ReadOnlySpan < char > name , ReadOnlySpan < char > value )
66+ {
67+ Field ( name ) . Put ( '\" ' ) ;
68+ _quoted = true ;
69+ EncodeUtf8 ( value ) ;
70+ _quoted = false ;
71+ Put ( '\" ' ) ;
72+ return this ;
73+ }
74+
75+ public LineTcpSender Field ( ReadOnlySpan < char > name , long value ) {
76+ Field ( name ) . Put ( value ) . Put ( 'i' ) ;
77+ return this ;
78+ }
79+
80+ public LineTcpSender Field ( ReadOnlySpan < char > name , double value ) {
81+ Field ( name ) . Put ( value . ToString ( CultureInfo . InvariantCulture ) ) ;
82+ return this ;
83+ }
84+
85+ private LineTcpSender Put ( long value )
86+ {
87+ if ( value == long . MinValue )
88+ {
89+ // Special case, long.MinValue cannot be handled by QuestDB
90+ throw new ArgumentOutOfRangeException ( ) ;
91+ }
92+
93+ Span < byte > num = stackalloc byte [ 20 ] ;
94+ int pos = num . Length ;
95+ long remaining = Math . Abs ( value ) ;
96+ do
97+ {
98+ long digit = remaining % 10 ;
99+ num [ -- pos ] = ( byte ) ( '0' + digit ) ;
100+ remaining /= 10 ;
101+ } while ( remaining != 0 ) ;
102+
103+ if ( value < 0 )
104+ {
105+ num [ -- pos ] = ( byte ) '-' ;
106+ }
107+
108+ int len = num . Length - pos ;
109+ if ( _position + len >= _sendBuffer . Length )
110+ {
111+ Flush ( ) ;
112+ }
113+ num . Slice ( pos , len ) . CopyTo ( _sendBuffer . AsSpan ( _position ) ) ;
114+ _position += len ;
115+
116+ return this ;
117+ }
118+
119+ private LineTcpSender EncodeUtf8 ( ReadOnlySpan < char > name )
120+ {
121+ for ( int i = 0 ; i < name . Length ; i ++ )
122+ {
123+ var c = name [ i ] ;
124+ if ( c < 128 )
125+ {
126+ PutSpecial ( c ) ;
127+ }
128+ else
129+ {
130+ PutUtf8 ( c ) ;
131+ }
132+ }
133+
134+ return this ;
135+ }
136+
137+ private void PutUtf8 ( char c )
138+ {
139+ if ( _position + 4 >= _sendBuffer . Length )
140+ {
141+ Flush ( ) ;
142+ }
143+
144+ Span < byte > bytes = _sendBuffer ;
145+ Span < char > chars = stackalloc char [ 1 ] { c } ;
146+ _position += Encoding . UTF8 . GetBytes ( chars , bytes . Slice ( _position , 4 ) ) ;
147+ }
148+
149+ private void PutSpecial ( char c )
150+ {
151+ switch ( c )
152+ {
153+ case ' ' :
154+ case ',' :
155+ case '=' :
156+ if ( ! _quoted )
157+ {
158+ Put ( '\\ ' ) ;
159+ }
160+ goto default ;
161+ default :
162+ Put ( c ) ;
163+ break ;
164+ case '\n ' :
165+ case '\r ' :
166+ Put ( '\\ ' ) . Put ( c ) ;
167+ break ;
168+ case '"' :
169+ if ( _quoted )
170+ {
171+ Put ( '\\ ' ) ;
172+ }
173+
174+ Put ( c ) ;
175+ break ;
176+ case '\\ ' :
177+ Put ( '\\ ' ) . Put ( '\\ ' ) ;
178+ break ;
179+ }
180+ }
181+
182+ private LineTcpSender Put ( ReadOnlySpan < char > chars )
183+ {
184+ foreach ( var c in chars )
185+ {
186+ Put ( c ) ;
187+ }
188+
189+ return this ;
190+ }
191+
192+ private LineTcpSender Put ( char c )
193+ {
194+ if ( _position + 1 >= _sendBuffer . Length )
195+ {
196+ Flush ( ) ;
197+ }
198+
199+ _sendBuffer [ _position ++ ] = ( byte ) c ;
200+ return this ;
201+ }
202+
203+ public void Flush ( )
204+ {
205+ _position -= _client . Client . Send ( _sendBuffer , 0 , _position , SocketFlags . None ) ;
206+ }
207+
208+ public void Dispose ( )
209+ {
210+ try
211+ {
212+ if ( _position > 0 )
213+ {
214+ Flush ( ) ;
215+ }
216+ }
217+ catch ( Exception ex )
218+ {
219+ Console . Error . WriteLine ( "Error on disposing LineTcpClient: {0}" , ex ) ;
220+ }
221+ finally
222+ {
223+ if ( _closeClient )
224+ {
225+ _client . Dispose ( ) ;
226+ }
227+ }
228+ }
229+
230+ public void AtNow ( )
231+ {
232+ Put ( '\n ' ) ;
233+ _hasMetric = false ;
234+ _noFields = true ;
235+ }
236+
237+ public void At ( DateTime timestamp )
238+ {
239+ long epoch = timestamp . Ticks - EpochTicks ;
240+ Put ( ' ' ) . Put ( epoch ) . Put ( "00" ) . AtNow ( ) ;
241+ }
242+ }
243+ }
0 commit comments