@@ -90,6 +90,9 @@ def initialize(tag_prefix = nil, *args)
9090 end
9191 @packer = @factory . packer
9292
93+ @require_ack_response = options [ :require_ack_response ]
94+ @ack_response_timeout = options [ :ack_response_timeout ] || 190
95+
9396 @mon = Monitor . new
9497 @pending = nil
9598 @connect_error_history = [ ]
@@ -144,7 +147,7 @@ def close
144147 if @pending
145148 begin
146149 @pending . each do |tag , record |
147- send_data ( [ tag , record ] . to_msgpack )
150+ send_data ( tag , record )
148151 end
149152 rescue => e
150153 set_last_error ( e )
@@ -232,7 +235,7 @@ def write(tag, time, map)
232235
233236 begin
234237 @pending . each do |tag , record |
235- send_data ( [ tag , record ] . to_msgpack )
238+ send_data ( tag , record )
236239 end
237240 @pending = nil
238241 true
@@ -250,11 +253,17 @@ def write(tag, time, map)
250253 }
251254 end
252255
253- def send_data ( data )
256+ def send_data ( tag , record )
254257 unless connect?
255258 connect!
256259 end
257- @con . write data
260+ if @require_ack_response
261+ option = { }
262+ option [ 'chunk' ] = generate_chunk
263+ @con . write [ tag , record , option ] . to_msgpack
264+ else
265+ @con . write [ tag , record ] . to_msgpack
266+ end
258267 #while true
259268 # puts "sending #{data.length} bytes"
260269 # if data.length > 32*1024
@@ -269,6 +278,21 @@ def send_data(data)
269278 # data = data[n..-1]
270279 #end
271280
281+ if @require_ack_response && @ack_response_timeout > 0
282+ if IO . select ( [ @con ] , nil , nil , @ack_response_timeout )
283+ raw_data = @con . recv ( 1024 )
284+
285+ if raw_data . empty?
286+ raise "Closed connection"
287+ else
288+ response = MessagePack . unpack ( raw_data )
289+ if response [ 'ack' ] != option [ 'chunk' ]
290+ raise "ack in response and chunk id in sent data are different"
291+ end
292+ end
293+ end
294+ end
295+
272296 true
273297 end
274298
@@ -307,6 +331,10 @@ def set_last_error(e)
307331 # TODO: Check non GVL env
308332 @last_error [ Thread . current . object_id ] = e
309333 end
334+
335+ def generate_chunk
336+ Base64 . encode64 ( ( [ SecureRandom . random_number ( 1 << 32 ) ] * 4 ) . pack ( 'NNNN' ) ) . chomp
337+ end
310338 end
311339 end
312340end
0 commit comments