Skip to content

Commit 7334e41

Browse files
general bugfixes
1 parent a653926 commit 7334e41

File tree

8 files changed

+89
-28
lines changed

8 files changed

+89
-28
lines changed

conf/config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"VERSION": "1.2.1",
2+
"VERSION": "1.2.2",
33
"JWT_EXPIRES_IN_MINUTES": 15,
44
"REFRESH_JWT_EXPIRES_IN_MINUTES": 300,
55
"REST_GW_UPDATES_SUBJ": "$memphis_restgw_updates"

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/go-playground/validator/v10 v10.11.1
77
github.com/gofiber/fiber/v2 v2.48.0
88
github.com/golang-jwt/jwt/v4 v4.5.0
9-
github.com/memphisdev/memphis.go v1.1.2
9+
github.com/memphisdev/memphis.go v1.1.3
1010
github.com/nats-io/nats.go v1.25.0
1111
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f
1212
)
@@ -35,6 +35,7 @@ require (
3535
github.com/nats-io/nuid v1.0.1 // indirect
3636
github.com/rivo/uniseg v0.2.0 // indirect
3737
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect
38+
github.com/spaolacci/murmur3 v1.1.0 // indirect
3839
github.com/valyala/bytebufferpool v1.0.0 // indirect
3940
github.com/valyala/fasthttp v1.48.0 // indirect
4041
github.com/valyala/tcplisten v1.0.0 // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
5757
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
5858
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
5959
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
60-
github.com/memphisdev/memphis.go v1.1.2 h1:dWhPQSJ+NmnP3eSLPHOR+TZnI0AXbX1Mb/5kIpac/NM=
61-
github.com/memphisdev/memphis.go v1.1.2/go.mod h1:M/VqaTMdzYS4UJgo0h7JtLQmQrdXz3lvAO/+LhhaGbY=
60+
github.com/memphisdev/memphis.go v1.1.3 h1:YVWYQ4asTE8WYxs88kCO6pweEWuzozBdcgq/239H6n0=
61+
github.com/memphisdev/memphis.go v1.1.3/go.mod h1:2/x3ab0LBqXgFbAjWjWSAREKJEfN1HJJzkxPF08Szmk=
6262
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
6363
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
6464
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@@ -89,6 +89,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
8989
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
9090
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
9191
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
92+
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
93+
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
9294
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
9395
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
9496
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=

handlers/consumer.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,39 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
7070
accountIdStr := strconv.Itoa(int(accountId))
7171
conn := ConnectionsCache[accountIdStr][username].Connection
7272
if conn == nil {
73-
log.Warnf("ConsumeHandleMessage - consume: Connection does not exist")
74-
c.Status(fiber.StatusInternalServerError)
75-
return c.JSON(&fiber.Map{
76-
"success": false,
77-
"error": "Server error",
78-
})
73+
conn, err = Connect(userData.Password, username, userData.ConnectionToken, int(accountId))
74+
if err != nil {
75+
errMsg := strings.ToLower(err.Error())
76+
if strings.Contains(errMsg, ErrorMsgAuthorizationViolation) || strings.Contains(errMsg, "token") || strings.Contains(errMsg, ErrorMsgMissionAccountId) {
77+
log.Warnf("Could not establish new connection with the broker: Authentication error")
78+
return c.Status(401).JSON(fiber.Map{
79+
"message": "Unauthorized",
80+
})
81+
}
82+
83+
log.Errorf("Could not establish new connection with the broker: %s", err.Error())
84+
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
85+
"message": "Server error",
86+
})
87+
}
88+
if ConnectionsCache[accountIdStr] == nil {
89+
ConnectionsCacheLock.Lock()
90+
ConnectionsCache[accountIdStr] = make(map[string]Connection)
91+
ConnectionsCacheLock.Unlock()
92+
}
93+
94+
ConnectionsCacheLock.Lock()
95+
ConnectionsCache[accountIdStr][username] = Connection{Connection: conn, ExpirationTime: userData.TokenExpiry}
96+
ConnectionsCacheLock.Unlock()
7997
}
8098
reqBody.initializeDefaults()
8199
msgs, err := conn.FetchMessages(stationName, reqBody.ConsumerName,
82100
memphis.FetchBatchSize(reqBody.BatchSize),
83101
memphis.FetchConsumerGroup(reqBody.ConsumerGroup),
84102
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTimeMs)*time.Millisecond),
85-
memphis.FetchMaxMsgDeliveries(1))
103+
memphis.FetchMaxMsgDeliveries(3)) // for cases of broker crash before sending the messages to the client
86104

