Skip to content

Commit a82fa1c

Browse files
committed
notif with always nats
1 parent d7253e0 commit a82fa1c

File tree

5 files changed

+22
-57
lines changed

5 files changed

+22
-57
lines changed

client/events/EventClient.go

Lines changed: 19 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package client
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"encoding/json"
2322
"errors"
@@ -37,13 +36,9 @@ import (
3736
)
3837

3938
type EventClientConfig struct {
40-
DestinationURL string `env:"EVENT_URL" envDefault:"http://localhost:3000/notify" description:"Notifier service url"`
41-
NotificationMedium NotificationMedium `env:"NOTIFICATION_MEDIUM" envDefault:"rest" description:"notification medium"`
42-
EnableNotifierV2 bool `env:"ENABLE_NOTIFIER_V2" envDefault:"false" description:"enable notifier v2"`
39+
DestinationURL string `env:"EVENT_URL" envDefault:"http://localhost:3000/notify" description:"Notifier service url"`
40+
EnableNotifierV2 bool `env:"ENABLE_NOTIFIER_V2" envDefault:"false" description:"enable notifier v2"`
4341
}
44-
type NotificationMedium string
45-
46-
const PUB_SUB NotificationMedium = "nats"
4742

4843
func GetEventClientConfig() (*EventClientConfig, error) {
4944
cfg := &EventClientConfig{}
@@ -234,6 +229,7 @@ func (impl *EventRESTClientImpl) sendEventsOnNats(body []byte) error {
234229
impl.logger.Errorw("err while publishing msg for testing topic", "msg", body, "err", err)
235230
return err
236231
}
232+
impl.logger.Debugw("event successfully delivered via NATS")
237233
return nil
238234

239235
}
@@ -243,25 +239,27 @@ func (impl *EventRESTClientImpl) sendEvent(event Event) (bool, error) {
243239
impl.logger.Debugw("event before send", "event", event)
244240

245241
// Step 1: Create payload and destination URL based on config
246-
bodyBytes, destinationUrl, err := impl.createPayloadAndDestination(event)
242+
bodyBytes, err := impl.createPayloadAndDestination(event)
247243
if err != nil {
248244
return false, err
249245
}
250246

251-
// Step 2: Send via appropriate medium (NATS or REST)
252-
return impl.deliverEvent(bodyBytes, destinationUrl)
247+
// Step 2: Send via NATS
248+
if err = impl.sendEventsOnNats(bodyBytes); err != nil {
249+
return false, err
250+
}
251+
252+
return true, nil
253253
}
254254

255-
func (impl *EventRESTClientImpl) createPayloadAndDestination(event Event) ([]byte, string, error) {
255+
func (impl *EventRESTClientImpl) createPayloadAndDestination(event Event) ([]byte, error) {
256256
if impl.config.EnableNotifierV2 {
257257
return impl.createV2PayloadAndDestination(event)
258258
}
259259
return impl.createDefaultPayloadAndDestination(event)
260260
}
261261

262-
func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]byte, string, error) {
263-
destinationUrl := impl.config.DestinationURL + "/v2"
264-
262+
func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]byte, error) {
265263
// Fetch notification settings
266264
req := repository.GetRulesRequest{
267265
TeamId: event.TeamId,
@@ -278,13 +276,13 @@ func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]b
278276
)
279277
if err != nil {
280278
impl.logger.Errorw("error while fetching notification settings", "err", err)
281-
return nil, "", err
279+
return nil, err
282280
}
283281

284282
// Process notification settings into beans
285283
notificationSettingsBean, err := impl.processNotificationSettings(notificationSettings)
286284
if err != nil {
287-
return nil, "", err
285+
return nil, err
288286
}
289287

290288
// Create combined payload
@@ -296,19 +294,19 @@ func (impl *EventRESTClientImpl) createV2PayloadAndDestination(event Event) ([]b
296294
bodyBytes, err := json.Marshal(combinedPayload)
297295
if err != nil {
298296
impl.logger.Errorw("error while marshaling combined event request", "err", err)
299-
return nil, "", err
297+
return nil, err
300298
}
301299

302-
return bodyBytes, destinationUrl, nil
300+
return bodyBytes, nil
303301
}
304302

