44 "context"
55 "encoding/json"
66 "fmt"
7+ "sync"
78
89 log "github.com/sirupsen/logrus"
910 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -73,6 +74,7 @@ type WorkflowStore interface {
7374type SQLiteStore struct {
7475 conn * sqlite.Conn
7576 instanceService instanceid.Service
77+ mtx sync.Mutex
7678}
7779
7880var _ WorkflowStore = & SQLiteStore {}
@@ -102,6 +104,8 @@ where instanceid = ?
102104 }
103105
104106 var workflows = wfv1.Workflows {}
107+ s .mtx .Lock ()
108+ defer s .mtx .Unlock ()
105109 err = sqlitex .Execute (s .conn , query , & sqlitex.ExecOptions {
106110 Args : args ,
107111 ResultFunc : func (stmt * sqlite.Stmt ) error {
@@ -143,6 +147,8 @@ where instanceid = ?
143147 }
144148
145149 var total int64
150+ s .mtx .Lock ()
151+ defer s .mtx .Unlock ()
146152 err = sqlitex .Execute (s .conn , query , & sqlitex.ExecOptions {
147153 Args : args ,
148154 ResultFunc : func (stmt * sqlite.Stmt ) error {
@@ -161,6 +167,8 @@ func (s *SQLiteStore) Add(obj interface{}) error {
161167 if ! ok {
162168 return fmt .Errorf ("unable to convert object to Workflow. object: %v" , obj )
163169 }
170+ s .mtx .Lock ()
171+ defer s .mtx .Unlock ()
164172 done := sqlitex .Transaction (s .conn )
165173 err := s .upsertWorkflow (wf )
166174 defer done (& err )
@@ -172,6 +180,8 @@ func (s *SQLiteStore) Update(obj interface{}) error {
172180 if ! ok {
173181 return fmt .Errorf ("unable to convert object to Workflow. object: %v" , obj )
174182 }
183+ s .mtx .Lock ()
184+ defer s .mtx .Unlock ()
175185 done := sqlitex .Transaction (s .conn )
176186 err := s .upsertWorkflow (wf )
177187 defer done (& err )
@@ -183,6 +193,8 @@ func (s *SQLiteStore) Delete(obj interface{}) error {
183193 if ! ok {
184194 return fmt .Errorf ("unable to convert object to Workflow. object: %v" , obj )
185195 }
196+ s .mtx .Lock ()
197+ defer s .mtx .Unlock ()
186198 return sqlitex .Execute (s .conn , deleteWorkflowQuery , & sqlitex.ExecOptions {Args : []any {string (wf .UID )}})
187199}
188200
@@ -195,6 +207,8 @@ func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error
195207 }
196208 wfs = append (wfs , wf )
197209 }
210+ s .mtx .Lock ()
211+ defer s .mtx .Unlock ()
198212 done := sqlitex .Transaction (s .conn )
199213 err := s .replaceWorkflows (wfs )
200214 defer done (& err )
@@ -222,6 +236,7 @@ func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err e
222236}
223237
224238func (s * SQLiteStore ) upsertWorkflow (wf * wfv1.Workflow ) error {
239+ // Called with the mutex
225240 err := sqlitex .Execute (s .conn , deleteWorkflowQuery , & sqlitex.ExecOptions {Args : []any {string (wf .UID )}})
226241 if err != nil {
227242 return err
0 commit comments