-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage.go
More file actions
37 lines (28 loc) · 865 Bytes
/
message.go
File metadata and controls
37 lines (28 loc) · 865 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package nsq
import (
"sync/atomic"
"github.com/nsqio/go-nsq"
)
// Compile-time check that StopRecursiveForwardUnhandledMessageHandler is
// assignable to MessageHandleProc.
var _ MessageHandleProc = StopRecursiveForwardUnhandledMessageHandler

// StopRecursiveForwardUnhandledMessageHandler is the handler that
// (*Message).ForwardUnhandledMessageHandler falls back to when the message's
// forward flag has already been set. It reports the invalid forward through
// logger.Fatal and then returns nil; the message argument is not inspected.
//
// NOTE(review): logger is declared outside this file. If its Fatal follows the
// usual convention of terminating the process, the return below is never
// reached — confirm against the logger in use.
func StopRecursiveForwardUnhandledMessageHandler(message *Message) error {
	const reason = "invalid forward; it might be recursive forward message to unhandledMessageHandler"
	logger.Fatal(reason)
	return nil
}
// Message wraps an *nsq.Message (embedded, so its fields and methods are
// promoted) and adds the bookkeeping used to hand the message over to an
// "unhandled message" handler at most once.
type Message struct {
*nsq.Message
// Topic is a topic name associated with the message.
// NOTE(review): presumably the NSQ topic the message was consumed from —
// confirm against the code that populates it.
Topic string
// unhandledMessageHandler is the callback invoked by
// ForwardUnhandledMessageHandler; when nil, forwarding is a no-op.
unhandledMessageHandler MessageHandleProc
// unhandledMessageHandlerFlag is accessed only through sync/atomic.
// 0 means the handler has not been forwarded to yet; 1 is set by the first
// successful forward or by StopForwardUnhandledMessageHandler.
unhandledMessageHandlerFlag int32
}
// ForwardUnhandledMessageHandler hands the message to its unhandled-message
// handler and returns whatever that handler returns.
//
// If no handler is set it does nothing and returns nil. The handler is invoked
// only by the call that atomically flips the forward flag from 0 to 1; any call
// that finds the flag already set (a repeated or nested forward, or a forward
// after StopForwardUnhandledMessageHandler) is routed to
// StopRecursiveForwardUnhandledMessageHandler instead.
func (m *Message) ForwardUnhandledMessageHandler() error {
	// Nothing to forward to.
	if m.unhandledMessageHandler == nil {
		return nil
	}

	// Losing the 0 -> 1 swap means the flag was already set.
	firstForward := atomic.CompareAndSwapInt32(&m.unhandledMessageHandlerFlag, 0, 1)
	if !firstForward {
		return StopRecursiveForwardUnhandledMessageHandler(m)
	}

	return m.unhandledMessageHandler(m)
}
// StopForwardUnhandledMessageHandler atomically sets the forward flag to 1.
// After this, ForwardUnhandledMessageHandler no longer invokes the
// unhandled-message handler: with a handler set it calls
// StopRecursiveForwardUnhandledMessageHandler instead.
func (m *Message) StopForwardUnhandledMessageHandler() {
	flag := &m.unhandledMessageHandlerFlag
	atomic.StoreInt32(flag, 1)
}