Skip to content
This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit f7c4a67

Browse files
authored
VDB-919 Generalise converter (#152)
* Generalise transformer stack to use InsertionModel * Add tests for event repository * Restrict accepted values in InsertionModel * Add call to repository.SetDB * Improve error propagation/clarity on GetABI() * Remove maker references in example * Please golint * refactor rollback error handling in repository * Cleaner errors in repository, refactor tests
1 parent 6c055a9 commit f7c4a67

File tree

13 files changed

+460
-76
lines changed

13 files changed

+460
-76
lines changed

integration_test/integration_test_suite_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package integration_test
1818

1919
import (
20+
"github.com/sirupsen/logrus"
21+
"io/ioutil"
2022
"testing"
2123

2224
. "github.com/onsi/ginkgo"
@@ -27,3 +29,7 @@ func TestIntegrationTest(t *testing.T) {
2729
RegisterFailHandler(Fail)
2830
RunSpecs(t, "IntegrationTest Suite")
2931
}
32+
33+
var _ = BeforeSuite(func() {
34+
logrus.SetOutput(ioutil.Discard)
35+
})

libraries/shared/factories/event/converter.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
package event
1818

19-
import "github.com/vulcanize/vulcanizedb/pkg/core"
19+
import (
20+
"github.com/vulcanize/vulcanizedb/pkg/core"
21+
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
22+
)
2023

24+
// Converter transforms log data into general InsertionModels the Repository can persist__
2125
type Converter interface {
22-
ToEntities(contractAbi string, ethLog []core.HeaderSyncLog) ([]interface{}, error)
23-
ToModels([]interface{}) ([]interface{}, error)
26+
ToModels(contractAbi string, ethLog []core.HeaderSyncLog) ([]InsertionModel, error)
27+
SetDB(db *postgres.DB)
2428
}

libraries/shared/factories/event/repository.go

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,163 @@
1616

1717
package event
1818

19-
import "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
19+
import (
20+
"database/sql/driver"
21+
"fmt"
22+
"github.com/vulcanize/vulcanizedb/utils"
23+
"strings"
2024

25+
"github.com/sirupsen/logrus"
26+
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
27+
)
28+
29+
const SetLogTransformedQuery = `UPDATE public.header_sync_logs SET transformed = true WHERE id = $1`
30+
31+
// Repository persists transformed values to the DB
2132
type Repository interface {
22-
Create(models []interface{}) error
33+
Create(models []InsertionModel) error
2334
SetDB(db *postgres.DB)
2435
}
36+
37+
// LogFK is the name of log foreign key columns
38+
const LogFK ColumnName = "log_id"
39+
40+
// AddressFK is the name of address foreign key columns
41+
const AddressFK ColumnName = "address_id"
42+
43+
// HeaderFK is the name of header foreign key columns
44+
const HeaderFK ColumnName = "header_id"
45+
46+
// SchemaName is the schema to work with
47+
type SchemaName string
48+
49+
// TableName identifies the table for inserting the data
50+
type TableName string
51+
52+
// ColumnName identifies columns on the given table
53+
type ColumnName string
54+
55+
// ColumnValues maps a column to the value for insertion. This is restricted to []byte, bool, float64, int64, string, time.Time
56+
type ColumnValues map[ColumnName]interface{}
57+
58+
// ErrUnsupportedValue is thrown when a model supplies a type of value the postgres driver cannot handle.
59+
var ErrUnsupportedValue = func(value interface{}) error {
60+
return fmt.Errorf("unsupported type of value supplied in model: %v (%T)", value, value)
61+
}
62+
63+
// InsertionModel is the generalised data structure a converter returns, and contains everything the repository needs to
64+
// persist the converted data.
65+
type InsertionModel struct {
66+
SchemaName SchemaName
67+
TableName TableName
68+
OrderedColumns []ColumnName // Defines the fields to insert, and in which order the table expects them
69+
ColumnValues ColumnValues // Associated values for columns, restricted to []byte, bool, float64, int64, string, time.Time
70+
}
71+
72+
// ModelToQuery stores memoised insertion queries to minimise computation
73+
var ModelToQuery = map[string]string{}
74+
75+
// GetMemoizedQuery gets/creates a DB insertion query for the model
76+
func GetMemoizedQuery(model InsertionModel) string {
77+
// The schema and table name uniquely determines the insertion query, use that for memoization
78+
queryKey := string(model.SchemaName) + string(model.TableName)
79+
query, queryMemoized := ModelToQuery[queryKey]
80+
if !queryMemoized {
81+
query = GenerateInsertionQuery(model)
82+
ModelToQuery[queryKey] = query
83+
}
84+
return query
85+
}
86+
87+
// GenerateInsertionQuery creates an SQL insertion query from an insertion model.
88+
// Should be called through GetMemoizedQuery, so the query is not generated on each call to Create.
89+
func GenerateInsertionQuery(model InsertionModel) string {
90+
var valuePlaceholders []string
91+
var updateOnConflict []string
92+
for i := 0; i < len(model.OrderedColumns); i++ {
93+
valuePlaceholder := fmt.Sprintf("$%d", 1+i)
94+
valuePlaceholders = append(valuePlaceholders, valuePlaceholder)
95+
updateOnConflict = append(updateOnConflict,
96+
fmt.Sprintf("%s = %s", model.OrderedColumns[i], valuePlaceholder))
97+
}
98+
99+
baseQuery := `INSERT INTO %v.%v (%v) VALUES(%v)
100+
ON CONFLICT (header_id, log_id) DO UPDATE SET %v;`
101+
102+
return fmt.Sprintf(baseQuery,
103+
model.SchemaName,
104+
model.TableName,
105+
joinOrderedColumns(model.OrderedColumns),
106+
strings.Join(valuePlaceholders, ", "),
107+
strings.Join(updateOnConflict, ", "))
108+
}
109+
110+
/*
111+
Create generates an insertion query and persists to the DB, given a slice of InsertionModels.
112+
ColumnValues are restricted to []byte, bool, float64, int64, string, time.Time.
113+
114+
testModel = shared.InsertionModel{
115+
SchemaName: "public"
116+
TableName: "testEvent",
117+
OrderedColumns: []string{"header_id", "log_id", "variable1"},
118+
ColumnValues: ColumnValues{
119+
"header_id": 303
120+
"log_id": "808",
121+
"variable1": "value1",
122+
},
123+
}
124+
*/
125+
func Create(models []InsertionModel, db *postgres.DB) error {
126+
if len(models) == 0 {
127+
return fmt.Errorf("repository got empty model slice")
128+
}
129+
130+
tx, dbErr := db.Beginx()
131+
if dbErr != nil {
132+
return dbErr
133+
}
134+
135+
for _, model := range models {
136+
// Maps can't be iterated over in a reliable manner, so we rely on OrderedColumns to define the order to insert
137+
// tx.Exec is variadically typed in the args, so if we wrap in []interface{} we can apply them all automatically
138+
var args []interface{}
139+
for _, col := range model.OrderedColumns {
140+
value := model.ColumnValues[col]
141+
// Check whether or not PG can accept the type of value in the model
142+
okPgValue := driver.IsValue(value)
143+
if !okPgValue {
144+
logrus.WithField("model", model).Errorf("PG cannot handle value of this type: %T", value)
145+
return ErrUnsupportedValue(value)
146+
}
147+
args = append(args, value)
148+
}
149+
150+
insertionQuery := GetMemoizedQuery(model)
151+
_, execErr := tx.Exec(insertionQuery, args...) // couldn't pass varying types in bulk with args :: []string
152+
153+
if execErr != nil {
154+
rollbackErr := tx.Rollback()
155+
if rollbackErr != nil {
156+
logrus.Error("failed to rollback ", rollbackErr)
157+
}
158+
return execErr
159+
}
160+
161+
_, logErr := tx.Exec(SetLogTransformedQuery, model.ColumnValues[LogFK])
162+
163+
if logErr != nil {
164+
utils.RollbackAndLogFailure(tx, logErr, "header_sync_logs.transformed")
165+
return logErr
166+
}
167+
}
168+
169+
return tx.Commit()
170+
}
171+
172+
func joinOrderedColumns(columns []ColumnName) string {
173+
var stringColumns []string
174+
for _, columnName := range columns {
175+
stringColumns = append(stringColumns, string(columnName))
176+
}
177+
return strings.Join(stringColumns, ", ")
178+
}

0 commit comments

Comments
 (0)