305-
func (impl *EventRESTClientImpl) createDefaultPayloadAndDestination(event Event) ([]byte, string, error) {
303+
func (impl *EventRESTClientImpl) createDefaultPayloadAndDestination(event Event) ([]byte, error) {
306304
bodyBytes, err := json.Marshal(event)
307305
if err != nil {
308306
impl.logger.Errorw("error while marshaling event request", "err", err)
309-
return nil, "", err
307+
return nil, err
310308
}
311-
return bodyBytes, impl.config.DestinationURL, nil
309+
return bodyBytes, nil
312310
}
313311

314312
func (impl *EventRESTClientImpl) processNotificationSettings(notificationSettings []repository.NotificationSettings) ([]*repository.NotificationSettingsBean, error) {
@@ -336,38 +334,6 @@ func (impl *EventRESTClientImpl) processNotificationSettings(notificationSetting
336334
return notificationSettingsBean, nil
337335
}
338336

339-
func (impl *EventRESTClientImpl) deliverEvent(bodyBytes []byte, destinationUrl string) (bool, error) {
340-
if impl.config.NotificationMedium == PUB_SUB {
341-
if err := impl.sendEventsOnNats(bodyBytes); err != nil {
342-
impl.logger.Errorw("error while publishing event", "err", err)
343-
return false, err
344-
}
345-
return true, nil
346-
}
347-
348-
req, err := http.NewRequest(http.MethodPost, destinationUrl, bytes.NewBuffer(bodyBytes))
349-
if err != nil {
350-
impl.logger.Errorw("error while creating HTTP request", "err", err)
351-
return false, err
352-
}
353-
req.Header.Set("Content-Type", "application/json")
354-
355-
resp, err := impl.client.Do(req)
356-
if err != nil {
357-
impl.logger.Errorw("error while sending HTTP request", "err", err)
358-
return false, err
359-
}
360-
defer resp.Body.Close()
361-
362-
if resp.StatusCode >= 300 {
363-
impl.logger.Errorw("unexpected response from notifier", "status", resp.StatusCode)
364-
return false, fmt.Errorf("unexpected response code: %d", resp.StatusCode)
365-
}
366-
367-
impl.logger.Debugw("event successfully delivered", "status", resp.StatusCode)
368-
return true, nil
369-
}
370-
371337
func (impl *EventRESTClientImpl) WriteNatsEvent(topic string, payload interface{}) error {
372338
body, err := json.Marshal(payload)
373339
if err != nil {

client/events/event_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestSendEventsOnNats(t *testing.T) {
2323
logger: logger,
2424
pubsubClient: mockPubsubClient,
2525
client: client,
26-
config: &EventClientConfig{DestinationURL: "localhost:3000/notify", NotificationMedium: PUB_SUB},
26+
config: &EventClientConfig{DestinationURL: "localhost:3000/notify"},
2727
ciPipelineRepository: pipelineConfig.NewCiPipelineRepositoryImpl(db, logger, trans),
2828
pipelineRepository: pipelineConfig.NewPipelineRepositoryImpl(db, logger),
2929
attributesRepository: repository.NewAttributesRepositoryImpl(db),

env_gen.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

env_gen.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@
221221
| NATS_MSG_MAX_AGE | int |86400 | | | false |
222222
| NATS_MSG_PROCESSING_BATCH_SIZE | int |1 | | | false |
223223
| NATS_MSG_REPLICAS | int |0 | | | false |
224-
| NOTIFICATION_MEDIUM | NotificationMedium |rest | notification medium | | false |
225224
| OTEL_COLLECTOR_URL | string | | Opentelemetry URL | | false |
226225
| PARALLELISM_LIMIT_FOR_TAG_PROCESSING | int | | App manual sync job parallel tag processing count. | | false |
227226
| PG_EXPORT_PROM_METRICS | bool |true | | | false |

wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)