diff --git a/pkg/sql/queue_schema_adapter_mysql.go b/pkg/sql/queue_schema_adapter_mysql.go index 4f9a000..fce7292 100644 --- a/pkg/sql/queue_schema_adapter_mysql.go +++ b/pkg/sql/queue_schema_adapter_mysql.go @@ -47,7 +47,15 @@ func (s MySQLQueueSchema) SchemaInitializingQueries(params SchemaInitializingQue ); ` - return []Query{{Query: createMessagesTable}}, nil + createAckedIndex := ` + CREATE INDEX ` + "`" + strings.Trim(s.MessagesTable(params.Topic), "`") + `_acked_offset_idx` + "`" + ` + ON ` + s.MessagesTable(params.Topic) + ` (` + "`acked`" + `, ` + "`offset`" + `); + ` + + return []Query{ + {Query: createMessagesTable}, + {Query: createAckedIndex}, + }, nil } func (s MySQLQueueSchema) InsertQuery(params InsertQueryParams) (Query, error) { diff --git a/pkg/sql/queue_schema_adapter_postgresql.go b/pkg/sql/queue_schema_adapter_postgresql.go index 5faa769..34393d9 100644 --- a/pkg/sql/queue_schema_adapter_postgresql.go +++ b/pkg/sql/queue_schema_adapter_postgresql.go @@ -40,7 +40,7 @@ type PostgreSQLQueueSchema struct { } func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) { - createMessagesTable := ` + createMessagesTable := ` CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(params.Topic) + ` ( "offset" SERIAL PRIMARY KEY, "uuid" VARCHAR(36) NOT NULL, @@ -51,7 +51,16 @@ func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializi ); ` - return []Query{{Query: createMessagesTable}}, nil + createAckedIndex := ` + CREATE INDEX IF NOT EXISTS "` + strings.Trim(s.MessagesTable(params.Topic), `"`) + `_acked_idx" + ON ` + s.MessagesTable(params.Topic) + ` ("offset") + WHERE acked = false; + ` + + return []Query{ + {Query: createMessagesTable}, + {Query: createAckedIndex}, + }, nil } func (s PostgreSQLQueueSchema) InsertQuery(params InsertQueryParams) (Query, error) {