@@ -264,8 +264,6 @@ defmodule Ch.FaultsTest do
264264 clickhouse: clickhouse ,
265265 query_options: query_options
266266 } do
267- test = self ( )
268-
269267 log =
270268 capture_async_log ( fn ->
271269 { :ok , conn } = Ch . start_link ( port: port , timeout: 100 )
@@ -277,10 +275,10 @@ defmodule Ch.FaultsTest do
277275 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
278276 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
279277
280- spawn_link ( fn ->
281- assert { :error , % Mint.TransportError { reason: :timeout } } =
282- Ch . query ( conn , "select 1 + 1" , [ ] , query_options )
283- end )
278+ select =
279+ Task . async ( fn ->
280+ Ch . query ( conn , "select 1 + 1" , [ ] , query_options )
281+ end )
284282
285283 # failed select 1 + 1
286284 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
@@ -293,18 +291,11 @@ defmodule Ch.FaultsTest do
293291 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
294292 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
295293
296- spawn_link ( fn ->
297- assert { :ok , % { num_rows: 1 , rows: [ [ 2 ] ] } } =
298- Ch . query ( conn , "select 1 + 1" , [ ] , query_options )
299-
300- send ( test , :done )
301- end )
302-
303294 # select 1 + 1
304295 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
305296 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
306297
307- assert_receive :done
298+ assert { :ok , % Ch.Result { rows: [ [ 2 ] ] } } = Task . await ( select )
308299 end )
309300
310301 assert log =~ "disconnected: ** (Mint.TransportError) timeout"
@@ -360,7 +351,6 @@ defmodule Ch.FaultsTest do
360351
361352 test "reconnects after Connection: close response from server" , ctx do
362353 % { port: port , listen: listen , clickhouse: clickhouse , query_options: query_options } = ctx
363- test = self ( )
364354
365355 log =
366356 capture_async_log ( fn ->
@@ -373,12 +363,10 @@ defmodule Ch.FaultsTest do
373363 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
374364 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
375365
376- spawn_link ( fn ->
377- assert { :ok , % { num_rows: 1 , rows: [ [ 2 ] ] } } =
378- Ch . query ( conn , "select 1 + 1" , [ ] , query_options )
379-
380- send ( test , :done )
381- end )
366+ select =
367+ Task . async ( fn ->
368+ Ch . query ( conn , "select 1 + 1" , [ ] , query_options )
369+ end )
382370
383371 # first select 1 + 1
384372 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
@@ -390,9 +378,12 @@ defmodule Ch.FaultsTest do
390378 "Connection: Close"
391379 )
392380
381+ assert response =~ "Connection: Close"
382+
393383 :ok = :gen_tcp . send ( mint , response )
394384 :ok = :gen_tcp . close ( mint )
395- assert_receive :done
385+
386+ assert { :ok , % Ch.Result { rows: [ [ 2 ] ] } } = Task . await ( select )
396387
397388 # reconnect
398389 { :ok , mint } = :gen_tcp . accept ( listen )
@@ -401,30 +392,26 @@ defmodule Ch.FaultsTest do
401392 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
402393 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
403394
404- spawn_link ( fn ->
405- assert { :ok , % { num_rows: 1 , rows: [ [ 2 ] ] } } =
406- Ch . query ( conn , "select 1 + 1" , [ ] , query_options )
395+ select =
396+ Task . async ( fn ->
397+ Ch . query ( conn , "select 2 + 2" , [ ] , query_options )
398+ end )
407399
408- send ( test , :done )
409- end )
410-
411- # select 1 + 1
400+ # select 2 + 2
412401 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
413402 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
414403
415- assert_receive :done
404+ assert { :ok , % Ch.Result { rows: [ [ 4 ] ] } } = Task . await ( select )
416405 end )
417406
418- refute log =~ "disconnected: **"
419- assert log =~ "connection was closed by the server"
407+ assert log =~ "disconnected: ** (Mint.HTTPError) the connection is closed"
420408 end
421409
422410 # TODO non-chunked request
423411
424412 test "reconnects after closed before streaming request" , ctx do
425413 % { port: port , listen: listen , clickhouse: clickhouse , query_options: query_options } = ctx
426414
427- test = self ( )
428415 rows = [ [ 1 , 2 ] , [ 3 , 4 ] ]
429416 stream = Stream . map ( rows , fn row -> Ch.RowBinary . encode_row ( row , [ :u8 , :u8 ] ) end )
430417
@@ -442,15 +429,15 @@ defmodule Ch.FaultsTest do
442429 # disconnect before insert
443430 :ok = :gen_tcp . close ( mint )
444431
445- spawn_link ( fn ->
446- assert { :error , % Mint.TransportError { reason: :closed } } =
447- Ch . query (
448- conn ,
449- "insert into unknown_table(a,b) format RowBinary" ,
450- stream ,
451- Keyword . merge ( query_options , encode: false )
452- )
453- end )
432+ insert =
433+ Task . async ( fn ->
434+ Ch . query (
435+ conn ,
436+ "insert into unknown_table(a,b) format RowBinary" ,
437+ stream ,
438+ Keyword . merge ( query_options , encode: false )
439+ )
440+ end )
454441
455442 # reconnect
456443 { :ok , mint } = :gen_tcp . accept ( listen )
@@ -459,25 +446,12 @@ defmodule Ch.FaultsTest do
459446 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
460447 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
461448
462- spawn_link ( fn ->
463- assert { :error , % Ch.Error { code: 60 , message: message } } =
464- Ch . query (
465- conn ,
466- "insert into unknown_table(a,b) format RowBinary" ,
467- stream ,
468- Keyword . merge ( query_options , encode: false )
469- )
470-
471- assert message =~ ~r/ UNKNOWN_TABLE/
472-
473- send ( test , :done )
474- end )
475-
476449 # insert
477450 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
478451 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
479452
480- assert_receive :done
453+ assert { :error , % Ch.Error { code: 60 , message: message } } = Task . await ( insert )
454+ assert message =~ ~r/ UNKNOWN_TABLE/
481455 end )
482456
483457 assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
@@ -486,7 +460,6 @@ defmodule Ch.FaultsTest do
486460 test "reconnects after closed while streaming request" , ctx do
487461 % { port: port , listen: listen , clickhouse: clickhouse , query_options: query_options } = ctx
488462
489- test = self ( )
490463 rows = [ [ 1 , 2 ] , [ 3 , 4 ] ]
491464 stream = Stream . map ( rows , fn row -> Ch.RowBinary . encode_row ( row , [ :u8 , :u8 ] ) end )
492465
@@ -501,15 +474,15 @@ defmodule Ch.FaultsTest do
501474 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
502475 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
503476
504- spawn_link ( fn ->
505- assert { :error , % Mint.TransportError { reason: :closed } } =
506- Ch . query (
507- conn ,
508- "insert into unknown_table(a,b) format RowBinary" ,
509- stream ,
510- Keyword . merge ( query_options , encode: false )
511- )
512- end )
477+ insert =
478+ Task . async ( fn ->
479+ Ch . query (
480+ conn ,
481+ "insert into unknown_table(a,b) format RowBinary" ,
482+ stream ,
483+ Keyword . merge ( query_options , encode: false )
484+ )
485+ end )
513486
514487 # close after first packet from mint arrives
515488 assert_receive { :tcp , ^ mint , _packet }
@@ -522,25 +495,12 @@ defmodule Ch.FaultsTest do
522495 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
523496 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
524497
525- spawn_link ( fn ->
526- assert { :error , % Ch.Error { code: 60 , message: message } } =
527- Ch . query (
528- conn ,
529- "insert into unknown_table(a,b) format RowBinary" ,
530- stream ,
531- Keyword . merge ( query_options , encode: false )
532- )
533-
534- assert message =~ ~r/ UNKNOWN_TABLE/
535-
536- send ( test , :done )
537- end )
538-
539498 # insert
540499 :ok = :gen_tcp . send ( clickhouse , intercept_packets ( mint ) )
541500 :ok = :gen_tcp . send ( mint , intercept_packets ( clickhouse ) )
542501
543- assert_receive :done
502+ assert { :error , % Ch.Error { code: 60 , message: message } } = Task . await ( insert )
503+ assert message =~ ~r/ UNKNOWN_TABLE/
544504 end )
545505
546506 assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
0 commit comments