diff --git a/server/rpc_events.go b/server/rpc_events.go index 84a17fb3ba..ef7e2a48df 100644 --- a/server/rpc_events.go +++ b/server/rpc_events.go @@ -14,59 +14,65 @@ import ( // HandleEvent handles inbound messages to the service directly. // These events are a result of registering to the topic with the service name. // TODO: handle requests from an event. We won't send a response. -func (s *rpcServer) HandleEvent(e broker.Event) error { - // formatting horrible cruft - msg := e.Message() +func (s *rpcServer) HandleEvent(subscriber string) func(e broker.Event) error { + return func(e broker.Event) error { + // formatting horrible cruft + msg := e.Message() - if msg.Header == nil { - msg.Header = make(map[string]string) - } - - contentType, ok := msg.Header["Content-Type"] - if !ok || len(contentType) == 0 { - msg.Header["Content-Type"] = DefaultContentType - contentType = DefaultContentType - } + if msg.Header == nil { + msg.Header = make(map[string]string) + } - cf, err := s.newCodec(contentType) - if err != nil { - return err - } + contentType, ok := msg.Header["Content-Type"] + if !ok || len(contentType) == 0 { + msg.Header["Content-Type"] = DefaultContentType + contentType = DefaultContentType + } - header := make(map[string]string, len(msg.Header)) - for k, v := range msg.Header { - header[k] = v - } + cf, err := s.newCodec(contentType) + if err != nil { + return err + } - // create context - ctx := metadata.NewContext(context.Background(), header) - - // TODO: inspect message header for Micro-Service & Micro-Topic - rpcMsg := &rpcMessage{ - topic: msg.Header[headers.Message], - contentType: contentType, - payload: &raw.Frame{Data: msg.Body}, - codec: cf, - header: msg.Header, - body: msg.Body, - } + header := make(map[string]string, len(msg.Header)) + for k, v := range msg.Header { + header[k] = v + } - // if the router is present then execute it - r := Router(s.router) - if s.opts.Router != nil { - // create a wrapped function - handler := s.opts.Router.ProcessMessage + // create context + ctx := metadata.NewContext(context.Background(), header) + + // TODO: inspect message header for Micro-Service & Micro-Topic + rpcMsg := &rpcMessage{ + topic: msg.Header[headers.Message], + contentType: contentType, + payload: &raw.Frame{Data: msg.Body}, + codec: cf, + header: msg.Header, + body: msg.Body, + } - // execute the wrapper for it - for i := len(s.opts.SubWrappers); i > 0; i-- { - handler = s.opts.SubWrappers[i-1](handler) + // if the router is present then execute it + r := Router(s.router) + if s.opts.Router != nil { + // create a wrapped function + handler := func(ctx context.Context, msg Message) error { + return s.opts.Router.ProcessMessage(ctx, subscriber, msg) + } + + // execute the wrapper for it + for i := len(s.opts.SubWrappers); i > 0; i-- { + handler = s.opts.SubWrappers[i-1](handler) + } + + // set the router + r = rpcRouter{m: func(ctx context.Context, _ string, msg Message) error { + return handler(ctx, msg) + }} } - // set the router - r = rpcRouter{m: handler} + return r.ProcessMessage(ctx, subscriber, rpcMsg) } - - return r.ProcessMessage(ctx, rpcMsg) } func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { @@ -102,7 +108,7 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { // subscribeServer will subscribe the server to the topic with its own name. func (s *rpcServer) subscribeServer(config Options) error { if s.opts.Router != nil && s.subscriber == nil { - sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) + sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent(config.Name)) if err != nil { return err } @@ -134,7 +140,7 @@ func (s *rpcServer) reSubscribe(config Options) { } config.Logger.Logf(log.InfoLevel, "Subscribing to topic: %s", sb.Topic()) - sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) + sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent(sb.Topic()), opts...) if err != nil { config.Logger.Logf(log.WarnLevel, "Unable to subscribing to topic: %s, error: %s", sb.Topic(), err) continue diff --git a/server/rpc_router.go b/server/rpc_router.go index 7dcb83422b..98b5b2e4ac 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -81,11 +81,11 @@ type router struct { // rpcRouter encapsulates functions that become a Router. type rpcRouter struct { h func(context.Context, Request, interface{}) error - m func(context.Context, Message) error + m func(context.Context, string, Message) error } -func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error { - return r.m(ctx, msg) +func (r rpcRouter) ProcessMessage(ctx context.Context, subscriber string, msg Message) error { + return r.m(ctx, subscriber, msg) } func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error { @@ -188,7 +188,11 @@ func prepareMethod(method reflect.Method, logger log.Logger) *methodType { return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} } -func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error { +func (router *router) sendResponse(sending sync.Locker, + req *request, + reply interface{}, + cc codec.Writer, + last bool) error { msg := new(codec.Message) msg.Type = codec.Response resp := router.getResponse() @@ -205,7 +209,13 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte return err } -func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error { +func (s *service) call(ctx context.Context, + router *router, + sending *sync.Mutex, + mtype *methodType, + req *request, + argv, replyv reflect.Value, + cc codec.Writer) error { defer router.freeRequest(req) function := mtype.method.Func @@ -227,7 +237,8 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, if !mtype.stream { fn := func(ctx context.Context, req Request, rsp interface{}) error { - returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) + returnValues = function.Call([]reflect.Value{s.rcvr, mtype.prepareContext(ctx), + reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) // The return value for the method is an error. if err := returnValues[0].Interface(); err != nil { @@ -534,7 +545,7 @@ func (router *router) Subscribe(s Subscriber) error { return nil } -func (router *router) ProcessMessage(ctx context.Context, msg Message) (err error) { +func (router *router) ProcessMessage(ctx context.Context, subscriber string, msg Message) (err error) { defer func() { // recover any panics if r := recover(); r != nil { @@ -546,7 +557,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro // get the subscribers by topic router.su.RLock() - subs, ok := router.subscribers[msg.Topic()] + subs, ok := router.subscribers[subscriber] router.su.RUnlock() if !ok { log.Warnf("Subscriber not found for topic %s", msg.Topic()) diff --git a/server/rpc_server.go b/server/rpc_server.go index a2e7892f62..828cabca46 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -167,7 +167,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // Process the event ev := newEvent(msg) - if err := s.HandleEvent(ev); err != nil { + if err := s.HandleEvent(ev.Topic())(ev); err != nil { msg.Header[headers.Error] = err.Error() logger.Logf(log.ErrorLevel, "failed to handle event: %v", err) } diff --git a/server/server.go b/server/server.go index 9c7e91074d..551b2642ab 100644 --- a/server/server.go +++ b/server/server.go @@ -40,7 +40,7 @@ type Server interface { // Router handle serving messages. type Router interface { // ProcessMessage processes a message - ProcessMessage(context.Context, Message) error + ProcessMessage(context.Context, string, Message) error // ServeRequest processes a request to completion ServeRequest(context.Context, Request, Response) error }