From 21bac54ad2976be4390e67110689666c8d904aa1 Mon Sep 17 00:00:00 2001 From: Johnson C Date: Fri, 17 Dec 2021 11:43:55 +0800 Subject: [PATCH 1/4] [fix] etcd config source prefix issue (#2389) --- plugins/config/source/etcd/etcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/config/source/etcd/etcd.go b/plugins/config/source/etcd/etcd.go index 3366013dbf..0aaf2c5d41 100644 --- a/plugins/config/source/etcd/etcd.go +++ b/plugins/config/source/etcd/etcd.go @@ -29,7 +29,7 @@ func (c *etcd) Read() (*source.ChangeSet, error) { return nil, c.cerr } - rsp, err := c.client.Get(context.Background(), c.prefix, clientv3.WithPrefix()) + rsp, err := c.client.Get(context.Background(), c.prefix) if err != nil { return nil, err } From 5a2f37c7189591c90b0b39bb89760bf3a527b716 Mon Sep 17 00:00:00 2001 From: Johnson C Date: Wed, 23 Feb 2022 11:55:18 +0800 Subject: [PATCH 2/4] http transport data race issue (#2436) * [fix] #2431 http transport data race issue --- client/rpc_stream.go | 5 ++- transport/http_transport.go | 13 ++++-- transport/http_transport_test.go | 76 ++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/client/rpc_stream.go b/client/rpc_stream.go index a5b617c46b..16f0fa5342 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -74,10 +74,10 @@ func (r *rpcStream) Send(msg interface{}) error { func (r *rpcStream) Recv(msg interface{}) error { r.Lock() - defer r.Unlock() if r.isClosed() { r.err = errShutdown + r.Unlock() return errShutdown } @@ -89,9 +89,11 @@ func (r *rpcStream) Recv(msg interface{}) error { if err != nil { if err == io.EOF && !r.isClosed() { r.err = io.ErrUnexpectedEOF + r.Unlock() return io.ErrUnexpectedEOF } r.err = err + r.Unlock() return err } @@ -120,6 +122,7 @@ func (r *rpcStream) Recv(msg interface{}) error { } } + r.Unlock() return r.err } diff --git a/transport/http_transport.go b/transport/http_transport.go index 1063769cf5..449443f0e7 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -35,9 +35,10 @@ type httpTransportClient struct { sync.RWMutex // request must be stored for response processing - r chan *http.Request - bl []*http.Request - buff *bufio.Reader + r chan *http.Request + bl []*http.Request + buff *bufio.Reader + closed bool // local/remote ip local string @@ -138,7 +139,12 @@ func (h *httpTransportClient) Recv(m *Message) error { h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)) } + h.Lock() + if h.closed { + return io.EOF + } rsp, err := http.ReadResponse(h.buff, r) + h.Unlock() if err != nil { return err } @@ -174,6 +180,7 @@ func (h *httpTransportClient) Close() error { h.once.Do(func() { h.Lock() h.buff.Reset(nil) + h.closed = true h.Unlock() close(h.r) }) diff --git a/transport/http_transport_test.go b/transport/http_transport_test.go index 85bea2f6b8..eb9221678a 100644 --- a/transport/http_transport_test.go +++ b/transport/http_transport_test.go @@ -1,8 +1,10 @@ package transport import ( + "fmt" "io" "net" + "sync" "testing" "time" ) @@ -244,3 +246,77 @@ func TestHTTPTransportTimeout(t *testing.T) { <-done } + +func TestHTTPTransportCloseWhenRecv(t *testing.T) { + tr := NewHTTPTransport() + + l, err := tr.Listen("127.0.0.1:0") + if err != nil { + t.Errorf("Unexpected listen err: %v", err) + } + defer l.Close() + + fn := func(sock Socket) { + defer sock.Close() + + for { + var m Message + if err := sock.Recv(&m); err != nil { + return + } + if err := sock.Send(&m); err != nil { + return + } + } + } + + done := make(chan bool) + + go func() { + if err := l.Accept(fn); err != nil { + select { + case <-done: + default: + t.Errorf("Unexpected accept err: %v", err) + } + } + }() + + c, err := tr.Dial(l.Addr()) + if err != nil { + t.Errorf("Unexpected dial err: %v", err) + } + defer c.Close() + + m := Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + var rm Message + + if err := c.Recv(&rm); err != nil { + if err == io.EOF { + return + } + t.Errorf("Unexpected recv err: %v", err) + } + fmt.Println("aa") + } + }() + for i := 1; i < 3; i++ { + if err := c.Send(&m); err != nil { + t.Errorf("Unexpected send err: %v", err) + } + } + close(done) + + c.Close() + wg.Wait() +} From 735aee88b8d28a85c911b8b9f2ed31fa6ca3ed4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hunyadv=C3=A1ri=20P=C3=A9ter?= Date: Fri, 15 Jul 2022 11:49:11 +0200 Subject: [PATCH 3/4] [feature] Ability to close connection while receiving. Ability to send messages while receiving. Icreased r channel limit to 100 to more fluently communication. Do not dropp sent request if r channel is full. --- transport/http_transport.go | 44 ++++++++++++---- transport/http_transport_test.go | 86 +++++++++++++++++++++++++++++++- 2 files changed, 118 insertions(+), 12 deletions(-) diff --git a/transport/http_transport.go b/transport/http_transport.go index f41c25ce7e..def8a49eed 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -103,14 +103,20 @@ func (h *httpTransportClient) Send(m *Message) error { Host: h.addr, } - h.Lock() - h.bl = append(h.bl, req) - select { - case h.r <- h.bl[0]: - h.bl = h.bl[1:] - default: + if !h.dialOpts.Stream { + h.Lock() + if h.closed { + h.Unlock() + return io.EOF + } + h.bl = append(h.bl, req) + select { + case h.r <- h.bl[0]: + h.bl = h.bl[1:] + default: + } + h.Unlock() } - h.Unlock() // set timeout if its greater than 0 if h.ht.opts.Timeout > time.Duration(0) { @@ -129,7 +135,14 @@ func (h *httpTransportClient) Recv(m *Message) error { if !h.dialOpts.Stream { rc, ok := <-h.r if !ok { - return io.EOF + h.Lock() + if len(h.bl) == 0 { + h.Unlock() + return io.EOF + } + rc = h.bl[0] + h.bl = h.bl[1:] + h.Unlock() } r = rc } @@ -178,6 +191,17 @@ func (h *httpTransportClient) Recv(m *Message) error { } func (h *httpTransportClient) Close() error { + if !h.dialOpts.Stream { + h.once.Do(func() { + h.Lock() + h.buff.Reset(nil) + h.closed = true + h.Unlock() + close(h.r) + }) + return h.conn.Close() + } + err := h.conn.Close() h.once.Do(func() { h.Lock() h.buff.Reset(nil) @@ -185,7 +209,7 @@ func (h *httpTransportClient) Close() error { h.Unlock() close(h.r) }) - return h.conn.Close() + return err } func (h *httpTransportSocket) Local() string { @@ -524,7 +548,7 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) { conn: conn, buff: bufio.NewReader(conn), dialOpts: dopts, - r: make(chan *http.Request, 1), + r: make(chan *http.Request, 100), local: conn.LocalAddr().String(), remote: conn.RemoteAddr().String(), }, nil diff --git a/transport/http_transport_test.go b/transport/http_transport_test.go index b08073ef37..debe5d905a 100644 --- a/transport/http_transport_test.go +++ b/transport/http_transport_test.go @@ -302,10 +302,8 @@ func TestHTTPTransportCloseWhenRecv(t *testing.T) { if err := c.Recv(&rm); err != nil { if err == io.EOF { - c.Recv(&rm) return } - t.Errorf("Unexpected recv err: %v", err) } } }() @@ -319,3 +317,87 @@ func TestHTTPTransportCloseWhenRecv(t *testing.T) { c.Close() wg.Wait() } + +func TestHTTPTransportMultipleSendWhenRecv(t *testing.T) { + tr := NewHTTPTransport() + + l, err := tr.Listen("127.0.0.1:0") + if err != nil { + t.Errorf("Unexpected listen err: %v", err) + } + defer l.Close() + + readyToSend := make(chan struct{}) + m := Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + wgSend := sync.WaitGroup{} + fn := func(sock Socket) { + defer sock.Close() + + for { + var mr Message + if err := sock.Recv(&mr); err != nil { + return + } + wgSend.Add(1) + go func() { + defer wgSend.Done() + <-readyToSend + if err := sock.Send(&m); err != nil { + return + } + }() + } + } + + done := make(chan bool) + + go func() { + if err := l.Accept(fn); err != nil { + select { + case <-done: + default: + t.Errorf("Unexpected accept err: %v", err) + } + } + }() + + c, err := tr.Dial(l.Addr(), WithStream()) + if err != nil { + t.Errorf("Unexpected dial err: %v", err) + } + defer c.Close() + + var wg sync.WaitGroup + wg.Add(1) + readyForRecv := make(chan struct{}) + go func() { + defer wg.Done() + close(readyForRecv) + for { + var rm Message + if err := c.Recv(&rm); err != nil { + if err == io.EOF { + return + } + } + } + }() + <-readyForRecv + for i := 0; i < 3; i++ { + if err := c.Send(&m); err != nil { + t.Errorf("Unexpected send err: %v", err) + } + } + close(readyToSend) + wgSend.Wait() + close(done) + + c.Close() + wg.Wait() +} From 563bf624277456045aa37f2bb3bcd579d6fdef0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hunyadv=C3=A1ri=20P=C3=A9ter?= Date: Wed, 4 Jun 2025 09:46:55 +0200 Subject: [PATCH 4/4] [feature] ability to use wildcard in topic name (factory-events.*.*) --- server/rpc_events.go | 98 +++++++++++++++++++++++--------------------- server/rpc_router.go | 27 ++++++++---- server/rpc_server.go | 2 +- server/server.go | 2 +- 4 files changed, 73 insertions(+), 56 deletions(-) diff --git a/server/rpc_events.go b/server/rpc_events.go index 04ac3f1553..a75af327cd 100644 --- a/server/rpc_events.go +++ b/server/rpc_events.go @@ -14,59 +14,65 @@ import ( // HandleEvent handles inbound messages to the service directly. // These events are a result of registering to the topic with the service name. // TODO: handle requests from an event. We won't send a response. -func (s *rpcServer) HandleEvent(e broker.Event) error { - // formatting horrible cruft - msg := e.Message() +func (s *rpcServer) HandleEvent(subscriber string) func(e broker.Event) error { + return func(e broker.Event) error { + // formatting horrible cruft + msg := e.Message() - if msg.Header == nil { - msg.Header = make(map[string]string) - } - - contentType, ok := msg.Header["Content-Type"] - if !ok || len(contentType) == 0 { - msg.Header["Content-Type"] = DefaultContentType - contentType = DefaultContentType - } + if msg.Header == nil { + msg.Header = make(map[string]string) + } - cf, err := s.newCodec(contentType) - if err != nil { - return err - } + contentType, ok := msg.Header["Content-Type"] + if !ok || len(contentType) == 0 { + msg.Header["Content-Type"] = DefaultContentType + contentType = DefaultContentType + } - header := make(map[string]string, len(msg.Header)) - for k, v := range msg.Header { - header[k] = v - } + cf, err := s.newCodec(contentType) + if err != nil { + return err + } - // create context - ctx := metadata.NewContext(context.Background(), header) - - // TODO: inspect message header for Micro-Service & Micro-Topic - rpcMsg := &rpcMessage{ - topic: msg.Header[headers.Message], - contentType: contentType, - payload: &raw.Frame{Data: msg.Body}, - codec: cf, - header: msg.Header, - body: msg.Body, - } + header := make(map[string]string, len(msg.Header)) + for k, v := range msg.Header { + header[k] = v + } - // if the router is present then execute it - r := Router(s.router) - if s.opts.Router != nil { - // create a wrapped function - handler := s.opts.Router.ProcessMessage + // create context + ctx := metadata.NewContext(context.Background(), header) + + // TODO: inspect message header for Micro-Service & Micro-Topic + rpcMsg := &rpcMessage{ + topic: msg.Header[headers.Message], + contentType: contentType, + payload: &raw.Frame{Data: msg.Body}, + codec: cf, + header: msg.Header, + body: msg.Body, + } - // execute the wrapper for it - for i := len(s.opts.SubWrappers); i > 0; i-- { - handler = s.opts.SubWrappers[i-1](handler) + // if the router is present then execute it + r := Router(s.router) + if s.opts.Router != nil { + // create a wrapped function + handler := func(ctx context.Context, msg Message) error { + return s.opts.Router.ProcessMessage(ctx, subscriber, msg) + } + + // execute the wrapper for it + for i := len(s.opts.SubWrappers); i > 0; i-- { + handler = s.opts.SubWrappers[i-1](handler) + } + + // set the router + r = rpcRouter{m: func(ctx context.Context, _ string, msg Message) error { + return handler(ctx, msg) + }} } - // set the router - r = rpcRouter{m: handler} + return r.ProcessMessage(ctx, subscriber, rpcMsg) } - - return r.ProcessMessage(ctx, rpcMsg) } func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { @@ -102,7 +108,7 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { // subscribeServer will subscribe the server to the topic with its own name. func (s *rpcServer) subscribeServer(config Options) error { if s.opts.Router != nil { - sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) + sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent(config.Name)) if err != nil { return err } @@ -131,7 +137,7 @@ func (s *rpcServer) reSubscribe(config Options) error { } config.Logger.Logf(log.InfoLevel, "Subscribing to topic: %s", sb.Topic()) - sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) + sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent(sb.Topic()), opts...) if err != nil { return err } diff --git a/server/rpc_router.go b/server/rpc_router.go index 7dcb83422b..98b5b2e4ac 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -81,11 +81,11 @@ type router struct { // rpcRouter encapsulates functions that become a Router. type rpcRouter struct { h func(context.Context, Request, interface{}) error - m func(context.Context, Message) error + m func(context.Context, string, Message) error } -func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error { - return r.m(ctx, msg) +func (r rpcRouter) ProcessMessage(ctx context.Context, subscriber string, msg Message) error { + return r.m(ctx, subscriber, msg) } func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error { @@ -188,7 +188,11 @@ func prepareMethod(method reflect.Method, logger log.Logger) *methodType { return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} } -func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error { +func (router *router) sendResponse(sending sync.Locker, + req *request, + reply interface{}, + cc codec.Writer, + last bool) error { msg := new(codec.Message) msg.Type = codec.Response resp := router.getResponse() @@ -205,7 +209,13 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte return err } -func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error { +func (s *service) call(ctx context.Context, + router *router, + sending *sync.Mutex, + mtype *methodType, + req *request, + argv, replyv reflect.Value, + cc codec.Writer) error { defer router.freeRequest(req) function := mtype.method.Func @@ -227,7 +237,8 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, if !mtype.stream { fn := func(ctx context.Context, req Request, rsp interface{}) error { - returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) + returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), + reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) // The return value for the method is an error. if err := returnValues[0].Interface(); err != nil { @@ -534,7 +545,7 @@ func (router *router) Subscribe(s Subscriber) error { return nil } -func (router *router) ProcessMessage(ctx context.Context, msg Message) (err error) { +func (router *router) ProcessMessage(ctx context.Context, subscriber string, msg Message) (err error) { defer func() { // recover any panics if r := recover(); r != nil { @@ -546,7 +557,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro // get the subscribers by topic router.su.RLock() - subs, ok := router.subscribers[msg.Topic()] + subs, ok := router.subscribers[subscriber] router.su.RUnlock() if !ok { log.Warnf("Subscriber not found for topic %s", msg.Topic()) diff --git a/server/rpc_server.go b/server/rpc_server.go index 4d4040cabf..62503abedb 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -167,7 +167,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // Process the event ev := newEvent(msg) - if err := s.HandleEvent(ev); err != nil { + if err := s.HandleEvent(ev.Topic())(ev); err != nil { msg.Header[headers.Error] = err.Error() logger.Logf(log.ErrorLevel, "failed to handle event: %v", err) } diff --git a/server/server.go b/server/server.go index 9c7e91074d..551b2642ab 100644 --- a/server/server.go +++ b/server/server.go @@ -40,7 +40,7 @@ type Server interface { // Router handle serving messages. type Router interface { // ProcessMessage processes a message - ProcessMessage(context.Context, Message) error + ProcessMessage(context.Context, string, Message) error // ServeRequest processes a request to completion ServeRequest(context.Context, Request, Response) error }