87-
if err != nil {
105+
if err != nil && !strings.Contains(err.Error(), "fetch timed out") {
88106
log.Errorf("ConsumeHandleMessage - fetch messages: %s", err.Error())
89107
c.Status(fiber.StatusBadRequest)
90108
return c.JSON(&fiber.Map{

handlers/producer.go

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,30 @@ func CreateHandleMessage() func(*fiber.Ctx) error {
7171
accountIdStr := strconv.Itoa(int(accountId))
7272
conn := ConnectionsCache[accountIdStr][username].Connection
7373
if conn == nil {
74-
log.Warnf("CreateHandleMessage - produce: Connection does not exist")
75-
c.Status(fiber.StatusInternalServerError)
76-
return c.JSON(&fiber.Map{
77-
"success": false,
78-
"error": "Server error",
79-
})
74+
conn, err = Connect(userData.Password, username, userData.ConnectionToken, int(accountId))
75+
if err != nil {
76+
errMsg := strings.ToLower(err.Error())
77+
if strings.Contains(errMsg, ErrorMsgAuthorizationViolation) || strings.Contains(errMsg, "token") || strings.Contains(errMsg, ErrorMsgMissionAccountId) {
78+
log.Warnf("Could not establish new connection with the broker: Authentication error")
79+
return c.Status(401).JSON(fiber.Map{
80+
"message": "Unauthorized",
81+
})
82+
}
83+
84+
log.Errorf("Could not establish new connection with the broker: %s", err.Error())
85+
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
86+
"message": "Server error",
87+
})
88+
}
89+
if ConnectionsCache[accountIdStr] == nil {
90+
ConnectionsCacheLock.Lock()
91+
ConnectionsCache[accountIdStr] = make(map[string]Connection)
92+
ConnectionsCacheLock.Unlock()
93+
}
94+
95+
ConnectionsCacheLock.Lock()
96+
ConnectionsCache[accountIdStr][username] = Connection{Connection: conn, ExpirationTime: userData.TokenExpiry}
97+
ConnectionsCacheLock.Unlock()
8098
}
8199
err = conn.Produce(stationName, "rest-gateway", message, []memphis.ProducerOpt{}, []memphis.ProduceOpt{memphis.MsgHeaders(hdrs)})
82100
if err != nil {
@@ -144,12 +162,30 @@ func CreateHandleBatch() func(*fiber.Ctx) error {
144162
accountIdStr := strconv.Itoa(int(accountId))
145163
conn := ConnectionsCache[accountIdStr][username].Connection
146164
if conn == nil {
147-
log.Warnf("CreateHandleBatch - produce: Connection does not exist")
148-
c.Status(fiber.StatusInternalServerError)
149-
return c.JSON(&fiber.Map{
150-
"success": false,
151-
"error": "Server error",
152-
})
165+
conn, err = Connect(userData.Password, username, userData.ConnectionToken, int(accountId))
166+
if err != nil {
167+
errMsg := strings.ToLower(err.Error())
168+
if strings.Contains(errMsg, ErrorMsgAuthorizationViolation) || strings.Contains(errMsg, "token") || strings.Contains(errMsg, ErrorMsgMissionAccountId) {
169+
log.Warnf("Could not establish new connection with the broker: Authentication error")
170+
return c.Status(401).JSON(fiber.Map{
171+
"message": "Unauthorized",
172+
})
173+
}
174+
175+
log.Errorf("Could not establish new connection with the broker: %s", err.Error())
176+
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
177+
"message": "Server error",
178+
})
179+
}
180+
if ConnectionsCache[accountIdStr] == nil {
181+
ConnectionsCacheLock.Lock()
182+
ConnectionsCache[accountIdStr] = make(map[string]Connection)
183+
ConnectionsCacheLock.Unlock()
184+
}
185+
186+
ConnectionsCacheLock.Lock()
187+
ConnectionsCache[accountIdStr][username] = Connection{Connection: conn, ExpirationTime: userData.TokenExpiry}
188+
ConnectionsCacheLock.Unlock()
153189
}
154190

155191
errCount := 0

middlewares/auth.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,21 @@ func verifyToken(tokenString string, secret string) (models.AuthSchema, error) {
6969
Username: claims["username"].(string),
7070
ConnectionToken: claims["connection_token"].(string),
7171
AccountId: 1,
72+
TokenExpiry: claims["exp"].(int64),
7273
}
7374
} else {
7475
user = models.AuthSchema{
75-
Username: claims["username"].(string),
76-
Password: claims["password"].(string),
77-
AccountId: claims["account_id"].(float64),
76+
Username: claims["username"].(string),
77+
Password: claims["password"].(string),
78+
AccountId: claims["account_id"].(float64),
79+
TokenExpiry: int64(claims["exp"].(float64)),
7880
}
7981
}
8082
} else {
8183
// for backward compatability
8284
user = models.AuthSchema{
8385
TokenExpiryMins: int(claims["exp"].(float64)),
86+
TokenExpiry: int64(claims["exp"].(float64)),
8487
}
8588
}
8689
return user, nil

models/auth.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ type AuthSchema struct {
77
TokenExpiryMins int `json:"token_expiry_in_minutes"`
88
RefreshTokenExpiryMins int `json:"refresh_token_expiry_in_minutes"`
99
AccountId float64 `json:"account_id"`
10+
TokenExpiry int64 `json:"token_expiry"`
1011
}
1112

1213
type RefreshTokenSchema struct {

version.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.2.1
1+
1.2.2

0 commit comments

Comments
 (0)