@@ -48,6 +48,7 @@ import (
4848type Module interface {
4949 protocol.Module
5050 GetName () string
51+ GetCluster () string
5152 GetGroupAllowlist () * regexp.Regexp
5253 GetGroupDenylist () * regexp.Regexp
5354 GetLogger () * zap.Logger
@@ -95,7 +96,7 @@ type Coordinator struct {
9596
9697// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
9798// is any error, it will panic with an appropriate message describing the problem.
98- func getModuleForClass (app * protocol.ApplicationContext , moduleName , className string , groupAllowlist , groupDenylist * regexp.Regexp , extras map [string ]string , templateOpen , templateClose * template.Template ) protocol.Module {
99+ func getModuleForClass (app * protocol.ApplicationContext , moduleName , className string , groupAllowlist , groupDenylist * regexp.Regexp , extras map [string ]string , templateOpen , templateClose * template.Template , cluster string ) protocol.Module {
99100 logger := app .Logger .With (
100101 zap .String ("type" , "module" ),
101102 zap .String ("coordinator" , "notifier" ),
@@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
113114 extras : extras ,
114115 templateOpen : templateOpen ,
115116 templateClose : templateClose ,
117+ cluster : cluster ,
116118 }
117119 case "email" :
118120 return & EmailNotifier {
@@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
123125 extras : extras ,
124126 templateOpen : templateOpen ,
125127 templateClose : templateClose ,
128+ cluster : cluster ,
126129 }
127130 case "null" :
128131 return & NullNotifier {
@@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
133136 extras : extras ,
134137 templateOpen : templateOpen ,
135138 templateClose : templateClose ,
139+ cluster : cluster ,
136140 }
137141 default :
138142 panic ("Unknown notifier className provided: " + className )
@@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() {
194198 groupAllowlist = re
195199 }
196200
201+ cluster := viper .GetString (configRoot + ".cluster" )
202+
197203 // Compile the denylist for the consumer groups to not notify for
198204 var groupDenylist * regexp.Regexp
199205 denylist := viper .GetString (configRoot + ".group-denylist" )
@@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() {
227233 templateClose = tmpl .Templates ()[0 ]
228234 }
229235
230- module := getModuleForClass (nc .App , name , viper .GetString (configRoot + ".class-name" ), groupAllowlist , groupDenylist , extras , templateOpen , templateClose )
236+ module := getModuleForClass (nc .App , name , viper .GetString (configRoot + ".class-name" ), groupAllowlist , groupDenylist , extras , templateOpen , templateClose , cluster )
231237 module .Configure (name , configRoot )
232238 nc .modules [name ] = module
233239 interval := viper .GetInt64 (configRoot + ".interval" )
@@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
436442 for _ , genericModule := range nc .modules {
437443 module := genericModule .(Module )
438444
445+ if module .GetCluster () != "" && response .Cluster != module .GetCluster () {
446+ continue
447+ }
439448 // No allowlist means everything passes
440449 groupAllowlist := module .GetGroupAllowlist ()
441450 groupDenylist := module .GetGroupDenylist ()
0 commit comments