Skip to content

Commit 68b908b

Browse files
committed
Revert "notif with always nats"
This reverts commit a82fa1c.
1 parent a82fa1c commit 68b908b

File tree

5 files changed

+57
-22
lines changed

5 files changed

+57
-22
lines changed

client/events/EventClient.go

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package client
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/json"
2223
"errors"
@@ -36,9 +37,13 @@ import (
3637
)
3738

3839
type EventClientConfig struct {
39-
DestinationURL string `env:"EVENT_URL" envDefault:"http://localhost:3000/notify" description:"Notifier service url"`
40-
EnableNotifierV2 bool `env:"ENABLE_NOTIFIER_V2" envDefault:"false" description:"enable notifier v2"`
40+
DestinationURL string `env:"EVENT_URL" envDefault:"http://localhost:3000/notify" description:"Notifier service url"`
41+
NotificationMedium NotificationMedium `env:"NOTIFICATION_MEDIUM" envDefault:"rest" description:"notification medium"`
42+
EnableNotifierV2 bool `env:"ENABLE_NOTIFIER_V2" envDefault:"false" description:"enable notifier v2"`
4143
}
44+
type NotificationMedium string
45+
46+
const PUB_SUB NotificationMedium = "nats"
4247

4348
func GetEventClientConfig() (*EventClientConfig, error) {
4449
cfg := &EventClientConfig{}
@@ -229,7 +234,6 @@ func (impl *EventRESTClientImpl) sendEventsOnNats(body []byte) error {
229234
impl.logger.Errorw("err while publishing msg for testing topic", "msg", body, "err", err)
230235
return err
231236
}
232-
impl.logger.Debugw("event successfully delivered via NATS")
233237
return nil
234238

235239
}
@@ -239,27 +243,25 @@ func (impl *EventRESTClientImpl) sendEvent(event Event) (bool, error) {
239243
impl.logger.Debugw("event before send", "event", event)
240244

241245
// Step 1: Create payload and destination URL based on config
242-
bodyBytes, err := impl.createPayloadAndDestination(event)
246+
bodyBytes, destinationUrl, err := impl.createPayloadAndDestination(event)
243247
if err != nil {
244248
return false, err
245249
}
246250

247-
// Step 2: Send via NATS
248-
if err = impl.sendEventsOnNats(bodyBytes); err != nil {
249-
return false, err
250-
}
251-
252-
return true, nil
251+
// Step 2: Send via appropriate medium (NATS or REST)
252+
return impl.deliverEvent(bodyBytes, destinationUrl)
253253
}
254254

255-
func (impl *EventRESTClientImpl) createPayloadAndDestination(event Event) ([]byte, error) {
255+
func (impl *EventRESTClientImpl) createPayloadAndDestination(event Event) ([]byte, string, error) {
256256
if impl.config.EnableNotifierV2 {
257257
return impl.createV2PayloadAndDestination(event)
258258
}
259259
return impl.createDefaultPayloadAndDestination(event)
260260
}
261261

262-
func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]byte, error) {
262+
func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]byte, string, error) {
263+
destinationUrl := impl.config.DestinationURL + "/v2"
264+
263265
// Fetch notification settings
264266
req := repository.GetRulesRequest{
265267
TeamId: event.TeamId,
@@ -276,13 +278,13 @@ func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]b
276278
)
277279
if err != nil {
278280
impl.logger.Errorw("error while fetching notification settings", "err", err)
279-
return nil, err
281+
return nil, "", err
280282
}
281283

282284
// Process notification settings into beans
283285
notificationSettingsBean, err := impl.processNotificationSettings(notificationSettings)
284286
if err != nil {
285-
return nil, err
287+
return nil, "", err
286288
}
287289

288290
// Create combined payload
@@ -294,19 +296,19 @@ func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]b
294296
bodyBytes, err := json.Marshal(combinedPayload)
295297
if err != nil {
296298
impl.logger.Errorw("error while marshaling combined event request", "err", err)
297-
return nil, err
299+
return nil, "", err
298300
}
299301

