Skip to content

Commit 4865667

Browse files
for backward compatability
1 parent d4ad1d2 commit 4865667

File tree

3 files changed

+81
-35
lines changed

3 files changed

+81
-35
lines changed

handlers/auth.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
)
1717

1818
var configuration = conf.GetConfig()
19-
var connectionsCacheLock sync.Mutex
19+
var ConnectionsCacheLock sync.Mutex
2020

2121
const (
2222
errorMsgAuthorizationViolation = "authorization violation"
@@ -35,7 +35,7 @@ type refreshTokenExpiration struct {
3535
RefreshTokenExpiration int64 `json:"refresh_token_expiration"`
3636
}
3737

38-
var connectionsCache = map[string]map[string]Connection{}
38+
var ConnectionsCache = map[string]map[string]Connection{}
3939

4040
func connect(password, username, connectionToken string, accountId int) (*memphis.Conn, error) {
4141
if configuration.USER_PASS_BASED_AUTH {
@@ -103,15 +103,15 @@ func (ah AuthHandler) Authenticate(c *fiber.Ctx) error {
103103

104104
username := strings.ToLower(body.Username)
105105
accountId := strconv.Itoa(int(body.AccountId))
106-
if connectionsCache[accountId] == nil {
107-
connectionsCacheLock.Lock()
108-
connectionsCache[accountId] = make(map[string]Connection)
109-
connectionsCacheLock.Unlock()
106+
if ConnectionsCache[accountId] == nil {
107+
ConnectionsCacheLock.Lock()
108+
ConnectionsCache[accountId] = make(map[string]Connection)
109+
ConnectionsCacheLock.Unlock()
110110
}
111111

112-
connectionsCacheLock.Lock()
113-
connectionsCache[accountId][username] = Connection{Connection: conn, ExpirationTime: tokenExpiry}
114-
connectionsCacheLock.Unlock()
112+
ConnectionsCacheLock.Lock()
113+
ConnectionsCache[accountId][username] = Connection{Connection: conn, ExpirationTime: tokenExpiry}
114+
ConnectionsCacheLock.Unlock()
115115
return c.Status(fiber.StatusOK).JSON(fiber.Map{
116116
"jwt": token,
117117
"expires_in": tokenExpiry * 60 * 1000,
@@ -211,15 +211,15 @@ func (ah AuthHandler) RefreshToken(c *fiber.Ctx) error {
211211
}
212212

213213
accountId = int(accountId)
214-
if connectionsCache[strconv.Itoa(int(accountId))] == nil {
215-
connectionsCacheLock.Lock()
216-
connectionsCache[strconv.Itoa(accountId)] = make(map[string]Connection)
217-
connectionsCacheLock.Unlock()
214+
if ConnectionsCache[strconv.Itoa(int(accountId))] == nil {
215+
ConnectionsCacheLock.Lock()
216+
ConnectionsCache[strconv.Itoa(accountId)] = make(map[string]Connection)
217+
ConnectionsCacheLock.Unlock()
218218
}
219219

220-
connectionsCacheLock.Lock()
221-
connectionsCache[strconv.Itoa(accountId)][username] = Connection{Connection: conn, ExpirationTime: refreshTokenExpiry}
222-
connectionsCacheLock.Unlock()
220+
ConnectionsCacheLock.Lock()
221+
ConnectionsCache[strconv.Itoa(accountId)][username] = Connection{Connection: conn, ExpirationTime: refreshTokenExpiry}
222+
ConnectionsCacheLock.Unlock()
223223
return c.Status(fiber.StatusOK).JSON(fiber.Map{
224224
"jwt": token,
225225
"expires_in": tokenExpiry * 60 * 1000,
@@ -230,22 +230,22 @@ func (ah AuthHandler) RefreshToken(c *fiber.Ctx) error {
230230

231231
func CleanConnectionsCache() {
232232
for range time.Tick(time.Second * 30) {
233-
for t, tenant := range connectionsCache {
233+
for t, tenant := range ConnectionsCache {
234234
for u, user := range tenant {
235235
currentTime := time.Now()
236236
unixTimeNow := currentTime.Unix()
237-
conn := connectionsCache[t][u].Connection
237+
conn := ConnectionsCache[t][u].Connection
238238
if unixTimeNow > int64(user.ExpirationTime) {
239239
conn.Close()
240-
connectionsCacheLock.Lock()
241-
delete(connectionsCache[t], u)
242-
connectionsCacheLock.Unlock()
240+
ConnectionsCacheLock.Lock()
241+
delete(ConnectionsCache[t], u)
242+
ConnectionsCacheLock.Unlock()
243243
}
244244
}
245-
if len(connectionsCache[t]) == 0 {
246-
connectionsCacheLock.Lock()
247-
delete(connectionsCache, t)
248-
connectionsCacheLock.Unlock()
245+
if len(ConnectionsCache[t]) == 0 {
246+
ConnectionsCacheLock.Lock()
247+
delete(ConnectionsCache, t)
248+
ConnectionsCacheLock.Unlock()
249249
}
250250
}
251251
}

handlers/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func CreateHandleMessage() func(*fiber.Ctx) error {
7070
username := userData.Username
7171
accountId := userData.AccountId
7272
accountIdStr := strconv.Itoa(int(accountId))
73-
conn := connectionsCache[accountIdStr][username].Connection
73+
conn := ConnectionsCache[accountIdStr][username].Connection
7474
if conn == nil {
7575
errMsg := fmt.Sprintf("Connection does not exist")
7676
log.Errorf("CreateHandleMessage - produce: %s", errMsg)
@@ -145,7 +145,7 @@ func CreateHandleBatch() func(*fiber.Ctx) error {
145145
username := userData.Username
146146
accountId := userData.AccountId
147147
accountIdStr := strconv.Itoa(int(accountId))
148-
conn := connectionsCache[accountIdStr][username].Connection
148+
conn := ConnectionsCache[accountIdStr][username].Connection
149149
if conn == nil {
150150
errMsg := fmt.Sprintf("Connection does not exist")
151151
log.Errorf("CreateHandleBatch - produce: %s", errMsg)

middlewares/auth.go

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@ import (
44
"errors"
55
"fmt"
66
"rest-gateway/conf"
7+
"rest-gateway/handlers"
78
"rest-gateway/logger"
89
"rest-gateway/models"
910
"strings"
11+
"time"
1012

1113
"github.com/gofiber/fiber/v2"
1214
"github.com/golang-jwt/jwt/v4"
15+
"github.com/memphisdev/memphis.go"
1316
)
1417

1518
var configuration = conf.GetConfig()
@@ -62,19 +65,26 @@ func verifyToken(tokenString string, secret string) (models.AuthSchema, error) {
6265
}
6366

6467
var user models.AuthSchema
65-
if !configuration.USER_PASS_BASED_AUTH {
66-
user = models.AuthSchema{
67-
Username: claims["username"].(string),
68-
ConnectionToken: claims["connection_token"].(string),
69-
AccountId: 1,
68+
if _, ok := claims["username"].(string); ok {
69+
if !configuration.USER_PASS_BASED_AUTH {
70+
user = models.AuthSchema{
71+
Username: claims["username"].(string),
72+
ConnectionToken: claims["connection_token"].(string),
73+
AccountId: 1,
74+
}
75+
} else {
76+
user = models.AuthSchema{
77+
Username: claims["username"].(string),
78+
Password: claims["password"].(string),
79+
AccountId: claims["account_id"].(float64),
80+
}
7081
}
7182
} else {
7283
user = models.AuthSchema{
73-
Username: claims["username"].(string),
74-
Password: claims["password"].(string),
75-
AccountId: claims["account_id"].(float64),
84+
TokenExpiryMins: int(claims["exp"].(float64)),
7685
}
7786
}
87+
7888
return user, nil
7989
}
8090

@@ -144,6 +154,42 @@ func Authenticate(c *fiber.Ctx) error {
144154
user.AccountId = 1
145155
}
146156

157+
// for backward compatability
158+
if user.Username == "" {
159+
opts := []memphis.Option{memphis.Reconnect(true), memphis.MaxReconnect(10), memphis.ReconnectInterval(3 * time.Second)}
160+
if configuration.USER_PASS_BASED_AUTH {
161+
opts = append(opts, memphis.Password(configuration.ROOT_PASSWORD))
162+
} else {
163+
opts = append(opts, memphis.ConnectionToken(configuration.CONNECTION_TOKEN))
164+
}
165+
if configuration.CLIENT_CERT_PATH != "" && configuration.CLIENT_KEY_PATH != "" && configuration.ROOT_CA_PATH != "" {
166+
opts = append(opts, memphis.Tls(configuration.CLIENT_CERT_PATH, configuration.CLIENT_KEY_PATH, configuration.ROOT_CA_PATH))
167+
}
168+
conn, _ := memphis.Connect(configuration.MEMPHIS_HOST, configuration.ROOT_USER, opts...)
169+
170+
if handlers.ConnectionsCache["1"] == nil {
171+
handlers.ConnectionsCacheLock.Lock()
172+
handlers.ConnectionsCache["1"] = make(map[string]handlers.Connection)
173+
handlers.ConnectionsCacheLock.Unlock()
174+
}
175+
176+
handlers.ConnectionsCache["1"][configuration.ROOT_USER] = handlers.Connection{Connection: conn, ExpirationTime: int64(user.TokenExpiryMins)}
177+
178+
if !configuration.USER_PASS_BASED_AUTH {
179+
user = models.AuthSchema{
180+
Username: configuration.ROOT_USER,
181+
ConnectionToken: configuration.CONNECTION_TOKEN,
182+
AccountId: 1,
183+
}
184+
} else {
185+
user = models.AuthSchema{
186+
Username: configuration.ROOT_USER,
187+
Password: configuration.ROOT_PASSWORD,
188+
AccountId: 1,
189+
}
190+
}
191+
}
192+
147193
c.Locals("userData", user)
148194
return c.Next()
149195
}

0 commit comments

Comments
 (0)