Skip to content

Commit 98a2332

Browse files
authored
fix(pipelineloop): caching should include the params for making caching key. (kubeflow#1056)
* fix(pipelineloop): caching should include the params for makeing caching key. * Get params from run spec itself. * Migrated cache for custom task controllers to gorm v2. * code cleanup. * Added retry for cache connect until timoeut. * improved tests to be able to detect config maps. Better error reporting.
1 parent 9b9b932 commit 98a2332

File tree

11 files changed

+119
-160
lines changed

11 files changed

+119
-160
lines changed

tekton-catalog/cache/go.mod

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/cache
33
go 1.13
44

55
require (
6-
github.com/cenkalti/backoff v2.2.1+incompatible
7-
github.com/go-sql-driver/mysql v1.6.0
8-
github.com/jinzhu/gorm v1.9.16
9-
github.com/mattn/go-sqlite3 v1.14.0
6+
gorm.io/driver/mysql v1.4.3
7+
gorm.io/driver/sqlite v1.4.2
8+
gorm.io/gorm v1.24.0
109
)

tekton-catalog/cache/pkg/db/db_conn_manager.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ import (
1818
"fmt"
1919
"time"
2020

21-
_ "github.com/go-sql-driver/mysql"
22-
"github.com/jinzhu/gorm"
2321
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
22+
"gorm.io/gorm"
2423
)
2524

2625
type ConnectionParams struct {
@@ -32,36 +31,32 @@ type ConnectionParams struct {
3231
DbPwd string
3332
DbGroupConcatMaxLen string
3433
DbExtraParams string
35-
Timeout time.Duration
34+
Timeout time.Duration
3635
}
3736

3837
func InitDBClient(params ConnectionParams, initConnectionTimeout time.Duration) (*gorm.DB, error) {
3938
driverName := params.DbDriver
40-
var arg string
39+
var db *gorm.DB
4140
var err error
4241

4342
switch driverName {
44-
case mysqlDBDriverDefault:
45-
arg, err = initMysql(params, initConnectionTimeout)
46-
if err != nil {
47-
return nil, err
48-
}
49-
case sqliteDriverDefault:
50-
arg = initSqlite(params.DbName)
43+
case "mysql":
44+
db, err = initMysql(params)
45+
case "sqlite":
46+
db, err = initSqlite(params.DbName)
5147
default:
52-
return nil, fmt.Errorf("driver %v is not supported", driverName)
48+
return nil, fmt.Errorf("driver %s is not supported", driverName)
5349
}
5450

5551
// db is safe for concurrent use by multiple goroutines
5652
// and maintains its own pool of idle connections.
57-
db, err := gorm.Open(driverName, arg)
5853
if err != nil {
5954
return nil, err
6055
}
6156
// Create table
6257
response := db.AutoMigrate(&model.TaskCache{})
63-
if response.Error != nil {
64-
return nil, fmt.Errorf("failed to initialize the databases: Error: %w", response.Error)
58+
if response != nil {
59+
return nil, fmt.Errorf("failed to initialize the databases: Error: %v", response)
6560
}
6661
return db, nil
6762
}

tekton-catalog/cache/pkg/db/mysql.go

Lines changed: 23 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
package db
1616

1717
import (
18-
"database/sql"
1918
"encoding/json"
2019
"fmt"
2120
"time"
2221

23-
"github.com/cenkalti/backoff"
24-
25-
"github.com/go-sql-driver/mysql"
22+
"gorm.io/driver/mysql"
23+
"gorm.io/gorm"
2624
)
2725

2826
const (
@@ -52,72 +50,37 @@ func (params *ConnectionParams) LoadMySQLDefaults() {
5250
}
5351
}
5452

55-
func initMysql(params ConnectionParams, initConnectionTimeout time.Duration) (string, error) {
53+
func initMysql(params ConnectionParams) (*gorm.DB, error) {
5654
var mysqlExtraParams = map[string]string{}
5755
data := []byte(params.DbExtraParams)
5856
_ = json.Unmarshal(data, &mysqlExtraParams)
59-
mysqlConfig := CreateMySQLConfig(
57+
mysqlConfigDSN := CreateMySQLConfigDSN(
6058
params.DbUser,
6159
params.DbPwd,
6260
params.DbHost,
6361
params.DbPort,
64-
"",
62+
params.DbName,
6563
params.DbGroupConcatMaxLen,
6664
mysqlExtraParams,
6765
)
68-
69-
var db *sql.DB
70-
var err error
71-
var operation = func() error {
72-
db, err = sql.Open(params.DbDriver, mysqlConfig.FormatDSN())
73-
if err != nil {
74-
return err
75-
}
76-
return nil
77-
}
78-
b := backoff.NewExponentialBackOff()
79-
b.MaxElapsedTime = initConnectionTimeout
80-
err = backoff.Retry(operation, b)
81-
if err != nil {
82-
return "", err
83-
}
84-
defer db.Close()
85-
86-
// Create database if not exist
87-
dbName := params.DbName
88-
operation = func() error {
89-
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName))
90-
if err != nil {
91-
return err
92-
}
93-
return nil
94-
}
95-
b = backoff.NewExponentialBackOff()
96-
b.MaxElapsedTime = initConnectionTimeout
97-
err = backoff.Retry(operation, b)
98-
99-
operation = func() error {
100-
_, err = db.Exec(fmt.Sprintf("USE %s", dbName))
101-
if err != nil {
102-
return err
103-
}
104-
return nil
105-
}
106-
b = backoff.NewExponentialBackOff()
107-
b.MaxElapsedTime = initConnectionTimeout
108-
err = backoff.Retry(operation, b)
109-
110-
mysqlConfig.DBName = dbName
111-
// Config reference: https://github.com/go-sql-driver/mysql#clientfoundrows
112-
mysqlConfig.ClientFoundRows = true
113-
return mysqlConfig.FormatDSN(), nil
66+
db, err := gorm.Open(mysql.New(mysql.Config{
67+
DSN: mysqlConfigDSN, // data source name, refer https://github.com/go-sql-driver/mysql#dsn-data-source-name
68+
DefaultStringSize: 256, // add default size for string fields, by default, will use db type `longtext` for fields without size, not a primary key, no index defined and don't have default values
69+
DontSupportRenameIndex: true, // drop & create index when rename index, rename index not supported before MySQL 5.7, MariaDB
70+
DontSupportRenameColumn: true, // use change when rename column, rename rename not supported before MySQL 8, MariaDB
71+
SkipInitializeWithVersion: false, // smart configure based on used version
72+
}), &gorm.Config{})
73+
74+
return db, err
11475
}
11576

116-
func CreateMySQLConfig(user, password, mysqlServiceHost, mysqlServicePort, dbName, mysqlGroupConcatMaxLen string,
117-
mysqlExtraParams map[string]string) *mysql.Config {
77+
func CreateMySQLConfigDSN(user, password, mysqlServiceHost, mysqlServicePort, dbName, mysqlGroupConcatMaxLen string,
78+
mysqlExtraParams map[string]string) string {
11879

80+
if mysqlGroupConcatMaxLen == "" {
81+
mysqlGroupConcatMaxLen = "4194304"
82+
}
11983
params := map[string]string{
120-
"charset": "utf8",
12184
"parseTime": "True",
12285
"loc": "Local",
12386
"group_concat_max_len": mysqlGroupConcatMaxLen,
@@ -126,14 +89,10 @@ func CreateMySQLConfig(user, password, mysqlServiceHost, mysqlServicePort, dbNam
12689
for k, v := range mysqlExtraParams {
12790
params[k] = v
12891
}
92+
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4", user, password, mysqlServiceHost, mysqlServicePort, dbName)
12993

130-
return &mysql.Config{
131-
User: user,
132-
Passwd: password,
133-
Net: "tcp",
134-
Addr: fmt.Sprintf("%s:%s", mysqlServiceHost, mysqlServicePort),
135-
Params: params,
136-
DBName: dbName,
137-
AllowNativePasswords: true,
94+
for k, v := range params {
95+
dsn = fmt.Sprintf("%s&%s=%s", dsn, k, v)
13896
}
97+
return dsn
13998
}

tekton-catalog/cache/pkg/db/sqlite.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414

1515
package db
1616

17-
const sqliteDriverDefault = "sqlite3"
17+
import (
18+
"gorm.io/driver/sqlite"
19+
"gorm.io/gorm"
20+
)
1821

19-
func (params *ConnectionParams) LoadSqliteDefaults() {
20-
setDefault(&params.DbDriver, sqliteDriverDefault)
21-
setDefault(&params.DbName, ":memory:")
22-
}
23-
24-
func initSqlite(dbName string) string {
22+
func initSqlite(dbName string) (*gorm.DB, error) {
23+
var db *gorm.DB
24+
var err error
2525
if dbName == "" {
26-
dbName = ":memory:" // default db.
26+
db, err = gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
27+
} else {
28+
db, err = gorm.Open(sqlite.Open(dbName), &gorm.Config{})
2729
}
28-
return dbName
30+
31+
return db, err
2932
}

tekton-catalog/cache/pkg/task_cache_store.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,15 @@ import (
1818
"fmt"
1919
"time"
2020

21-
"github.com/jinzhu/gorm"
2221
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db"
2322
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
23+
"gorm.io/gorm"
2424
)
2525

26-
2726
type TaskCacheStore struct {
2827
db *gorm.DB
2928
Disabled bool
30-
Params db.ConnectionParams
29+
Params db.ConnectionParams
3130
}
3231

3332
func (t *TaskCacheStore) Connect() error {
@@ -44,28 +43,23 @@ func (t *TaskCacheStore) Get(taskHashKey string) (*model.TaskCache, error) {
4443
return nil, nil
4544
}
4645
entry := &model.TaskCache{}
47-
d := t.db.Table("task_caches").Where("TaskHashKey = ?", taskHashKey).
46+
d := t.db.Model(&model.TaskCache{}).Where("TaskHashKey = ?", taskHashKey).
4847
Order("CreatedAt DESC").First(entry)
4948
if d.Error != nil {
5049
return nil, fmt.Errorf("failed to get entry from cache: %q. Error: %v", taskHashKey, d.Error)
5150
}
5251
return entry, nil
5352
}
5453

55-
func (t *TaskCacheStore) Put(entry *model.TaskCache) (*model.TaskCache, error) {
54+
func (t *TaskCacheStore) Put(entry *model.TaskCache) error {
5655
if t.Disabled || t.db == nil {
57-
return nil, nil
58-
}
59-
ok := t.db.NewRecord(entry)
60-
if !ok {
61-
return nil, fmt.Errorf("failed to create a new cache entry, %#v, Error: %v", entry, t.db.Error)
56+
return nil
6257
}
63-
rowInsert := &model.TaskCache{}
64-
d := t.db.Create(entry).Scan(rowInsert)
58+
d := t.db.Create(entry)
6559
if d.Error != nil {
66-
return nil, d.Error
60+
return fmt.Errorf("failed to create a new cache entry, %#v, Error: %v", entry, t.db.Error)
6761
}
68-
return rowInsert, nil
62+
return nil
6963
}
7064

7165
func (t *TaskCacheStore) Delete(id string) error {

tekton-catalog/cache/pkg/task_cache_store_test.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@ import (
2222

2323
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db"
2424
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
25-
_ "github.com/mattn/go-sqlite3"
2625
)
2726

2827
func newTestingCacheStore(disabled bool) (*TaskCacheStore, error) {
2928
t := TaskCacheStore{
3029
Disabled: disabled,
31-
Params: db.ConnectionParams{DbDriver: "sqlite3", DbName: ":memory:"},
30+
// Params: db.ConnectionParams{DbDriver: "mysql", DbName: "testdb",
31+
// DbHost: "127.0.0.1", DbPort: "3306", DbPwd: "", DbUser: "root",
32+
// Timeout: 10 * time.Second,
33+
// },
34+
Params: db.ConnectionParams{DbDriver: "sqlite", DbName: ":memory:"},
3235
}
3336
err := t.Connect()
3437
return &t, err
@@ -47,19 +50,11 @@ func TestPut(t *testing.T) {
4750
t.Fatal(err)
4851
}
4952
entry := createTaskCache("x", "y")
50-
taskCache, err := taskCacheStore.Put(entry)
53+
err = taskCacheStore.Put(entry)
5154
if err != nil {
5255
t.Fatal(err)
5356
}
54-
if taskCache.TaskHashKey != entry.TaskHashKey {
55-
t.Errorf("Mismatached key. Expected %s Found: %s", entry.TaskHashKey,
56-
taskCache.TaskHashKey)
57-
}
58-
if taskCache.TaskOutput != entry.TaskOutput {
59-
t.Errorf("Mismatached output. Expected : %s Found: %s",
60-
entry.TaskOutput,
61-
taskCache.TaskOutput)
62-
}
57+
6358
}
6459

6560
func TestGet(t *testing.T) {
@@ -68,11 +63,11 @@ func TestGet(t *testing.T) {
6863
t.Fatal(err)
6964
}
7065
entry := createTaskCache("x", "y")
71-
taskCache, err := taskCacheStore.Put(entry)
66+
err = taskCacheStore.Put(entry)
7267
if err != nil {
7368
t.Fatal(err)
7469
}
75-
cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey)
70+
cacheResult, err := taskCacheStore.Get(entry.TaskHashKey)
7671
if err != nil {
7772
t.Error(err)
7873
}
@@ -95,11 +90,11 @@ func TestGetLatest(t *testing.T) {
9590
}
9691
for i := 1; i < 10; i++ {
9792
entry := createTaskCache("x", fmt.Sprintf("y%d", i))
98-
taskCache, err := taskCacheStore.Put(entry)
93+
err := taskCacheStore.Put(entry)
9994
if err != nil {
10095
t.Fatal(err)
10196
}
102-
cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey)
97+
cacheResult, err := taskCacheStore.Get(entry.TaskHashKey)
10398
if err != nil {
10499
t.Error(err)
105100
}
@@ -122,7 +117,7 @@ func TestDisabledCache(t *testing.T) {
122117
}
123118
taskCache, err := taskCacheStore.Get("random")
124119
if err != nil {
125-
t.Errorf("a disabled cache returned non nil error: %w", err)
120+
t.Errorf("a disabled cache returned non nil error: %s", err)
126121
}
127122
if taskCache != nil {
128123
t.Errorf("a disabled cache should return nil")
@@ -141,7 +136,7 @@ func TestPruneOlderThan(t *testing.T) {
141136
TaskOutput: "cacheOutput",
142137
CreatedAt: time.UnixMicro(int64(i * 100)),
143138
}
144-
_, err = taskCacheStore.Put(t1)
139+
err = taskCacheStore.Put(t1)
145140
if err != nil {
146141
t.Fatal(err)
147142
}
@@ -157,7 +152,7 @@ func TestPruneOlderThan(t *testing.T) {
157152
if err != nil {
158153
t.Fatal(err)
159154
}
160-
taskCache, err = taskCacheStore.Get(hashKey)
155+
_, err = taskCacheStore.Get(hashKey)
161156
if err == nil {
162157
t.Errorf("Expected error to be not nil")
163158
}

tekton-catalog/pipeline-loops/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops
33
go 1.13
44

55
require (
6+
github.com/cenkalti/backoff/v4 v4.1.3
67
github.com/google/go-cmp v0.5.8
78
github.com/hashicorp/go-multierror v1.1.1
89
github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0
910
github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0
10-
github.com/mattn/go-sqlite3 v1.14.0
1111
github.com/tektoncd/pipeline v0.38.4
1212
go.uber.org/zap v1.21.0
1313
gomodules.xyz/jsonpatch/v2 v2.2.0

0 commit comments

Comments
 (0)