Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/v1beta3/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ const (
PagerDutyProvider string = "pagerduty"
DataDogProvider string = "datadog"
NATSProvider string = "nats"
ZulipProvider string = "zulip"
)

// ProviderSpec defines the desired state of the Provider.
// +kubebuilder:validation:XValidation:rule="self.type == 'github' || self.type == 'gitlab' || self.type == 'gitea' || self.type == 'bitbucketserver' || self.type == 'bitbucket' || self.type == 'azuredevops' || !has(self.commitStatusExpr)", message="spec.commitStatusExpr is only supported for the 'github', 'gitlab', 'gitea', 'bitbucketserver', 'bitbucket', 'azuredevops' provider types"
type ProviderSpec struct {
// Type specifies which Provider implementation to use.
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog;nats
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog;nats;zulip
// +required
Type string `json:"type"`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ spec:
- pagerduty
- datadog
- nats
- zulip
type: string
username:
description: Username specifies the name under which events are posted.
Expand Down
42 changes: 42 additions & 0 deletions docs/spec/v1beta3/providers.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ The supported alerting providers are:
| [Telegram](#telegram) | `telegram` |
| [WebEx](#webex) | `webex` |
| [NATS](#nats) | `nats` |
| [Zulip](#zulip) | `zulip` |

#### Types supporting Git commit status updates

Expand Down Expand Up @@ -1139,6 +1140,46 @@ stringData:
password: <NATS Password>
```

##### Zulip

When `.spec.type` is set to `zulip`, the controller will send a Zulip message
to the provided [Address](#address). The address is expected to be an HTTP/S URL
pointing to a Zulip server. The `POST /api/v1/messages` API will be used to
send the message. See the API docs [here](https://zulip.com/api/send-message).

The Zulip *channel* and *topic* must be specified in the [Channel](#channel) field,
separated by a forward slash (`/`). For example: `general/alerts`. In this case, the
message will be sent to the `general` channel, under the `alerts` topic.

The `zulip` Provider supports the configuration of an [HTTP/S proxy](#https-proxy),
a [certificate secret reference](#certificate-secret-reference) for configuring TLS,
and a [secret reference](#secret-reference) for HTTP Basic Authentication.

Example of a `zulip` Provider with HTTP Basic Authentication:

```yaml
apiVersion: notification.toolkit.fluxcd.io/v1beta3
kind: Provider
metadata:
name: zulip
namespace: default
spec:
type: zulip
address: https://my-org.zulipchat.com
channel: my-channel/my-topic
secretRef:
name: zulip-basic-auth
---
apiVersion: v1
kind: Secret
metadata:
name: zulip-basic-auth
namespace: default
stringData:
username: my-bot@my-org.zulipchat.com # the Zulip bot email address
password: F8KXuAylZOta3L5tjgVm3r1YVruUNGXu # the Zulip bot API key
```

### Address

`.spec.address` is an optional field that specifies the endpoint where the events are posted.
Expand Down Expand Up @@ -1300,6 +1341,7 @@ The following providers support client certificate authentication:
| `sentry` | Sentry |
| `slack` | Slack API |
| `webex` | Webex messages |
| `zulip` | Zulip API |

Support for client certificate authentication is being expanded to additional providers over time.

Expand Down
67 changes: 49 additions & 18 deletions internal/notifier/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,32 @@ import (
type postOptions struct {
proxy string
tlsConfig *tls.Config
contentType string
username string
password string
requestModifier func(*retryablehttp.Request)
responseValidator func(statusCode int, body []byte) error
responseValidator func(*http.Response) error
}

type postOption func(*postOptions)

func postMessage(ctx context.Context, address string, payload interface{}, opts ...postOption) error {
func postMessage(ctx context.Context, address string, payload any, opts ...postOption) error {
options := &postOptions{
// Default validateResponse function verifies that the response status code is 200, 202 or 201.
responseValidator: func(statusCode int, body []byte) error {
if statusCode == http.StatusOK ||
statusCode == http.StatusAccepted ||
statusCode == http.StatusCreated {
responseValidator: func(resp *http.Response) error {
s := resp.StatusCode
if 200 <= s && s < 300 {
return nil
}

return fmt.Errorf("request failed with status code %d, %s", statusCode, string(body))
err := fmt.Errorf("request failed with status code %d", s)

b, bodyErr := io.ReadAll(resp.Body)
if bodyErr != nil {
return fmt.Errorf("%w: unable to read response body: %w", err, bodyErr)
}

return fmt.Errorf("%w: %s", err, string(b))
},
}

Expand All @@ -61,17 +70,31 @@ func postMessage(ctx context.Context, address string, payload interface{}, opts
return err
}

data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
contentType := options.contentType
var data []byte
switch contentType {
case "":
contentType = "application/json"
var err error
data, err = json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
}
default:
data = payload.([]byte)
}

req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPost, address, data)
if err != nil {
return fmt.Errorf("failed to create a new request: %w", err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Type", contentType)

if options.username != "" || options.password != "" {
req.SetBasicAuth(options.username, options.password)
}

if options.requestModifier != nil {
options.requestModifier(req)
}
Expand All @@ -82,12 +105,7 @@ func postMessage(ctx context.Context, address string, payload interface{}, opts
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}

if err := options.responseValidator(resp.StatusCode, body); err != nil {
if err := options.responseValidator(resp); err != nil {
return fmt.Errorf("request failed: %w", err)
}

Expand All @@ -106,13 +124,26 @@ func withTLSConfig(tlsConfig *tls.Config) postOption {
}
}

func withContentType(contentType string) postOption {
return func(opts *postOptions) {
opts.contentType = contentType
}
}

func withBasicAuth(username, password string) postOption {
return func(opts *postOptions) {
opts.username = username
opts.password = password
}
}

func withRequestModifier(reqModifier func(*retryablehttp.Request)) postOption {
return func(opts *postOptions) {
opts.requestModifier = reqModifier
}
}

func withResponseValidator(respValidator func(statusCode int, body []byte) error) postOption {
func withResponseValidator(respValidator func(resp *http.Response) error) postOption {
return func(opts *postOptions) {
opts.responseValidator = respValidator
}
Expand Down
44 changes: 43 additions & 1 deletion internal/notifier/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,44 @@ func Test_postSelfSignedCert(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
}

func Test_postMessage_contentType(t *testing.T) {
const contentType = "application/x-www-form-urlencoded"

g := NewWithT(t)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
g.Expect(r.Header.Get("Content-Type")).To(Equal(contentType))

err := r.ParseForm()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r.Form.Get("status")).To(Equal("success"))
}))
defer ts.Close()

err := postMessage(context.Background(), ts.URL, []byte("status=success"), withContentType(contentType))
g.Expect(err).ToNot(HaveOccurred())
}

func Test_postMessage_basicAuth(t *testing.T) {
g := NewWithT(t)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
username, password, ok := r.BasicAuth()
g.Expect(ok).To(BeTrue())
g.Expect(username).To(Equal("user"))
g.Expect(password).To(Equal("pass"))

b, err := io.ReadAll(r.Body)
g.Expect(err).ToNot(HaveOccurred())
var payload = make(map[string]string)
err = json.Unmarshal(b, &payload)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(payload["status"]).To(Equal("success"))
}))
defer ts.Close()

err := postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withBasicAuth("user", "pass"))
g.Expect(err).ToNot(HaveOccurred())
}

func Test_postMessage_requestModifier(t *testing.T) {
g := NewWithT(t)
ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
Expand All @@ -114,7 +152,11 @@ func Test_postMessage_responseValidator(t *testing.T) {
err := postMessage(context.Background(), ts.URL, map[string]string{"status": "success"})
g.Expect(err).ToNot(HaveOccurred())

err = postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withResponseValidator(func(_ int, body []byte) error {
err = postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withResponseValidator(func(resp *http.Response) error {
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if strings.HasPrefix(string(body), "error:") {
return errors.New(string(body))
}
Expand Down
5 changes: 5 additions & 0 deletions internal/notifier/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
apiv1.BitbucketServerProvider: bitbucketServerNotifierFunc,
apiv1.BitbucketProvider: bitbucketNotifierFunc,
apiv1.AzureDevOpsProvider: azureDevOpsNotifierFunc,
apiv1.ZulipProvider: zulipNotifierFunc,
}
)

Expand Down Expand Up @@ -355,3 +356,7 @@ func azureDevOpsNotifierFunc(opts notifierOptions) (Interface, error) {
opts.TLSConfig, opts.ProxyURL, opts.ServiceAccountName, opts.ProviderName,
opts.ProviderNamespace, opts.TokenClient, opts.TokenCache)
}

func zulipNotifierFunc(opts notifierOptions) (Interface, error) {
return NewZulip(opts.URL, opts.Channel, opts.ProxyURL, opts.TLSConfig, opts.Username, opts.Password)
}
8 changes: 4 additions & 4 deletions internal/notifier/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"

Expand Down Expand Up @@ -149,14 +150,13 @@ func (s *Slack) Post(ctx context.Context, event eventv1.Event) error {
//
// On the other hand, incoming webhooks return more expressive HTTP status codes.
// See https://api.slack.com/messaging/webhooks#handling_errors.
func validateSlackResponse(_ int, body []byte) error {
type slackResponse struct {
func validateSlackResponse(resp *http.Response) error {
var slackResp struct {
Ok bool `json:"ok"`
Error string `json:"error"`
}

slackResp := slackResponse{}
if err := json.Unmarshal(body, &slackResp); err != nil {
if err := json.NewDecoder(resp.Body).Decode(&slackResp); err != nil {
return fmt.Errorf("unable to unmarshal response body: %w", err)
}

Expand Down
15 changes: 9 additions & 6 deletions internal/notifier/slack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func TestSlack_PostUpdate(t *testing.T) {

func TestSlack_ValidateResponse(t *testing.T) {
g := NewWithT(t)
body := []byte(`{

resp := httptest.NewRecorder()
resp.Write([]byte(`{
"ok": true
}`)
err := validateSlackResponse(http.StatusOK, body)
}`))
err := validateSlackResponse(resp.Result())
g.Expect(err).ToNot(HaveOccurred())

body = []byte(`{
resp = httptest.NewRecorder()
resp.Write([]byte(`{
"ok": false,
"error": "too_many_attachments"
}`)
err = validateSlackResponse(http.StatusOK, body)
}`))
err = validateSlackResponse(resp.Result())
g.Expect(err).To(MatchError(ContainSubstring("Slack responded with error: too_many_attachments")))
}
Loading