diff --git a/.gitmodules b/.gitmodules index 5e7659f..eed115c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,6 @@ [submodule "ext/zeromq"] path = ext/zeromq - url = https://github.com/zeromq/zeromq4-x + url = https://github.com/zeromq/libzmq [submodule "ext/czmq"] path = ext/czmq url = https://github.com/zeromq/czmq diff --git a/ext/czmq b/ext/czmq index 3e6c11a..e4af2e7 160000 --- a/ext/czmq +++ b/ext/czmq @@ -1 +1 @@ -Subproject commit 3e6c11a7b75ef6f51a0eac895564306b88e5d24f +Subproject commit e4af2e7850ad0cdb74429bb57b6fc9bd63869045 diff --git a/ext/rbczmq/beacon.c b/ext/rbczmq/beacon.c index ef1d285..799d789 100644 --- a/ext/rbczmq/beacon.c +++ b/ext/rbczmq/beacon.c @@ -8,10 +8,15 @@ static VALUE rb_czmq_nogvl_beacon_destroy(void *ptr) { zmq_beacon_wrapper *beacon = ptr; + if (beacon->beacon) { - zbeacon_destroy(&beacon->beacon); + zactor_destroy(&beacon->beacon); beacon->beacon = NULL; } + + if (beacon->hostname) { + zstr_free(&beacon->hostname); + } return Qnil; } @@ -36,8 +41,16 @@ static void rb_czmq_free_beacon_gc(void *ptr) */ static VALUE rb_czmq_nogvl_new_beacon(void *ptr) { - int port = (int)ptr; - return (VALUE)zbeacon_new(port); + zmq_beacon_wrapper *beacon = ptr; + + beacon->beacon = zactor_new(zbeacon, NULL); + beacon->interval = 1000; + + zsock_send(beacon->beacon, "si", "CONFIGURE", beacon->port); + beacon->hostname = zstr_recv(beacon->beacon); + assert (beacon->hostname != NULL); + + return Qnil; } /* @@ -53,11 +66,12 @@ static VALUE rb_czmq_nogvl_new_beacon(void *ptr) static VALUE rb_czmq_beacon_s_new(VALUE beacon, VALUE port) { zmq_beacon_wrapper *bcn = NULL; - int prt; Check_Type(port, T_FIXNUM); beacon = Data_Make_Struct(rb_cZmqBeacon, zmq_beacon_wrapper, 0, rb_czmq_free_beacon_gc, bcn); - prt = FIX2INT(port); - bcn->beacon = (zbeacon_t*)rb_thread_call_without_gvl(rb_czmq_nogvl_new_beacon, (void *)prt, RUBY_UBF_IO, 0); + bcn->port = FIX2INT(port); + if (bcn->port == 0) + rb_raise(rb_eArgError, "port must not be zero!"); + rb_thread_call_without_gvl(rb_czmq_nogvl_new_beacon, (void *)bcn, RUBY_UBF_IO, 0); ZmqAssertObjOnAlloc(bcn->beacon, bcn); rb_obj_call_init(beacon, 0, NULL); return beacon; @@ -94,20 +108,7 @@ static VALUE rb_czmq_beacon_destroy(VALUE obj) static VALUE rb_czmq_beacon_hostname(VALUE obj) { GetZmqBeacon(obj); - return rb_str_new2(zbeacon_hostname(beacon->beacon)); -} - -/* - * :nodoc: - * Set the beacon broadcast interval while the GIL is released. - * -*/ -static VALUE rb_czmq_nogvl_set_interval(void *ptr) -{ - struct nogvl_beacon_interval_args *args = ptr; - zmq_beacon_wrapper *beacon = args->beacon; - zbeacon_set_interval(beacon->beacon, args->interval); - return Qnil; + return rb_str_new2(beacon->hostname); } /* @@ -125,21 +126,7 @@ static VALUE rb_czmq_beacon_set_interval(VALUE obj, VALUE interval) struct nogvl_beacon_interval_args args; GetZmqBeacon(obj); Check_Type(interval, T_FIXNUM); - args.beacon = beacon; - args.interval = FIX2INT(interval); - rb_thread_call_without_gvl(rb_czmq_nogvl_set_interval, (void *)&args, RUBY_UBF_IO, 0); - return Qnil; -} - -/* - * :nodoc: - * Filter beacons while the GIL is released. - * -*/ -static VALUE rb_czmq_nogvl_noecho(void *ptr) -{ - zmq_beacon_wrapper *beacon = ptr; - zbeacon_noecho(beacon->beacon); + beacon->interval = FIX2INT(interval); return Qnil; } @@ -156,7 +143,7 @@ static VALUE rb_czmq_nogvl_noecho(void *ptr) static VALUE rb_czmq_beacon_noecho(VALUE obj) { GetZmqBeacon(obj); - rb_thread_call_without_gvl(rb_czmq_nogvl_noecho, (void *)beacon, RUBY_UBF_IO, 0); + /* XXX: Nothing to do here, as v3 always filters out our own message */ return Qnil; } @@ -169,7 +156,8 @@ static VALUE rb_czmq_nogvl_publish(void *ptr) { struct nogvl_beacon_publish_args *args = ptr; zmq_beacon_wrapper *beacon = args->beacon; - zbeacon_publish(beacon->beacon, (byte *)args->transmit ,args->length); + zsock_send(beacon->beacon, "sbi", "PUBLISH", (byte *)args->transmit, + args->length, beacon->interval); return Qnil; } @@ -203,7 +191,7 @@ static VALUE rb_czmq_beacon_publish(VALUE obj, VALUE transmit) static VALUE rb_czmq_nogvl_silence(void *ptr) { zmq_beacon_wrapper *beacon = ptr; - zbeacon_silence(beacon->beacon); + zstr_sendx(beacon->beacon, "SILENCE", NULL); return Qnil; } @@ -233,7 +221,8 @@ static VALUE rb_czmq_nogvl_subscribe(void *ptr) { struct nogvl_beacon_subscribe_args *args = ptr; zmq_beacon_wrapper *beacon = args->beacon; - zbeacon_subscribe(beacon->beacon, (byte *)args->filter ,args->length); + zsock_send(beacon->beacon, "sb", "SUBSCRIBE", (byte *)args->filter, + args->length); return Qnil; } @@ -272,7 +261,7 @@ static VALUE rb_czmq_beacon_subscribe(VALUE obj, VALUE filter) static VALUE rb_czmq_nogvl_unsubscribe(void *ptr) { zmq_beacon_wrapper *beacon = ptr; - zbeacon_unsubscribe(beacon->beacon); + zstr_sendx(beacon->beacon, "UNSUBSCRIBE", NULL); return Qnil; } @@ -298,7 +287,7 @@ static VALUE rb_czmq_beacon_pipe(VALUE obj) zmq_sock_wrapper *sock = NULL; VALUE socket; GetZmqBeacon(obj); - socket = rb_czmq_socket_alloc(Qnil, NULL, zbeacon_socket(beacon->beacon)); + socket = rb_czmq_socket_alloc(Qnil, NULL, zsock_resolve(beacon->beacon)); GetZmqSocket(socket); sock->state = ZMQ_SOCKET_BOUND; return socket; diff --git a/ext/rbczmq/beacon.h b/ext/rbczmq/beacon.h index 6fc72d9..e9ec4df 100644 --- a/ext/rbczmq/beacon.h +++ b/ext/rbczmq/beacon.h @@ -2,7 +2,10 @@ #define RBCZMQ_BEACON_H typedef struct { - zbeacon_t *beacon; + zactor_t *beacon; + char *hostname; + int interval; + int port; } zmq_beacon_wrapper; #define ZmqAssertBeacon(obj) ZmqAssertType(obj, rb_cZmqBeacon, "ZMQ::Beacon") diff --git a/ext/rbczmq/extconf.rb b/ext/rbczmq/extconf.rb index 14ac131..5d1c6e1 100644 --- a/ext/rbczmq/extconf.rb +++ b/ext/rbczmq/extconf.rb @@ -87,6 +87,7 @@ def check_heads heads = [], fatal = false when /linux/ CZMQ_CFLAGS << "-fPIC" + CONFIG['LDSHARED'] = "$(CXX) -shared -lstdc++ -fPIC" else # on Unix we need a g++ link, not gcc. @@ -105,7 +106,7 @@ def check_heads heads = [], fatal = false lib = libs_path + "libsodium.#{LIBEXT}" Dir.chdir libsodium_path do sys "./autogen.sh", "libsodium autogen failed!" unless File.exist?(libsodium_path + 'configure') - sys "./configure --prefix=#{dst_path} --without-documentation --enable-shared", + sys "./configure CFLAGS='#{CZMQ_CFLAGS.join(" ")}' CXXFLAGS='#{CZMQ_CFLAGS.join(" ")}' --prefix=#{dst_path} --without-documentation --disable-shared --enable-static --disable-pie", "libsodium configure failed" unless File.exist?(libsodium_path + 'Makefile') sys "make && make install", "libsodium compile error!" end @@ -118,7 +119,7 @@ def check_heads heads = [], fatal = false lib = libs_path + "libzmq.#{LIBEXT}" Dir.chdir zmq_path do sys "./autogen.sh", "ZeroMQ autogen failed!" unless File.exist?(zmq_path + 'configure') - sys "./configure --prefix=#{dst_path} --without-documentation --enable-shared", + sys "./configure CFLAGS='#{CZMQ_CFLAGS.join(" ")}' CXXFLAGS='#{CZMQ_CFLAGS.join(" ")}' PKG_CONFIG_PATH='#{libs_path}/pkgconfig' --prefix=#{dst_path} --without-documentation --disable-shared --enable-static --with-libsodium=#{dst_path}", "ZeroMQ configure failed" unless File.exist?(zmq_path + 'Makefile') sys "make && make install", "ZeroMQ compile error!" end @@ -131,7 +132,7 @@ def check_heads heads = [], fatal = false lib = libs_path + "libczmq.#{LIBEXT}" Dir.chdir czmq_path do sys "./autogen.sh", "CZMQ autogen failed!" unless File.exist?(czmq_path + 'configure') - sys "./configure LDFLAGS=-L#{libs_path} CFLAGS='#{CZMQ_CFLAGS.join(" ")}' --prefix=#{dst_path} --with-libzmq=#{dst_path} --disable-shared", + sys "./configure LDFLAGS='-L#{libs_path} -lm' CFLAGS='#{CZMQ_CFLAGS.join(" ")}' PKG_CONFIG_PATH='#{libs_path}/pkgconfig' --prefix=#{dst_path} --disable-shared --enable-static --without-makecert --without-test_zgossip --with-libsodium=#{dst_path}", "CZMQ configure error!" unless File.exist?(czmq_path + 'Makefile') sys "make all && make install", "CZMQ compile error!" end @@ -143,7 +144,7 @@ def check_heads heads = [], fatal = false have_func('rb_thread_blocking_region') have_func('rb_thread_call_without_gvl') -$INCFLAGS << " -I#{libsodium_include_path}" if find_header("sodidum.h", libsodium_include_path) +$INCFLAGS << " -I#{libsodium_include_path}" if find_header("sodium.h", libsodium_include_path) $INCFLAGS << " -I#{zmq_include_path}" if find_header("zmq.h", zmq_include_path) $INCFLAGS << " -I#{czmq_include_path}" if find_header("czmq.h", czmq_include_path) @@ -154,13 +155,15 @@ def check_heads heads = [], fatal = false CONFIG['LDSHARED'] = "#{CONFIG['LDSHARED']} -Wl,-rpath=#{libs_path.to_s}" end +fail "Error compiling and linking libsodium" unless have_library("sodium") fail "Error compiling and linking libzmq" unless have_library("zmq") fail "Error compiling and linking libczmq" unless have_library("czmq") +fail "Error linking against libm" unless have_library("m") $defs << "-pedantic" $CFLAGS << ' -Wall -funroll-loops' $CFLAGS << ' -Wextra -O0 -ggdb3' if ENV['DEBUG'] -$LDFLAGS << " -Wl,-rpath,ext/rbczmq/dst/lib/" +$LDFLAGS << " -Wl,-rpath,'$$ORIGIN/dst/lib/'" create_makefile('rbczmq_ext') diff --git a/ext/rbczmq/rbczmq_ext.c b/ext/rbczmq/rbczmq_ext.c index ab842a6..4f5a3ef 100644 --- a/ext/rbczmq/rbczmq_ext.c +++ b/ext/rbczmq/rbczmq_ext.c @@ -113,7 +113,7 @@ static VALUE rb_czmq_m_now(ZMQ_UNUSED VALUE obj) static VALUE rb_czmq_m_log(ZMQ_UNUSED VALUE obj, VALUE msg) { Check_Type(msg, T_STRING); - zclock_log(StringValueCStr(msg)); + zclock_log("%s", StringValueCStr(msg)); return Qnil; } diff --git a/ext/rbczmq/socket.c b/ext/rbczmq/socket.c index 5d5a6b0..cdd8cfd 100644 --- a/ext/rbczmq/socket.c +++ b/ext/rbczmq/socket.c @@ -1737,6 +1737,39 @@ static VALUE rb_czmq_socket_opt_last_endpoint(VALUE obj) return result; } +/* + * call-seq: + * sock.stream_notify = false => nil + * + * Sets the socket stream_notify value. + * + * === Examples + * ctx = ZMQ::Context.new + * sock = ctx.socket(:STREAM) + * sock.stream_notify = false => nil + * +*/ + +static VALUE rb_czmq_socket_set_opt_stream_notify(VALUE obj, VALUE value) +{ + int rc, optval; + zmq_sock_wrapper *sock = NULL; + + GetZmqSocket(obj); + ZmqSockGuardCrossThread(sock); + CheckBoolean(value); + optval = (value == Qtrue) ? 1 : 0; + + rc = zmq_setsockopt(sock->socket, ZMQ_STREAM_NOTIFY, &optval, sizeof(optval)); + ZmqAssert(rc); + + if (sock->verbose) + zclock_log ("I: %s socket %p: set option \"STREAM_NOTIFY\" %d", + zsocket_type_str(sock->socket), (void *)obj, optval); + + return Qnil; +} + /* * :nodoc: * Receives a monitoring event message while the GIL is released. @@ -1974,4 +2007,5 @@ void _init_rb_czmq_socket() rb_define_method(rb_cZmqSocket, "sndtimeo=", rb_czmq_socket_set_opt_sndtimeo, 1); rb_define_method(rb_cZmqSocket, "monitor", rb_czmq_socket_monitor, -1); rb_define_method(rb_cZmqSocket, "last_endpoint", rb_czmq_socket_opt_last_endpoint, 0); + rb_define_method(rb_cZmqStreamSocket, "stream_notify=", rb_czmq_socket_set_opt_stream_notify, 1); } diff --git a/ext/rbczmq/socket.h b/ext/rbczmq/socket.h index 78760c7..b707ec4 100644 --- a/ext/rbczmq/socket.h +++ b/ext/rbczmq/socket.h @@ -156,4 +156,11 @@ extern VALUE intern_on_disconnected; void _init_rb_czmq_socket(); VALUE rb_czmq_nogvl_zsocket_destroy(void *ptr); +#if (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR >= 1) +typedef struct { + uint16_t event; // id of the event as bitfield + int32_t value; // value is either error code, fd or reconnect interval +} zmq_event_t; +#endif + #endif diff --git a/ext/zeromq b/ext/zeromq index ba8f58e..c557221 160000 --- a/ext/zeromq +++ b/ext/zeromq @@ -1 +1 @@ -Subproject commit ba8f58ecfcc2ca93384db8a8aad4b4063509873c +Subproject commit c5572211e7f3552f65dbc791061298dd1c92c3c8 diff --git a/rbczmq.gemspec b/rbczmq.gemspec index ab120ac..883da5f 100644 --- a/rbczmq.gemspec +++ b/rbczmq.gemspec @@ -34,4 +34,4 @@ Gem::Specification.new do |s| end end end -end \ No newline at end of file +end diff --git a/test/socket/test_stream_socket.rb b/test/socket/test_stream_socket.rb index 3b9caca..eac4512 100644 --- a/test/socket/test_stream_socket.rb +++ b/test/socket/test_stream_socket.rb @@ -23,6 +23,14 @@ def test_recv_and_send tcp.write("hello") Timeout.timeout(5) do + # The first message we receive is a connection message: + # The first frame is an identity frame. + # The second frame is an empty frame. + # see: https://github.com/zeromq/libzmq/pull/1487 + msg = sock.recv_message + assert_equal 2, msg.size + assert_equal "", msg.last.to_s + msg = sock.recv_message # Messages received from a STREAM socket are in two parts: @@ -46,24 +54,81 @@ def test_recv_and_send ctx.destroy end + def test_recv_no_notify + ctx = ZMQ::Context.new + sock = ctx.socket(:STREAM) + sock.stream_notify = false + port = sock.bind("tcp://127.0.0.1:*") + + tcp = TCPSocket.new('127.0.0.1', port) + tcp.write("hello") + + Timeout.timeout(5) do + msg = sock.recv_message + + # Messages received from a STREAM socket are in two parts: + # The first frame is an identiy frame. + # The second frame is the data received over the TCP socket. + assert_equal 2, msg.size + + # first frame is the identity frame. + identity = msg.pop + assert_equal "hello", msg.first.to_s + end + + ensure + tcp.close if tcp + ctx.destroy + end + + def test_notify_connect_disconnect + ctx = ZMQ::Context.new + sock = ctx.socket(:STREAM) + sock.stream_notify = true + port = sock.bind("tcp://127.0.0.1:*") + + tcp = TCPSocket.new('127.0.0.1', port) + + Timeout.timeout(5) do + # The first message we receive is a connection message: + # The first frame is an identity frame. + # The second frame is an empty frame. + # see: https://github.com/zeromq/libzmq/pull/1487 + msg = sock.recv_message + assert_equal 2, msg.size + assert_equal "", msg.last.to_s + + tcp.close + + msg = sock.recv_message + assert_equal 2, msg.size + assert_equal "", msg.last.to_s + end + + ensure + ctx.destroy + end + # This test should work, but may be a bug in zmq. leaving out for now: # def test_close_tcp_connection # ctx = ZMQ::Context.new # sock = ctx.socket(:STREAM) + # sock.stream_notify = false # port = sock.bind("tcp://127.0.0.1:*") # # tcp = TCPSocket.new('127.0.0.1', port) # tcp.write("hello") # # Timeout.timeout(5) do - # identity = sock.recv - # message = sock.recv + # msg = sock.recv_message + # identity = msg.pop # - # sock.sendm identity - # sock.send "" + # # Close the connection by sending a zero-length message. + # sock.sendm(identity) + # sock.send("") # # # receiving a zero length string is a TCP end of stream = closed normally. - # assert_equal "", tcp.recvfrom(100) + # assert_equal "", tcp.recv(500) # end # # ensure diff --git a/test/test_beacon.rb b/test/test_beacon.rb index 5011bb5..0d7a566 100644 --- a/test/test_beacon.rb +++ b/test/test_beacon.rb @@ -9,7 +9,7 @@ def setup end def test_beacon - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) assert_instance_of ZMQ::Beacon, beacon assert_nil beacon.destroy assert_raises TypeError do @@ -20,14 +20,14 @@ def test_beacon end def test_hostname - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) assert_instance_of String, beacon.hostname ensure beacon.destroy end def test_set_interval - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) beacon.interval = 100 assert_raises TypeError do beacon.interval = :invalid @@ -37,14 +37,14 @@ def test_set_interval end def test_noecho - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) assert_nil beacon.noecho ensure beacon.destroy end def test_publish - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) assert_raises TypeError do beacon.publish :invalid end @@ -55,7 +55,7 @@ def test_publish end def test_subscribe - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) assert_raises TypeError do beacon.subscribe :invalid end @@ -67,7 +67,7 @@ def test_subscribe def test_pipe GC.start - beacon = ZMQ::Beacon.new(0) + beacon = ZMQ::Beacon.new(5670) assert_instance_of ZMQ::Socket::Pair, beacon.pipe GC.start # check GC cycle with "detached" socket ensure diff --git a/test/test_loop.rb b/test/test_loop.rb index fc99b61..aa48cac 100644 --- a/test/test_loop.rb +++ b/test/test_loop.rb @@ -86,7 +86,7 @@ def test_cancel_timer ctx = ZMQ::Context.new fired = 0 ZMQ::Loop.run do - timer = ZL.add_timer(0.1, 5) do + timer = ZL.add_timer(0.5, 5) do fired += 1 fired < 5 end diff --git a/test/test_socket.rb b/test/test_socket.rb index bbb65c4..7f2e5f3 100644 --- a/test/test_socket.rb +++ b/test/test_socket.rb @@ -501,11 +501,13 @@ def test_sock_options xpub = ctx.socket(:XPUB) xpub.xpub_verbose = true - assert_equal 0, sock.sndbuf + # TODO: sndbuf/rcvbuf now return -1 before a setsockopt, as per LIBZMQ-195 + # assert_equal 0, sock.sndbuf sock.sndbuf = 1000 assert_equal 1000, sock.sndbuf - assert_equal 0, sock.rcvbuf + # TODO: sndbuf/rcvbuf now return -1 before a setsockopt, as per LIBZMQ-195 + # assert_equal 0, sock.rcvbuf sock.rcvbuf = 1000 assert_equal 1000, sock.rcvbuf