Skip to content

Commit a2a5dd1

Browse files
committed
Use sync.WaitGroup
1 parent ef589b8 commit a2a5dd1

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

share/model.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type ProtoShare struct {
4242
Permissions uint8
4343
Orphan bool
4444
Expiration datatypes.NullTime
45-
PreviousID uint `gorm:"uniqueIndex"`
4645
}
4746

4847
type Share struct {

share/sql/migrate.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"fmt"
77
"os"
8+
"sync"
89
"time"
910

1011
model "github.com/cernbox/reva-plugins/share"
@@ -60,7 +61,7 @@ type OldShareState struct {
6061

6162
const (
6263
bufferSize = 10
63-
numWorkers = 10
64+
numWorkers = 5
6465
)
6566

6667
func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) {
@@ -75,6 +76,7 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port
7576
"gatewaysvc": gatewaysvc,
7677
"dry_run": dryRun,
7778
}
79+
7880
// Authenticate to gateway service
7981
tokenlessCtx, cancel := context.WithCancel(context.Background())
8082
ctx := appctx.ContextSetToken(tokenlessCtx, token)
@@ -103,6 +105,8 @@ func RunMigration(username, password, host, name, gatewaysvc, token string, port
103105
migrator.NewDb = migrator.NewDb.Debug()
104106
}
105107

108+
migrator.NewDb.AutoMigrate(&model.Share{}, &model.PublicLink{}, &model.ShareState{})
109+
106110
migrateShares(ctx, migrator)
107111
fmt.Println("---------------------------------")
108112
migrateShareStatuses(ctx, migrator)
@@ -131,11 +135,11 @@ func migrateShares(ctx context.Context, migrator Migrator) {
131135

132136
// Create channel for workers
133137
ch := make(chan *OldShareEntry, bufferSize)
134-
defer close(ch)
138+
var wg sync.WaitGroup
135139

136140
// Start all workers
137141
for range numWorkers {
138-
go workerShare(ctx, migrator, ch)
142+
go workerShare(ctx, migrator, ch, &wg)
139143
}
140144

141145
for res.Next() {
@@ -147,6 +151,9 @@ func migrateShares(ctx context.Context, migrator Migrator) {
147151
fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error())
148152
}
149153
}
154+
155+
close(ch)
156+
wg.Wait()
150157
}
151158

152159
func migrateShareStatuses(ctx context.Context, migrator Migrator) {
@@ -171,11 +178,12 @@ func migrateShareStatuses(ctx context.Context, migrator Migrator) {
171178

172179
// Create channel for workers
173180
ch := make(chan *OldShareState, bufferSize)
174-
defer close(ch)
181+
182+
var wg sync.WaitGroup
175183

176184
// Start all workers
177185
for range numWorkers {
178-
go workerState(ctx, migrator, ch)
186+
go workerState(ctx, migrator, ch, &wg)
179187
}
180188

181189
for res.Next() {
@@ -187,18 +195,24 @@ func migrateShareStatuses(ctx context.Context, migrator Migrator) {
187195
fmt.Printf("Error occured for share status%d: %s\n", s.id, err.Error())
188196
}
189197
}
198+
close(ch)
199+
wg.Wait()
190200
}
191201

192-
func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) {
202+
func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry, wg *sync.WaitGroup) {
203+
wg.Add(1)
193204
for share := range ch {
194205
handleSingleShare(ctx, migrator, share)
195206
}
207+
wg.Done()
196208
}
197209

198-
func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState) {
210+
func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState, wg *sync.WaitGroup) {
211+
wg.Add(1)
199212
for state := range ch {
200213
handleSingleState(ctx, migrator, state)
201214
}
215+
wg.Done()
202216
}
203217

204218
func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) {
@@ -222,12 +236,9 @@ func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry)
222236
func handleSingleState(ctx context.Context, migrator Migrator, s *OldShareState) {
223237
newShareState := &model.ShareState{
224238
ShareID: uint(s.id),
225-
Model: gorm.Model{
226-
ID: uint(s.id),
227-
},
228-
User: s.recipient,
229-
Hidden: s.state == -1, // Hidden if REJECTED
230-
Synced: false,
239+
User: s.recipient,
240+
Hidden: s.state == -1, // Hidden if REJECTED
241+
Synced: false,
231242
}
232243
res := migrator.NewDb.Create(&newShareState)
233244
if res.Error != nil {
@@ -267,6 +278,7 @@ func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry
267278
if err == nil {
268279
protoShare.InitialPath = path
269280
} else if errors.Is(err, errtypes.NotFound(protoShare.Inode)) {
281+
fmt.Printf("Marked share %d as an orphan (%s, %s)\n", s.ID, protoShare.Instance, protoShare.Inode)
270282
protoShare.Orphan = true
271283
} else {
272284
// We do not set, because of a general error

0 commit comments

Comments
 (0)