forked from transferia/transferia
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathasync_sink.go
More file actions
27 lines (22 loc) · 1.09 KB
/
async_sink.go
File metadata and controls
27 lines (22 loc) · 1.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package abstract
import (
"io"
"github.com/transferia/transferia/library/go/core/xerrors"
)
// AsyncSink is the asynchronous face of a sink: implementations are expected
// to wrap the synchronous Push of an underlying sink and expose it through
// AsyncPush.
//
// Every method of an AsyncSink may be called concurrently.
type AsyncSink interface {
	// io.Closer contributes Close. AsyncPush must not be called on a sink that
	// has already been closed (see AsyncPushConcurrencyErr).
	io.Closer

	// AsyncPush writes items asynchronously and returns a channel. The error
	// for this batch is written into that channel once the underlying
	// (synchronous) Push has actually been performed.
	//
	// AsyncPush takes ownership of the `items` slice, so the caller must not
	// use the slice after passing it in.
	AsyncPush(items []ChangeItem) chan error
}
// AsyncPushConcurrencyErr is the sentinel error indicating that AsyncPush has
// been called on an AsyncSink which is already closed. This must never happen;
// encountering it means there are concurrency issues in the implementation of
// a source.
//
// NOTE(review): Go convention would spell this ErrAsyncPushConcurrency; the
// identifier is exported, so it is left as is to avoid breaking users.
var AsyncPushConcurrencyErr = xerrors.NewSentinel("AsyncPush is called after Close")
// PusherFromAsyncSink adapts asink to a synchronous Pusher.
//
// Each call of the returned pusher passes the batch to asink.AsyncPush and
// then blocks until a value can be received from the channel AsyncPush
// returned; that received value is the pusher's result.
func PusherFromAsyncSink(asink AsyncSink) Pusher {
	syncPush := func(items []ChangeItem) error {
		resultCh := asink.AsyncPush(items)
		// Block until the asynchronous push reports its outcome.
		return <-resultCh
	}
	return syncPush
}