@@ -3,9 +3,9 @@ package pg
3
3
import (
4
4
"database/sql/driver"
5
5
"github.com/cevian/pq"
6
- "log"
7
6
"reflect"
8
7
"stash.cloudflare.com/go-stream/cube"
8
+ "stash.cloudflare.com/go-stream/util/slog"
9
9
)
10
10
11
11
type Executor struct {
@@ -25,7 +25,7 @@ func (e *Executor) ExecErr(sql string, args ...interface{}) (driver.Result, erro
25
25
var err error
26
26
dargs [n ], err = driver .DefaultParameterConverter .ConvertValue (arg )
27
27
if err != nil {
28
- log .Fatalf ("sql: converting Exec argument #%d's type: %v" , n , err )
28
+ slog .Fatalf ("sql: converting Exec argument #%d's type: %v" , n , err )
29
29
}
30
30
}
31
31
return exec .Exec (sql , dargs )
@@ -34,7 +34,7 @@ func (e *Executor) ExecErr(sql string, args ...interface{}) (driver.Result, erro
34
34
func (e * Executor ) Exec (sql string , args ... interface {}) driver.Result {
35
35
res , err := e .ExecErr (sql , args ... )
36
36
if err != nil {
37
- log . Fatal ("Sql:" , sql , " Err:" , err )
37
+ slog . Fatalf ("Sql: %v Err: %v" , sql , err )
38
38
}
39
39
return res
40
40
}
@@ -68,7 +68,7 @@ func getPartition(p cube.Partition) Partition {
68
68
case cube.TimePartition :
69
69
return & TimePartition {& pt }
70
70
default :
71
- log . Fatal ("Unknown Partition Type" , reflect .TypeOf (pt ))
71
+ slog . Fatalf ("Unknown Partition Type %v " , reflect .TypeOf (pt ))
72
72
}
73
73
panic ("Never Here" )
74
74
}
@@ -84,7 +84,7 @@ func (e *Executor) UpsertCube(p cube.Partition, c cube.Cuber) {
84
84
func (e * Executor ) UpsertCubes (p cube.Partition , c []cube.Cuber ) {
85
85
tx , err := e .conn .Begin ()
86
86
if err != nil {
87
- log . Fatal ("Error starting transaction" , err )
87
+ slog . Fatalf ("Error starting transaction %v " , err )
88
88
}
89
89
90
90
part := getPartition (p )
@@ -96,26 +96,26 @@ func (e *Executor) UpsertCubes(p cube.Partition, c []cube.Cuber) {
96
96
cy := pq .NewCopierFromConn (e .conn )
97
97
err = cy .Start (e .table .CopyTableSql (part ))
98
98
if err != nil {
99
- log . Fatal ("Error starting copy" , err )
99
+ slog . Fatalf ("Error starting copy %v " , err )
100
100
}
101
101
102
102
for _ , cube := range c {
103
103
err = cy .Send (e .table .CopyDataFull (cube ))
104
104
if err != nil {
105
- log . Fatal ("Error copying " , err )
105
+ slog . Fatalf ("Error copying %v " , err )
106
106
}
107
107
}
108
108
109
109
err = cy .Close ()
110
110
if err != nil {
111
- log . Fatal ("Error Ending Copy " , err )
111
+ slog . Fatalf ("Error Ending Copy %v " , err )
112
112
}
113
113
114
114
e .Exec (e .table .MergeCopySql (part ))
115
115
116
116
err = tx .Commit ()
117
117
if err != nil {
118
- log . Fatal ("Error Committing tx " , err )
118
+ slog . Fatalf ("Error Committing tx %v " , err )
119
119
}
120
120
121
121
}
0 commit comments