300-
return bodyBytes, nil
302+
return bodyBytes, destinationUrl, nil
301303
}
302304

303-
func (impl *EventRESTClientImpl) createDefaultPayloadAndDestination(event Event) ([]byte, error) {
305+
func (impl *EventRESTClientImpl) createDefaultPayloadAndDestination(event Event) ([]byte, string, error) {
304306
bodyBytes, err := json.Marshal(event)
305307
if err != nil {
306308
impl.logger.Errorw("error while marshaling event request", "err", err)
307-
return nil, err
309+
return nil, "", err
308310
}
309-
return bodyBytes, nil
311+
return bodyBytes, impl.config.DestinationURL, nil
310312
}
311313

312314
func (impl *EventRESTClientImpl) processNotificationSettings(notificationSettings []repository.NotificationSettings) ([]*repository.NotificationSettingsBean, error) {
@@ -334,6 +336,38 @@ func (impl *EventRESTClientImpl) processNotificationSettings(notificationSetting
334336
return notificationSettingsBean, nil
335337
}
336338

339+
func (impl *EventRESTClientImpl) deliverEvent(bodyBytes []byte, destinationUrl string) (bool, error) {
340+
if impl.config.NotificationMedium == PUB_SUB {
341+
if err := impl.sendEventsOnNats(bodyBytes); err != nil {
342+
impl.logger.Errorw("error while publishing event", "err", err)
343+
return false, err
344+
}
345+
return true, nil
346+
}
347+
348+
req, err := http.NewRequest(http.MethodPost, destinationUrl, bytes.NewBuffer(bodyBytes))
349+
if err != nil {
350+
impl.logger.Errorw("error while creating HTTP request", "err", err)
351+
return false, err
352+
}
353+
req.Header.Set("Content-Type", "application/json")
354+
355+
resp, err := impl.client.Do(req)
356+
if err != nil {
357+
impl.logger.Errorw("error while sending HTTP request", "err", err)
358+
return false, err
359+
}
360+
defer resp.Body.Close()
361+
362+
if resp.StatusCode >= 300 {
363+
impl.logger.Errorw("unexpected response from notifier", "status", resp.StatusCode)
364+
return false, fmt.Errorf("unexpected response code: %d", resp.StatusCode)
365+
}
366+
367+
impl.logger.Debugw("event successfully delivered", "status", resp.StatusCode)
368+
return true, nil
369+
}
370+
337371
func (impl *EventRESTClientImpl) WriteNatsEvent(topic string, payload interface{}) error {
338372
body, err := json.Marshal(payload)
339373
if err != nil {

client/events/event_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestSendEventsOnNats(t *testing.T) {
2323
logger: logger,
2424
pubsubClient: mockPubsubClient,
2525
client: client,
26-
config: &EventClientConfig{DestinationURL: "localhost:3000/notify"},
26+
config: &EventClientConfig{DestinationURL: "localhost:3000/notify", NotificationMedium: PUB_SUB},
2727
ciPipelineRepository: pipelineConfig.NewCiPipelineRepositoryImpl(db, logger, trans),
2828
pipelineRepository: pipelineConfig.NewPipelineRepositoryImpl(db, logger),
2929
attributesRepository: repository.NewAttributesRepositoryImpl(db),

env_gen.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

env_gen.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@
221221
| NATS_MSG_MAX_AGE | int |86400 | | | false |
222222
| NATS_MSG_PROCESSING_BATCH_SIZE | int |1 | | | false |
223223
| NATS_MSG_REPLICAS | int |0 | | | false |
224+
| NOTIFICATION_MEDIUM | NotificationMedium |rest | notification medium | | false |
224225
| OTEL_COLLECTOR_URL | string | | Opentelemetry URL | | false |
225226
| PARALLELISM_LIMIT_FOR_TAG_PROCESSING | int | | App manual sync job parallel tag processing count. | | false |
226227
| PG_EXPORT_PROM_METRICS | bool |true | | | false |

wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)