Skip to content

Commit 6b57bce

Browse files
committed
fix: cassandra + mongo + arangodb issues with webhook
1 parent bfbeb6a commit 6b57bce

File tree

19 files changed

+167
-94
lines changed

19 files changed

+167
-94
lines changed

server/db/models/user.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,13 @@ func (user *User) AsAPIUser() *model.User {
3838
email := user.Email
3939
createdAt := user.CreatedAt
4040
updatedAt := user.UpdatedAt
41+
42+
id := user.ID
43+
if strings.Contains(id, Collections.WebhookLog+"/") {
44+
id = strings.TrimPrefix(id, Collections.WebhookLog+"/")
45+
}
4146
return &model.User{
42-
ID: user.ID,
47+
ID: id,
4348
Email: user.Email,
4449
EmailVerified: isEmailVerified,
4550
SignupMethods: user.SignupMethods,

server/db/models/verification_requests.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package models
22

3-
import "github.com/authorizerdev/authorizer/server/graph/model"
3+
import (
4+
"strings"
5+
6+
"github.com/authorizerdev/authorizer/server/graph/model"
7+
)
48

59
// Note: any change here should be reflected in providers/casandra/provider.go as it does not have model support in collection creation
610

@@ -27,8 +31,13 @@ func (v *VerificationRequest) AsAPIVerificationRequest() *model.VerificationRequ
2731
redirectURI := v.RedirectURI
2832
expires := v.ExpiresAt
2933
identifier := v.Identifier
34+
35+
id := v.ID
36+
if strings.Contains(id, Collections.WebhookLog+"/") {
37+
id = strings.TrimPrefix(id, Collections.WebhookLog+"/")
38+
}
3039
return &model.VerificationRequest{
31-
ID: v.ID,
40+
ID: id,
3241
Token: &token,
3342
Identifier: &identifier,
3443
Expires: &expires,

server/db/models/webhook.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package models
22

33
import (
44
"encoding/json"
5+
"strings"
56

67
"github.com/authorizerdev/authorizer/server/graph/model"
78
)
@@ -23,8 +24,13 @@ type Webhook struct {
2324
func (w *Webhook) AsAPIWebhook() *model.Webhook {
2425
headersMap := make(map[string]interface{})
2526
json.Unmarshal([]byte(w.Headers), &headersMap)
27+
28+
id := w.ID
29+
if strings.Contains(id, Collections.Webhook+"/") {
30+
id = strings.TrimPrefix(id, Collections.Webhook+"/")
31+
}
2632
return &model.Webhook{
27-
ID: w.ID,
33+
ID: id,
2834
EventName: &w.EventName,
2935
Endpoint: &w.EndPoint,
3036
Headers: headersMap,

server/db/models/webhook_log.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package models
22

3-
import "github.com/authorizerdev/authorizer/server/graph/model"
3+
import (
4+
"strings"
5+
6+
"github.com/authorizerdev/authorizer/server/graph/model"
7+
)
48

59
// Note: any change here should be reflected in providers/casandra/provider.go as it does not have model support in collection creation
610

@@ -17,8 +21,12 @@ type WebhookLog struct {
1721
}
1822

1923
func (w *WebhookLog) AsAPIWebhookLog() *model.WebhookLog {
24+
id := w.ID
25+
if strings.Contains(id, Collections.WebhookLog+"/") {
26+
id = strings.TrimPrefix(id, Collections.WebhookLog+"/")
27+
}
2028
return &model.WebhookLog{
21-
ID: w.ID,
29+
ID: id,
2230
HTTPStatus: &w.HttpStatus,
2331
Response: &w.Response,
2432
Request: &w.Request,

server/db/providers/arangodb/user.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ func (p *provider) DeleteUser(ctx context.Context, user models.User) error {
6363
return err
6464
}
6565

66-
query := fmt.Sprintf(`FOR d IN %s FILTER d.user_id == @userId REMOVE { _key: d._key } IN %s`, models.Collections.Session, models.Collections.Session)
66+
query := fmt.Sprintf(`FOR d IN %s FILTER d.user_id == @user_id REMOVE { _key: d._key } IN %s`, models.Collections.Session, models.Collections.Session)
6767
bindVars := map[string]interface{}{
68-
"userId": user.ID,
68+
"user_id": user.ID,
6969
}
7070
cursor, err := p.db.Query(ctx, query, bindVars)
7171
if err != nil {

server/db/providers/arangodb/webhook.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (p *provider) ListWebhook(ctx context.Context, pagination model.Pagination)
8383
// GetWebhookByID to get webhook by id
8484
func (p *provider) GetWebhookByID(ctx context.Context, webhookID string) (*model.Webhook, error) {
8585
var webhook models.Webhook
86-
query := fmt.Sprintf("FOR d in %s FILTER d._id == @webhook_id RETURN d", models.Collections.Webhook)
86+
query := fmt.Sprintf("FOR d in %s FILTER d._key == @webhook_id RETURN d", models.Collections.Webhook)
8787
bindVars := map[string]interface{}{
8888
"webhook_id": webhookID,
8989
}
@@ -146,9 +146,9 @@ func (p *provider) DeleteWebhook(ctx context.Context, webhook *model.Webhook) er
146146
return err
147147
}
148148

149-
query := fmt.Sprintf("FOR d in %s FILTER d.event_id == @event_id REMOVE { _key: d._key }", models.Collections.WebhookLog)
149+
query := fmt.Sprintf("FOR d IN %s FILTER d.webhook_id == @webhook_id REMOVE { _key: d._key } IN %s", models.Collections.WebhookLog, models.Collections.WebhookLog)
150150
bindVars := map[string]interface{}{
151-
"event_id": webhook.ID,
151+
"webhook_id": webhook.ID,
152152
}
153153

154154
cursor, err := p.db.Query(ctx, query, bindVars)

server/db/providers/arangodb/webhook_log.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ func (p *provider) ListWebhookLogs(ctx context.Context, pagination model.Paginat
3737
query := fmt.Sprintf("FOR d in %s SORT d.created_at DESC LIMIT %d, %d RETURN d", models.Collections.WebhookLog, pagination.Offset, pagination.Limit)
3838

3939
if webhookID != "" {
40-
query = fmt.Sprintf("FOR d in %s FILTER d.webhook_id == @webhookID SORT d.created_at DESC LIMIT %d, %d RETURN d", models.Collections.WebhookLog, pagination.Offset, pagination.Limit)
40+
query = fmt.Sprintf("FOR d in %s FILTER d.webhook_id == @webhook_id SORT d.created_at DESC LIMIT %d, %d RETURN d", models.Collections.WebhookLog, pagination.Offset, pagination.Limit)
4141
bindVariables = map[string]interface{}{
4242
"webhook_id": webhookID,
4343
}
4444
}
45+
4546
sctx := driver.WithQueryFullCount(ctx)
4647
cursor, err := p.db.Query(sctx, query, bindVariables)
4748
if err != nil {

server/db/providers/cassandradb/provider.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ func NewProvider() (*provider, error) {
143143
if err != nil {
144144
return nil, err
145145
}
146+
sessionIndexQuery := fmt.Sprintf("CREATE INDEX IF NOT EXISTS authorizer_session_user_id ON %s.%s (user_id)", KeySpace, models.Collections.Session)
147+
err = session.Query(sessionIndexQuery).Exec()
148+
if err != nil {
149+
return nil, err
150+
}
146151

147152
userCollectionQuery := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text, email text, email_verified_at bigint, password text, signup_methods text, given_name text, family_name text, middle_name text, nickname text, gender text, birthdate text, phone_number text, phone_number_verified_at bigint, picture text, roles text, updated_at bigint, created_at bigint, revoked_timestamp bigint, PRIMARY KEY (id))", KeySpace, models.Collections.User)
148153
err = session.Query(userCollectionQuery).Exec()
@@ -177,7 +182,7 @@ func NewProvider() (*provider, error) {
177182
return nil, err
178183
}
179184

180-
webhookCollectionQuery := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text, event_name text, endpoint text, enabled boolean, updated_at bigint, created_at bigint, PRIMARY KEY (id))", KeySpace, models.Collections.Webhook)
185+
webhookCollectionQuery := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text, event_name text, endpoint text, enabled boolean, headers text, updated_at bigint, created_at bigint, PRIMARY KEY (id))", KeySpace, models.Collections.Webhook)
181186
err = session.Query(webhookCollectionQuery).Exec()
182187
if err != nil {
183188
return nil, err

server/db/providers/cassandradb/user.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ func (p *provider) UpdateUser(ctx context.Context, user models.User) (models.Use
102102
continue
103103
}
104104

105+
if key == "_key" {
106+
continue
107+
}
108+
105109
if value == nil {
106110
updateFields += fmt.Sprintf("%s = null,", key)
107111
continue
@@ -135,7 +139,19 @@ func (p *provider) DeleteUser(ctx context.Context, user models.User) error {
135139
return err
136140
}
137141

138-
deleteSessionQuery := fmt.Sprintf("DELETE FROM %s WHERE user_id = '%s'", KeySpace+"."+models.Collections.Session, user.ID)
142+
getSessionsQuery := fmt.Sprintf("SELECT id FROM %s WHERE user_id = '%s' ALLOW FILTERING", KeySpace+"."+models.Collections.Session, user.ID)
143+
scanner := p.db.Query(getSessionsQuery).Iter().Scanner()
144+
sessionIDs := ""
145+
for scanner.Next() {
146+
var wlID string
147+
err = scanner.Scan(&wlID)
148+
if err != nil {
149+
return err
150+
}
151+
sessionIDs += fmt.Sprintf("'%s',", wlID)
152+
}
153+
sessionIDs = strings.TrimSuffix(sessionIDs, ",")
154+
deleteSessionQuery := fmt.Sprintf("DELETE FROM %s WHERE id IN (%s)", KeySpace+"."+models.Collections.Session, sessionIDs)
139155
err = p.db.Query(deleteSessionQuery).Exec()
140156
if err != nil {
141157
return err
@@ -181,7 +197,7 @@ func (p *provider) ListUsers(ctx context.Context, pagination model.Pagination) (
181197
// GetUserByEmail to get user information from database using email address
182198
func (p *provider) GetUserByEmail(ctx context.Context, email string) (models.User, error) {
183199
var user models.User
184-
query := fmt.Sprintf("SELECT id, email, email_verified_at, password, signup_methods, given_name, family_name, middle_name, nickname, birthdate, phone_number, phone_number_verified_at, picture, roles, revoked_timestamp, created_at, updated_at FROM %s WHERE email = '%s' LIMIT 1", KeySpace+"."+models.Collections.User, email)
200+
query := fmt.Sprintf("SELECT id, email, email_verified_at, password, signup_methods, given_name, family_name, middle_name, nickname, birthdate, phone_number, phone_number_verified_at, picture, roles, revoked_timestamp, created_at, updated_at FROM %s WHERE email = '%s' LIMIT 1 ALLOW FILTERING", KeySpace+"."+models.Collections.User, email)
185201
err := p.db.Query(query).Consistency(gocql.One).Scan(&user.ID, &user.Email, &user.EmailVerifiedAt, &user.Password, &user.SignupMethods, &user.GivenName, &user.FamilyName, &user.MiddleName, &user.Nickname, &user.Birthdate, &user.PhoneNumber, &user.PhoneNumberVerifiedAt, &user.Picture, &user.Roles, &user.RevokedTimestamp, &user.CreatedAt, &user.UpdatedAt)
186202
if err != nil {
187203
return user, err

server/db/providers/cassandradb/webhook.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ func (p *provider) AddWebhook(ctx context.Context, webhook models.Webhook) (*mod
2424
webhook.CreatedAt = time.Now().Unix()
2525
webhook.UpdatedAt = time.Now().Unix()
2626

27+
existingHook, _ := p.GetWebhookByEventName(ctx, webhook.EventName)
28+
if existingHook != nil {
29+
return nil, fmt.Errorf("Webhook with %s event_name already exists", webhook.EventName)
30+
}
31+
2732
insertQuery := fmt.Sprintf("INSERT INTO %s (id, event_name, endpoint, headers, enabled, created_at, updated_at) VALUES ('%s', '%s', '%s', '%s', %t, %d, %d)", KeySpace+"."+models.Collections.Webhook, webhook.ID, webhook.EventName, webhook.EndPoint, webhook.Headers, webhook.Enabled, webhook.CreatedAt, webhook.UpdatedAt)
2833
err := p.db.Query(insertQuery).Exec()
2934
if err != nil {
@@ -56,6 +61,10 @@ func (p *provider) UpdateWebhook(ctx context.Context, webhook models.Webhook) (*
5661
continue
5762
}
5863

64+
if key == "_key" {
65+
continue
66+
}
67+
5968
if value == nil {
6069
updateFields += fmt.Sprintf("%s = null,", key)
6170
continue
@@ -72,7 +81,6 @@ func (p *provider) UpdateWebhook(ctx context.Context, webhook models.Webhook) (*
7281
updateFields = strings.TrimSuffix(updateFields, ",")
7382

7483
query := fmt.Sprintf("UPDATE %s SET %s WHERE id = '%s'", KeySpace+"."+models.Collections.Webhook, updateFields, webhook.ID)
75-
7684
err = p.db.Query(query).Exec()
7785
if err != nil {
7886
return nil, err
@@ -130,7 +138,7 @@ func (p *provider) GetWebhookByID(ctx context.Context, webhookID string) (*model
130138
// GetWebhookByEventName to get webhook by event_name
131139
func (p *provider) GetWebhookByEventName(ctx context.Context, eventName string) (*model.Webhook, error) {
132140
var webhook models.Webhook
133-
query := fmt.Sprintf(`SELECT id, event_name, endpoint, headers, enabled, created_at, updated_at FROM %s WHERE event_name = '%s' LIMIT 1`, KeySpace+"."+models.Collections.Webhook, eventName)
141+
query := fmt.Sprintf(`SELECT id, event_name, endpoint, headers, enabled, created_at, updated_at FROM %s WHERE event_name = '%s' LIMIT 1 ALLOW FILTERING`, KeySpace+"."+models.Collections.Webhook, eventName)
134142
err := p.db.Query(query).Consistency(gocql.One).Scan(&webhook.ID, &webhook.EventName, &webhook.EndPoint, &webhook.Headers, &webhook.Enabled, &webhook.CreatedAt, &webhook.UpdatedAt)
135143
if err != nil {
136144
return nil, err
@@ -146,7 +154,19 @@ func (p *provider) DeleteWebhook(ctx context.Context, webhook *model.Webhook) er
146154
return err
147155
}
148156

149-
query = fmt.Sprintf("DELETE FROM %s WHERE webhook_id = '%s'", KeySpace+"."+models.Collections.WebhookLog, webhook.ID)
157+
getWebhookLogQuery := fmt.Sprintf("SELECT id FROM %s WHERE webhook_id = '%s' ALLOW FILTERING", KeySpace+"."+models.Collections.WebhookLog, webhook.ID)
158+
scanner := p.db.Query(getWebhookLogQuery).Iter().Scanner()
159+
webhookLogIDs := ""
160+
for scanner.Next() {
161+
var wlID string
162+
err = scanner.Scan(&wlID)
163+
if err != nil {
164+
return err
165+
}
166+
webhookLogIDs += fmt.Sprintf("'%s',", wlID)
167+
}
168+
webhookLogIDs = strings.TrimSuffix(webhookLogIDs, ",")
169+
query = fmt.Sprintf("DELETE FROM %s WHERE id IN (%s)", KeySpace+"."+models.Collections.WebhookLog, webhookLogIDs)
150170
err = p.db.Query(query).Exec()
151171
return err
152172
}

0 commit comments

Comments
 (0)