Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 155 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,46 @@ func createTables(MetadataDbClient MetadataStorage) error {
END IF;
END $$;`

alterAccessTokenTable := `
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.tables WHERE table_name = 'accessTokens' AND table_schema = 'public'
) THEN
ALTER TABLE accessTokens ADD COLUMN IF NOT EXISTS tenant_name VARCHAR NOT NULL DEFAULT '$memphis';
ALTER TABLE accessTokens ADD COLUMN IF NOT EXISTS is_active BOOL NOT NULL DEFAULT false;
ALTER TABLE accessTokens ADD COLUMN IF NOT EXISTS generated_by INTEGER NOT NULL;
ALTER TABLE accessTokens ADD COLUMN IF NOT EXISTS access_key_id VARCHAR NOT NULL DEFAULT '';
ALTER TABLE accessTokens ADD COLUMN IF NOT EXISTS secret_key VARCHAR(200) NOT NULL DEFAULT '';
ALTER TABLE accessTokens ADD COLUMN IF NOT EXISTS description VARCHAR NOT NULL DEFAULT '';
CREATE INDEX IF NOT EXISTS access_token_access_key_id ON accessTokens(access_key_id);
END IF;
END $$;`

accessTokensTable := `
CREATE TABLE IF NOT EXISTS accessTokens(
id SERIAL NOT NULL,
tenant_name VARCHAR NOT NULL DEFAULT '$memphis',
is_active BOOL NOT NULL DEFAULT true,
generated_by INTEGER NOT NULL,
access_key_id VARCHAR NOT NULL,
secret_key VARCHAR(200) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
description VARCHAR NOT NULL DEFAULT '',
PRIMARY KEY (id),
CONSTRAINT fk_tenant_name
FOREIGN KEY(tenant_name)
REFERENCES tenants(name),
CONSTRAINT fk_generated_by
FOREIGN KEY(generated_by)
REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS access_token_access_key_id ON accessTokens(access_key_id);`

db := MetadataDbClient.Client
ctx := MetadataDbClient.Ctx

tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable}
tables := []string{alterTenantsTable, tenantsTable, alterUsersTable, usersTable, alterAuditLogsTable, auditLogsTable, alterConfigurationsTable, configurationsTable, alterIntegrationsTable, integrationsTable, alterSchemasTable, schemasTable, alterTagsTable, tagsTable, alterStationsTable, stationsTable, alterDlsMsgsTable, dlsMessagesTable, alterConsumersTable, consumersTable, alterSchemaVerseTable, schemaVersionsTable, alterProducersTable, producersTable, alterConnectionsTable, asyncTasksTable, alterAsyncTasks, testEventsTable, alterAccessTokenTable, accessTokensTable}

for _, table := range tables {
_, err := db.Exec(ctx, table)
Expand Down Expand Up @@ -7048,3 +7084,121 @@ func CountProudcersForStation(stationId int) (int64, error) {

return count, nil
}

// AccessToken Functions
func InsertNewAccessToken(generatedBy int, accessKeyID, hashedSecretKey, description, tenantName string) error {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()

conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()

query := `INSERT INTO accessTokens(
tenant_name,
is_active,
generated_by,
access_key_id,
secret_key,
created_at,
description)
VALUES($1, $2, $3, $4, $5, $6, $7) RETURNING id`

stmt, err := conn.Conn().Prepare(ctx, "insert_new_access_token", query)
if err != nil {
return err
}

createdAt := time.Now()

var accessTokenId int
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := conn.Conn().Query(ctx, stmt.Name, tenantName, true, generatedBy, accessKeyID, hashedSecretKey, createdAt, description)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(&accessTokenId)
if err != nil {
return err
}
}

if err := rows.Err(); err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Detail != "" {
return errors.New(pgErr.Detail)
} else {
return errors.New(pgErr.Message)
}
} else {
return err
}
}

return nil
}

func GetAccessTokenByAccessKeyId(accessKeyId string) (bool, models.AccessToken, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return false, models.AccessToken{}, err
}
defer conn.Release()
query := `SELECT * FROM accessTokens WHERE access_key_id=$1 LIMIT 1`
stmt, err := conn.Conn().Prepare(ctx, "get_access_token_by_secret_key_id", query)
if err != nil {
return false, models.AccessToken{}, err
}

rows, err := conn.Conn().Query(ctx, stmt.Name, accessKeyId)
if err != nil {
return false, models.AccessToken{}, err
}
defer rows.Close()
accessTokens, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.AccessToken])
if err != nil {
return false, models.AccessToken{}, err
}
if len(accessTokens) == 0 {
return false, models.AccessToken{}, nil
}
return true, accessTokens[0], nil
}

func GetAllAccessTokens() ([]models.AccessToken, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
conn, err := MetadataDbClient.Client.Acquire(ctx)
if err != nil {
return []models.AccessToken{}, err
}
defer conn.Release()
query := `SELECT * FROM accessTokens`
stmt, err := conn.Conn().Prepare(ctx, "get_all_access_tokens", query)
if err != nil {
return []models.AccessToken{}, err
}
rows, err := conn.Conn().Query(ctx, stmt.Name)
if err != nil {
return []models.AccessToken{}, err
}
defer rows.Close()
accessTokens, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.AccessToken])
if err != nil {
return []models.AccessToken{}, err
}
if len(accessTokens) == 0 {
return []models.AccessToken{}, nil
}

return accessTokens, nil
}
1 change: 1 addition & 0 deletions http_server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func InitializeHttpServer(s *server.Server) {
Integrations: server.IntegrationsHandler{S: s},
Tenants: server.TenantHandler{S: s},
Billing: server.BillingHandler{S: s},
AccessToken: server.AccessTokenHandler{S: s},
}

httpServer := routes.InitializeHttpRoutes(&handlers)
Expand Down
25 changes: 25 additions & 0 deletions http_server/routes/access_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022-2023 The Memphis.dev Authors
// Licensed under the Memphis Business Source License 1.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// Changed License: [Apache License, Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0), as published by the Apache Foundation.
//
// https://github.com/memphisdev/memphis/blob/master/LICENSE
//
// Additional Use Grant: You may make use of the Licensed Work (i) only as part of your own product or service, provided it is not a message broker or a message queue product or service; and (ii) provided that you do not use, provide, distribute, or make available the Licensed Work as a Service.
// A "Service" is a commercial offering, product, hosted, or managed service, that allows third parties (other than your own employees and contractors acting on your behalf) to access and/or use the Licensed Work or a substantial set of the features or functionality of the Licensed Work to third parties as a software-as-a-service, platform-as-a-service, infrastructure-as-a-service or other similar services that compete with Licensor products or services.
package routes

import (
"github.com/memphisdev/memphis/server"

"github.com/gin-gonic/gin"
)

func InitializeAccessTokenRoutes(router *gin.RouterGroup, h *server.Handlers) {
accessTokenHandler := h.AccessToken
accessTokenRoutes := router.Group("/accessToken")
accessTokenRoutes.POST("/createNewAccessToken", accessTokenHandler.CreateNewAccessToken)
accessTokenRoutes.GET("/getAllAccessTokens", accessTokenHandler.GetAllAccessTokens)
}
1 change: 1 addition & 0 deletions http_server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func InitializeHttpRoutes(handlers *server.Handlers) *gin.Engine {
InitializeStationsRoutes(mainRouter, handlers)
InitializeMonitoringRoutes(mainRouter, handlers)
InitializeTagsRoutes(mainRouter, handlers)
InitializeAccessTokenRoutes(mainRouter, handlers)
InitializeSchemasRoutes(mainRouter, handlers)
InitializeIntegrationsRoutes(mainRouter, handlers)
InitializeConfigurationsRoutes(mainRouter, handlers)
Expand Down
18 changes: 18 additions & 0 deletions models/access_tokens.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package models

import "time"

type AccessToken struct {
ID int `json:"id"`
TenantName string `json:"tenant_name"`
IsActive bool `json:"is_active"`
GeneratedBY int `json:"generated_by"`
AccessKeyID string `json:"access_key_id"`
SecretKey string `json:"-"`
CreatedAt time.Time `json:"created_at"`
Description string `json:"description"`
}

type CreateAccessTokenSchema struct {
Description string `json:"description"`
}
1 change: 1 addition & 0 deletions server/memphis_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Handlers struct {
userMgmt UserMgmtHandler
AsyncTasks AsyncTasksHandler
Functions FunctionsHandler
AccessToken AccessTokenHandler
}

var serv *Server
Expand Down
Loading