Skip to content

Commit 5ab9f95

Browse files
authored
Merge pull request #764 from HeerakKashyap/feat/move-channel-broker-to-meshkit
feat: add channel broker implementation
2 parents 188cf7b + 1534cc8 commit 5ab9f95

File tree

4 files changed

+654
-0
lines changed

4 files changed

+654
-0
lines changed

broker/channel/channel.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package channel
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/meshery/meshkit/broker"
10+
"github.com/meshery/meshkit/logger"
11+
"github.com/meshery/meshkit/utils"
12+
)
13+
14+
type ChannelBrokerHandler struct {
15+
Options
16+
name string
17+
// this structure represents [subject] => [queue] => channel
18+
// so there is a channel per queue per subject
19+
storage map[string]map[string]chan *broker.Message
20+
mu sync.RWMutex // protects storage map from concurrent access
21+
log logger.Handler
22+
}
23+
24+
func NewChannelBrokerHandler(optsSetters ...OptionsSetter) *ChannelBrokerHandler {
25+
options := DefaultOptions
26+
for _, setOptions := range optsSetters {
27+
if setOptions != nil {
28+
setOptions(&options)
29+
}
30+
}
31+
32+
// Use provided logger or create a default one
33+
log := options.Logger
34+
if log == nil {
35+
var err error
36+
log, err = logger.New("channel-broker", logger.Options{
37+
Format: logger.TerminalLogFormat,
38+
LogLevel: 4, // Info level
39+
})
40+
if err != nil {
41+
// Fallback to a simple logger if creation fails
42+
log = nil
43+
}
44+
}
45+
46+
return &ChannelBrokerHandler{
47+
name: fmt.Sprintf(
48+
"channel-broker-handler--%s",
49+
uuid.New().String(),
50+
),
51+
Options: options,
52+
storage: make(map[string]map[string]chan *broker.Message),
53+
log: log,
54+
}
55+
}
56+
57+
func (h *ChannelBrokerHandler) ConnectedEndpoints() (endpoints []string) {
58+
// return subjects::queue list intead of connection endpoints
59+
h.mu.RLock()
60+
defer h.mu.RUnlock()
61+
62+
list := make([]string, 0, len(h.storage))
63+
for subject, qstorage := range h.storage {
64+
if qstorage == nil {
65+
continue
66+
}
67+
for queue := range qstorage {
68+
list = append(
69+
list,
70+
fmt.Sprintf(
71+
"%s::%s",
72+
subject,
73+
queue,
74+
),
75+
)
76+
}
77+
78+
}
79+
return list
80+
}
81+
82+
func (h *ChannelBrokerHandler) Info() string {
83+
// return name because nats implementation returns name
84+
return h.name
85+
}
86+
87+
func (h *ChannelBrokerHandler) CloseConnection() {
88+
h.mu.Lock()
89+
defer h.mu.Unlock()
90+
91+
for subject, qstorage := range h.storage {
92+
for queue, ch := range qstorage {
93+
if !utils.IsClosed(ch) {
94+
close(ch)
95+
}
96+
delete(qstorage, queue)
97+
}
98+
delete(h.storage, subject)
99+
}
100+
}
101+
102+
// Publish - to publish messages
103+
func (h *ChannelBrokerHandler) Publish(subject string, message *broker.Message) error {
104+
h.mu.RLock()
105+
defer h.mu.RUnlock()
106+
107+
if len(h.storage[subject]) <= 0 {
108+
// nobody is listening => not publishing
109+
return nil
110+
}
111+
112+
var successList []string
113+
var failedList []string
114+
115+
for queue, ch := range h.storage[subject] {
116+
select {
117+
case ch <- message:
118+
successList = append(successList, queue)
119+
case <-time.After(h.PublishToChannelDelay):
120+
failedList = append(failedList, queue)
121+
}
122+
}
123+
124+
if len(failedList) > 0 {
125+
return NewErrChannelBrokerPublish(
126+
fmt.Errorf("failed to publish to one or more queue for subject %s", subject),
127+
successList,
128+
failedList,
129+
)
130+
}
131+
132+
return nil
133+
}
134+
135+
// PublishWithChannel - to publish messages with channel
136+
func (h *ChannelBrokerHandler) PublishWithChannel(subject string, msgch chan *broker.Message) error {
137+
go func() {
138+
// as soon as this channel will be closed, for loop will end
139+
for msg := range msgch {
140+
if err := h.Publish(subject, msg); err != nil {
141+
// Use proper logger if available, otherwise fallback to fmt.Printf
142+
if h.log != nil {
143+
h.log.Error(err)
144+
} else {
145+
fmt.Printf("Error publishing message to subject %s: %v\n", subject, err)
146+
}
147+
}
148+
}
149+
}()
150+
return nil
151+
}
152+
153+
// Subscribe - for subscribing messages
154+
func (h *ChannelBrokerHandler) Subscribe(subject, queue string, message []byte) error {
155+
// Looks like current version with nats does not seem to be correct
156+
// it adds callback which is executed on every message
157+
// and if queue is populated with messages it keeps waiting, in the end it returns the last one;
158+
// it does not unsubscribe and callback will keep executing and write to local variable, which probably will cause panic;
159+
160+
// Not supported
161+
162+
return nil
163+
}
164+
165+
// SubscribeWithChannel will publish all the messages received to the given channel
166+
func (h *ChannelBrokerHandler) SubscribeWithChannel(subject, queue string, msgch chan *broker.Message) error {
167+
h.mu.Lock()
168+
defer h.mu.Unlock()
169+
170+
if h.storage[subject] == nil {
171+
h.storage[subject] = make(map[string]chan *broker.Message)
172+
}
173+
174+
if h.storage[subject][queue] == nil {
175+
h.storage[subject][queue] = make(chan *broker.Message, h.SingleChannelBufferSize)
176+
}
177+
178+
go func() {
179+
// this loop will terminate when the h.storage[subject][queue] is closed
180+
for message := range h.storage[subject][queue] {
181+
// this flow is correct as if we have more than one consumer for one queue
182+
// only one will receive the message
183+
msgch <- message
184+
}
185+
}()
186+
187+
return nil
188+
}
189+
190+
// DeepCopyInto is a deepcopy function, copying the receiver, writing into out. in must be non-nil.
191+
func (h *ChannelBrokerHandler) DeepCopyInto(out broker.Handler) {
192+
// Not supported: deep copy is not implemented for ChannelBrokerHandler
193+
// This method is required by the broker.Handler interface but not used in practice
194+
// Any modification to the "copied" object will affect the original
195+
}
196+
197+
// DeepCopy is a deepcopy function, copying the receiver, creating a new ChannelBrokerHandler.
198+
func (h *ChannelBrokerHandler) DeepCopy() broker.Handler {
199+
// Not supported: returns the original instance, not a copy
200+
// This method is required by the broker.Handler interface but not used in practice
201+
// Any modification to the "copied" object will affect the original
202+
return h
203+
}
204+
205+
// DeepCopyObject is a deepcopy function, copying the receiver, creating a new broker.Handler.
206+
func (h *ChannelBrokerHandler) DeepCopyObject() broker.Handler {
207+
// Not supported: returns the original instance, not a copy
208+
// This method is required by the broker.Handler interface but not used in practice
209+
// Any modification to the "copied" object will affect the original
210+
return h
211+
}
212+
213+
// Check if the connection object is empty
214+
func (h *ChannelBrokerHandler) IsEmpty() bool {
215+
h.mu.RLock()
216+
defer h.mu.RUnlock()
217+
return len(h.storage) <= 0
218+
}

0 commit comments

Comments
 (0)