Skip to content

Commit 16263fb

Browse files
iskakaushikserprex
andauthored
Create a generic latch.go and make schemas use that (#3353)
Co-authored-by: Philip Dubé <[email protected]>
1 parent c6f16f7 commit 16263fb

File tree

4 files changed

+69
-37
lines changed

4 files changed

+69
-37
lines changed

flow/cmd/handler.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/PeerDB-io/peerdb/flow/internal"
2323
"github.com/PeerDB-io/peerdb/flow/model"
2424
"github.com/PeerDB-io/peerdb/flow/shared"
25+
"github.com/PeerDB-io/peerdb/flow/shared/concurrency"
2526
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
2627
peerflow "github.com/PeerDB-io/peerdb/flow/workflows"
2728
)
@@ -264,14 +265,14 @@ func (h *FlowRequestHandler) shutdownFlow(
264265
cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
265266
defer cancel()
266267

267-
errChan := make(chan error, 1)
268+
errLatch := concurrency.NewLatch[error]()
268269
go func() {
269-
errChan <- dropFlowHandle.Get(cancelCtx, nil)
270+
errLatch.Set(dropFlowHandle.Get(cancelCtx, nil))
270271
}()
271272

272273
select {
273-
case err := <-errChan:
274-
if err != nil {
274+
case <-errLatch.Chan():
275+
if err := errLatch.Wait(); err != nil {
275276
slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
276277
return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
277278
}
@@ -385,19 +386,17 @@ func (h *FlowRequestHandler) FlowStateChange(
385386
}
386387

387388
func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowID, runID string) error {
388-
errChan := make(chan error, 1)
389-
390389
ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute)
391390
defer cancel()
392391

392+
errLatch := concurrency.NewLatch[error]()
393393
go func() {
394-
err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID)
395-
errChan <- err
394+
errLatch.Set(h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID))
396395
}()
397396

