Skip to content

Commit 023fd16

Browse files
author
Mikhail Podtserkovskiy
committed
bugfix
1 parent 0fb6a05 commit 023fd16

File tree

4 files changed

+22
-12
lines changed

4 files changed

+22
-12
lines changed

main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,13 @@ func main() {
4040
}
4141

4242
busyNodeDuration, err := time.ParseDuration(cfg.Grid.BusyNodeDuration)
43-
reservedNodeDuration, err := time.ParseDuration(cfg.Grid.BusyNodeDuration)
4443
if err != nil {
4544
log.Fatal("Invalid value grid.busy_node_duration in config")
4645
}
46+
reservedNodeDuration, err := time.ParseDuration(cfg.Grid.ReservedDuration)
47+
if err != nil {
48+
log.Fatal("Invalid value grid.reserved_node_duration in config")
49+
}
4750
storageFactory, err := invokeStorageFactory(*cfg)
4851
if err != nil {
4952
log.Fatalf("Can't invoke storage factory, %s", err)
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11

22
-- +migrate Up
33

4+
ALTER TABLE `node` ADD COLUMN `id` BIGINT(20) NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST;
5+
ALTER TABLE `capabilities` ADD COLUMN `id` BIGINT(20) NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST;
46
CREATE INDEX `status_of_updated` ON `node` (`status`,`updated`);
5-
CREATE INDEX `sessionId` ON `node` (`sessionId`);
6-
ALTER TABLE `node` ADD PRIMARY KEY (`address`);
7-
DROP INDEX `address` ON `node`;
87
CREATE INDEX `address` ON `capabilities` (`nodeAddress`);
8+
CREATE INDEX `sessionId` ON `node` (`sessionId`);
9+
DROP INDEX `addressSetName` ON `capabilities`;
10+
CREATE UNIQUE INDEX `addressSetName` ON `capabilities` (`name`, `nodeAddress`, `setId`);
11+
# SLEEP 1;
912

1013
-- +migrate Down
1114
SIGNAL SQLSTATE '45000' SET message_text = 'Impossible down this migration';

storage/mysql/factory.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ func (f *Factory) Create(config config.Config) (pool.StorageInterface, error) {
2121
return nil, err
2222
}
2323

24+
db.SetMaxIdleConns(0) // this is the root problem! set it to 0 to remove all idle connections
25+
db.SetMaxOpenConns(50) // or whatever is appropriate for your setup.
26+
2427
storage := NewMysqlStorage(db)
2528

2629
migrations := &migrate.AssetMigrationSource{

storage/mysql/storage.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package mysql
22

33
import (
44
"errors"
5-
"fmt"
65
"github.com/jmoiron/sqlx"
76
"github.com/qa-dev/jsonwire-grid/pool"
87
"sort"
98
"strconv"
10-
"strings"
119
"time"
10+
"fmt"
11+
"strings"
1212
)
1313

1414
type MysqlNodeModel struct {
15+
ID string `db:"id"`
1516
Type string `db:"type"`
1617
Address string `db:"address"`
1718
Status string `db:"status"`
@@ -53,7 +54,7 @@ func (s *MysqlStorage) Add(node pool.Node) error {
5354
err = errors.New("[MysqlStorage/Add] Can't insert new node: " + err.Error())
5455
return err
5556
}
56-
_, err = s.db.Exec("DELETE FROM capabilities WHERE nodeAddress = ?", node.Address)
57+
_, err = tx.Exec("DELETE FROM capabilities WHERE nodeAddress = ?", node.Address)
5758
if err != nil {
5859
tx.Rollback()
5960
err = errors.New("[MysqlStorage/Add] Can't delete old capabilities: " + err.Error())
@@ -74,7 +75,7 @@ func (s *MysqlStorage) Add(node pool.Node) error {
7475
}
7576

7677
for _, preparedCapability := range preparedCapabilities {
77-
_, err = s.db.NamedExec(
78+
_, err = tx.NamedExec(
7879
"INSERT INTO capabilities (nodeAddress, setId, name, value) "+
7980
"VALUES (:nodeAddress, :setId, :name, :value)",
8081
preparedCapability,
@@ -144,26 +145,26 @@ func (s *MysqlStorage) ReserveAvailable(capabilities pool.Capabilities) (node po
144145
err = tx.QueryRowx(
145146
`
146147
SELECT
148+
n.id,
147149
n.type,
148150
n.status,
149151
n.address,
150152
n.sessionId,
151153
n.updated,
152154
n.registred
153155
FROM node n
154-
LEFT JOIN capabilities c ON n.address = c.nodeAddress
155-
WHERE `+where+`
156+
LEFT JOIN capabilities c ON n.address = c.nodeAddress AND `+where+`
156157
GROUP BY c.setId
157158
HAVING count(c.setId) = `+countCapabilities+`
158-
ORDER BY n.updated DESC
159+
ORDER BY n.updated ASC
159160
LIMIT 1
160161
FOR UPDATE
161162
`,
162163
args...).
163164
StructScan(nodeModel)
164165
default:
165166
err = tx.QueryRowx(
166-
`SELECT n.* FROM node n WHERE `+where+` ORDER BY n.updated DESC LIMIT 1 FOR UPDATE`,
167+
`SELECT n.* FROM node n WHERE `+where+` ORDER BY n.updated ASC LIMIT 1 FOR UPDATE`,
167168
args...).
168169
StructScan(nodeModel)
169170
}

0 commit comments

Comments
 (0)