398397
select {
399-
case err := <-errChan:
400-
if err != nil {
398+
case <-errLatch.Chan():
399+
if err := errLatch.Wait(); err != nil {
401400
slog.Error(fmt.Sprintf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error()))
402401
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
403402
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {

flow/connectors/postgres/sink_pg.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@ import (
1212
"github.com/PeerDB-io/peerdb/flow/connectors/postgres/sanitize"
1313
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
1414
"github.com/PeerDB-io/peerdb/flow/shared"
15+
"github.com/PeerDB-io/peerdb/flow/shared/concurrency"
1516
)
1617

1718
type PgCopyShared struct {
18-
schemaLatch chan struct{}
19+
schemaLatch *concurrency.Latch[[]string]
1920
err error
20-
schema []string
21-
schemaSet bool
2221
}
2322

2423
type PgCopyWriter struct {
@@ -33,17 +32,13 @@ type PgCopyReader struct {
3332

3433
func NewPgCopyPipe() (PgCopyReader, PgCopyWriter) {
3534
read, write := io.Pipe()
36-
schema := PgCopyShared{schemaLatch: make(chan struct{})}
35+
schema := PgCopyShared{schemaLatch: concurrency.NewLatch[[]string]()}
3736
return PgCopyReader{PipeReader: read, schema: &schema},
3837
PgCopyWriter{PipeWriter: write, schema: &schema}
3938
}
4039

4140
func (p PgCopyWriter) SetSchema(schema []string) {
42-
if !p.schema.schemaSet {
43-
p.schema.schema = schema
44-
close(p.schema.schemaLatch)
45-
p.schema.schemaSet = true
46-
}
41+
p.schema.schemaLatch.Set(schema)
4742
}
4843

4944
func (p PgCopyWriter) ExecuteQueryWithTx(
@@ -109,8 +104,7 @@ func (p PgCopyWriter) Close(err error) {
109104
}
110105

111106
func (p PgCopyReader) GetColumnNames() ([]string, error) {
112-
<-p.schema.schemaLatch
113-
return p.schema.schema, p.schema.err
107+
return p.schema.schemaLatch.Wait(), p.schema.err
114108
}
115109

116110
func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {

flow/model/qrecord_stream.go

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,38 @@
11
package model
22

33
import (
4+
"github.com/PeerDB-io/peerdb/flow/shared/concurrency"
45
"github.com/PeerDB-io/peerdb/flow/shared/types"
56
)
67

78
type QRecordStream struct {
8-
schemaLatch chan struct{}
9+
schemaLatch *concurrency.Latch[types.QRecordSchema]
910
Records chan []types.QValue
1011
err error
11-
schema types.QRecordSchema
12-
schemaSet bool
1312
}
1413

1514
func NewQRecordStream(buffer int) *QRecordStream {
1615
return &QRecordStream{
17-
schemaLatch: make(chan struct{}),
16+
schemaLatch: concurrency.NewLatch[types.QRecordSchema](),
1817
Records: make(chan []types.QValue, buffer),
19-
schema: types.QRecordSchema{},
2018
err: nil,
21-
schemaSet: false,
2219
}
2320
}
2421

2522
func (s *QRecordStream) Schema() (types.QRecordSchema, error) {
26-
<-s.schemaLatch
27-
return s.schema, s.Err()
23+
return s.schemaLatch.Wait(), s.Err()
2824
}
2925

3026
func (s *QRecordStream) SetSchema(schema types.QRecordSchema) {
31-
if !s.schemaSet {
32-
s.schema = schema
33-
s.schemaSet = true
34-
close(s.schemaLatch)
35-
}
27+
s.schemaLatch.Set(schema)
3628
}
3729

3830
func (s *QRecordStream) IsSchemaSet() bool {
39-
return s.schemaSet
31+
return s.schemaLatch.IsSet()
4032
}
4133

4234
func (s *QRecordStream) SchemaChan() <-chan struct{} {
43-
return s.schemaLatch
35+
return s.schemaLatch.Chan()
4436
}
4537

4638
func (s *QRecordStream) Err() error {
@@ -55,7 +47,7 @@ func (s *QRecordStream) Close(err error) {
5547
s.err = err
5648
close(s.Records)
5749
}
58-
if !s.schemaSet {
50+
if !s.schemaLatch.IsSet() {
5951
s.SetSchema(types.QRecordSchema{})
6052
}
6153
}

flow/shared/concurrency/latch.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package concurrency
2+
3+
import "sync"
4+
5+
// Latch is a thread-safe value holder that can be set once and read many times.
6+
// Once Set is called, all future Wait calls will immediately return the set value.
7+
type Latch[T any] struct {
8+
val T
9+
ready chan struct{}
10+
once sync.Once
11+
}
12+
13+
// NewLatch creates a new Latch for type T.
14+
func NewLatch[T any]() *Latch[T] {
15+
return &Latch[T]{ready: make(chan struct{})}
16+
}
17+
18+
// Set publishes the value exactly once; later calls are no-ops.
19+
func (l *Latch[T]) Set(v T) {
20+
l.once.Do(func() {
21+
l.val = v
22+
close(l.ready) // broadcasts to all waiters
23+
})
24+
}
25+
26+
// Wait blocks until Set is called, then returns the value.
27+
// Safe to call many times; subsequent calls return immediately.
28+
func (l *Latch[T]) Wait() T {
29+
<-l.ready
30+
return l.val
31+
}
32+
33+
// Chan returns a channel that will be closed when the value is set.
34+
// Useful for select statements.
35+
func (l *Latch[T]) Chan() <-chan struct{} {
36+
return l.ready
37+
}
38+
39+
// IsSet returns true if the value has been set.
40+
func (l *Latch[T]) IsSet() bool {
41+
select {
42+
case <-l.ready:
43+
return true
44+
default:
45+
return false
46+
}
47+
}

0 commit comments

Comments
 (0)