Skip to content

Commit 739e252

Browse files
author
Jason Yu
committed
Refactored the signature of the function option methods to operate on the main (public) objects instead of the respective (private) option configuration structs.
1 parent da9aabc commit 739e252

File tree

7 files changed

+140
-127
lines changed

7 files changed

+140
-127
lines changed

checkpointer.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"time"
1111
)
1212

13-
// checkpointElement is the struct used to store checkpointing information.
13+
// checkpointElement is the struct used to store checkpointing information in the doubly linked list.
1414
type checkpointElement struct {
1515
seqNum int
1616
done bool
@@ -46,7 +46,7 @@ func (c *checkpointList) insert(seqNum int) error {
4646
return nil
4747
}
4848

49-
// find is a method used to retreive the element in the doubly linked list for a given sequence number. find is
49+
// find is a method used to retrieve the element in the doubly linked list for a given sequence number. find is
5050
// optimized for searching oldest numbers first as it traverse the linked list starting from the beginning.
5151
func (c *checkpointList) find(seqNum int) (*list.Element, bool) {
5252
for e := c.Front(); e != nil; e = e.Next() {
@@ -83,11 +83,11 @@ func defaultCheckpointOptions() *checkpointOptions {
8383

8484
// checkpointOptionsFn is a function signature used to define function options for configuring all of the
8585
// configurable options of a checkpoint object.
86-
type checkpointOptionsFn func(*checkpointOptions) error
86+
type checkpointOptionsFn func(*checkpointer) error
8787

8888
// checkpointAutoCheckpointCount is a functional option method for configuring the checkpoint's auto checkpoint count.
8989
func checkpointAutoCheckpointCount(count int) checkpointOptionsFn {
90-
return func(o *checkpointOptions) error {
90+
return func(o *checkpointer) error {
9191
o.autoCheckpointCount = count
9292
return nil
9393
}
@@ -96,23 +96,23 @@ func checkpointAutoCheckpointCount(count int) checkpointOptionsFn {
9696
// checkpointAutoCheckpointFreq is a functional option method for configuring the checkpoint's auto checkpoint
9797
// frequency.
9898
func checkpointAutoCheckpointFreq(freq time.Duration) checkpointOptionsFn {
99-
return func(o *checkpointOptions) error {
99+
return func(o *checkpointer) error {
100100
o.autoCheckpointFreq = freq
101101
return nil
102102
}
103103
}
104104

105105
// checkpointCheckpointFn is a functional option method for configuring the checkpoint's checkpoint callback function.
106106
func checkpointCheckpointFn(fn func(string) error) checkpointOptionsFn {
107-
return func(o *checkpointOptions) error {
107+
return func(o *checkpointer) error {
108108
o.checkpointFn = fn
109109
return nil
110110
}
111111
}
112112

113113
// checkpointCountCheckFreq is a functional option method for configuring the checkpoint's count check frequency.
114114
func checkpointCountCheckFreq(freq time.Duration) checkpointOptionsFn {
115-
return func(o *checkpointOptions) error {
115+
return func(o *checkpointer) error {
116116
o.countCheckFreq = freq
117117
return nil
118118
}
@@ -138,14 +138,15 @@ type checkpointer struct {
138138
// newCheckpoint instantiates a new checkpoint object with default configuration settings unless the function option
139139
// methods are provided to change the default values.
140140
func newCheckpointer(optionFns ...checkpointOptionsFn) *checkpointer {
141-
checkpointOptions := defaultCheckpointOptions()
142-
for _, optionFn := range optionFns {
143-
optionFn(checkpointOptions)
144-
}
145-
return &checkpointer{
146-
checkpointOptions: checkpointOptions,
141+
checkpointer := &checkpointer{
142+
checkpointOptions: defaultCheckpointOptions(),
147143
list: &checkpointList{list.New()},
148144
}
145+
for _, optionFn := range optionFns {
146+
optionFn(checkpointer)
147+
}
148+
149+
return checkpointer
149150
}
150151

151152
// startup is a method used to enable automatic checkpointing.
@@ -231,7 +232,8 @@ func (c *checkpointer) insert(seqNumStr string) error {
231232
return nil
232233
}
233234

234-
// markDone safely marks the given sequence number as done.
235+
// markDone safely marks the given sequence number as done and attempts to remove it's previous element if the
236+
// previous element is also marked done or attempts to remove itself if it's next element is also marked done
235237
func (c *checkpointer) markDone(seqNumStr string) error {
236238
c.listMu.Lock()
237239
defer c.listMu.Unlock()

consumer.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,19 @@ func defaultConsumerOptions() *consumerOptions {
2929
}
3030

3131
// ConsumerOptionsFn is a method signature for defining functional option methods for configuring the Consumer.
32-
type ConsumerOptionsFn func(*consumerOptions) error
32+
type ConsumerOptionsFn func(*Consumer) error
3333

3434
// ConsumerReader is a functional option method for configuring the consumer's stream reader.
3535
func ConsumerReader(r StreamReader) ConsumerOptionsFn {
36-
return func(o *consumerOptions) error {
36+
return func(o *Consumer) error {
3737
o.reader = r
3838
return nil
3939
}
4040
}
4141

4242
// ConsumerQueueDepth is a functional option method for configuring the consumer's queueDepth.
4343
func ConsumerQueueDepth(depth int) ConsumerOptionsFn {
44-
return func(o *consumerOptions) error {
44+
return func(o *Consumer) error {
4545
if depth > 0 {
4646
o.queueDepth = depth
4747
return nil
@@ -52,7 +52,7 @@ func ConsumerQueueDepth(depth int) ConsumerOptionsFn {
5252

5353
// ConsumerConcurrency is a functional option method for configuring the consumer's concurrency.
5454
func ConsumerConcurrency(count int) ConsumerOptionsFn {
55-
return func(o *consumerOptions) error {
55+
return func(o *Consumer) error {
5656
if count > 0 {
5757
o.concurrency = count
5858
return nil
@@ -63,15 +63,15 @@ func ConsumerConcurrency(count int) ConsumerOptionsFn {
6363

6464
// ConsumerLogLevel is a functional option method for configuring the consumer's log level.
6565
func ConsumerLogLevel(ll aws.LogLevelType) ConsumerOptionsFn {
66-
return func(o *consumerOptions) error {
66+
return func(o *Consumer) error {
6767
o.logLevel = ll & 0xffff0000
6868
return nil
6969
}
7070
}
7171

7272
// ConsumerStats is a functional option method for configuring the consumer's stats collector.
7373
func ConsumerStats(sc ConsumerStatsCollector) ConsumerOptionsFn {
74-
return func(o *consumerOptions) error {
74+
return func(o *Consumer) error {
7575
o.Stats = sc
7676
return nil
7777
}
@@ -93,24 +93,25 @@ type Consumer struct {
9393

9494
// NewConsumer creates a new Consumer object for retrieving and listening to message(s) on a StreamReader.
9595
func NewConsumer(c *aws.Config, stream string, shard string, optionFns ...ConsumerOptionsFn) (*Consumer, error) {
96-
consumerOptions := defaultConsumerOptions()
96+
consumer := &Consumer{consumerOptions: defaultConsumerOptions()}
9797
for _, optionFn := range optionFns {
98-
optionFn(consumerOptions)
98+
optionFn(consumer)
9999
}
100-
if consumerOptions.reader == nil {
100+
101+
if consumer.reader == nil {
101102
r, err := NewKinesisReader(c, stream, shard)
102103
if err != nil {
103104
return nil, err
104105
}
105-
consumerOptions.reader = r
106+
consumer.reader = r
107+
}
108+
109+
consumer.LogHelper = &LogHelper{
110+
LogLevel: consumer.logLevel,
111+
Logger: c.Logger,
106112
}
107-
return &Consumer{
108-
consumerOptions: consumerOptions,
109-
LogHelper: &LogHelper{
110-
LogLevel: consumerOptions.logLevel,
111-
Logger: c.Logger,
112-
},
113-
}, nil
113+
114+
return consumer, nil
114115
}
115116

116117
// startConsuming will initialize the message channel and set consuming to true if there is not already another consume

firehose_writer.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ func defaultFirehoseWriterOptions() *firehoseWriterOptions {
3939

4040
// FirehoseWriterOptionsFn is a method signature for defining functional option methods for configuring
4141
// the FirehoseWriter.
42-
type FirehoseWriterOptionsFn func(*firehoseWriterOptions) error
42+
type FirehoseWriterOptionsFn func(*FirehoseWriter) error
4343

4444
// FirehoseWriterMsgCountRateLimit is a functional option method for configuring the FirehoseWriter's
4545
// message count rate limit.
4646
func FirehoseWriterMsgCountRateLimit(limit int) FirehoseWriterOptionsFn {
47-
return func(o *firehoseWriterOptions) error {
47+
return func(o *FirehoseWriter) error {
4848
if limit > 0 && limit <= firehoseMsgCountRateLimit {
4949
o.msgCountRateLimit = limit
5050
return nil
@@ -56,7 +56,7 @@ func FirehoseWriterMsgCountRateLimit(limit int) FirehoseWriterOptionsFn {
5656
// FirehoseWriterMsgSizeRateLimit is a functional option method for configuring the FirehoseWriter's
5757
// messsage size rate limit.
5858
func FirehoseWriterMsgSizeRateLimit(limit int) FirehoseWriterOptionsFn {
59-
return func(o *firehoseWriterOptions) error {
59+
return func(o *FirehoseWriter) error {
6060
if limit > 0 && limit <= firehoseMsgSizeRateLimit {
6161
o.msgSizeRateLimit = limit
6262
return nil
@@ -68,7 +68,7 @@ func FirehoseWriterMsgSizeRateLimit(limit int) FirehoseWriterOptionsFn {
6868
// FirehoseWriterThroughputMultiplier is a functional option method for configuring the FirehoseWriter's
6969
// throughput multiplier.
7070
func FirehoseWriterThroughputMultiplier(multiplier int) FirehoseWriterOptionsFn {
71-
return func(o *firehoseWriterOptions) error {
71+
return func(o *FirehoseWriter) error {
7272
if multiplier > 0 {
7373
o.throughputMultiplier = multiplier
7474
return nil
@@ -79,15 +79,15 @@ func FirehoseWriterThroughputMultiplier(multiplier int) FirehoseWriterOptionsFn
7979

8080
// FirehoseWriterLogLevel is a functional option method for configuring the FirehoseWriter's log level.
8181
func FirehoseWriterLogLevel(ll aws.LogLevelType) FirehoseWriterOptionsFn {
82-
return func(o *firehoseWriterOptions) error {
82+
return func(o *FirehoseWriter) error {
8383
o.logLevel = ll & 0xffff0000
8484
return nil
8585
}
8686
}
8787

8888
// FirehoseWriterStats is a functional option method for configuring the FirehoseWriter's stats collector.
8989
func FirehoseWriterStats(sc ProducerStatsCollector) FirehoseWriterOptionsFn {
90-
return func(o *firehoseWriterOptions) error {
90+
return func(o *FirehoseWriter) error {
9191
o.Stats = sc
9292
return nil
9393
}
@@ -103,23 +103,26 @@ type FirehoseWriter struct {
103103

104104
// NewFirehoseWriter creates a new stream writer to write records to a Kinesis.
105105
func NewFirehoseWriter(c *aws.Config, stream string, optionFns ...FirehoseWriterOptionsFn) (*FirehoseWriter, error) {
106-
firehoseWriterOptions := defaultFirehoseWriterOptions()
107-
for _, optionFn := range optionFns {
108-
optionFn(firehoseWriterOptions)
109-
}
110106
sess, err := session.NewSession(c)
111107
if err != nil {
112108
return nil, err
113109
}
114-
return &FirehoseWriter{
110+
111+
firehoseWriter := &FirehoseWriter{
112+
firehoseWriterOptions: defaultFirehoseWriterOptions(),
115113
stream: stream,
116114
client: firehose.New(sess),
117-
firehoseWriterOptions: firehoseWriterOptions,
118-
LogHelper: &LogHelper{
119-
LogLevel: firehoseWriterOptions.logLevel,
120-
Logger: c.Logger,
121-
},
122-
}, nil
115+
}
116+
for _, optionFn := range optionFns {
117+
optionFn(firehoseWriter)
118+
}
119+
120+
firehoseWriter.LogHelper = &LogHelper{
121+
LogLevel: firehoseWriter.logLevel,
122+
Logger: c.Logger,
123+
}
124+
125+
return firehoseWriter, nil
123126
}
124127

125128
// PutRecords sends a batch of records to Firehose and returns a list of records that need to be retried.

kcl_reader.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ func defaultKclReaderOptions() *kclReaderOptions {
4343
}
4444

4545
// KclReaderOptionsFn is a method signature for defining functional option methods for configuring the KclReader.
46-
type KclReaderOptionsFn func(*kclReaderOptions) error
46+
type KclReaderOptionsFn func(*KclReader) error
4747

4848
// kclReaderBatchSize is a functional option method for configuring the KclReader's batch size
4949
func kclReaderBatchSize(size int) KclReaderOptionsFn {
50-
return func(o *kclReaderOptions) error {
50+
return func(o *KclReader) error {
5151
if size > 0 && size <= kclReaderMaxBatchSize {
5252
o.batchSize = size
5353
return nil
@@ -58,15 +58,15 @@ func kclReaderBatchSize(size int) KclReaderOptionsFn {
5858

5959
// KclReaderAutoCheckpointCount is a functional option method for configuring the KclReader's checkpoint count
6060
func KclReaderAutoCheckpointCount(count int) KclReaderOptionsFn {
61-
return func(o *kclReaderOptions) error {
61+
return func(o *KclReader) error {
6262
o.autoCheckpointCount = count
6363
return nil
6464
}
6565
}
6666

6767
// KclReaderAutoCheckpointFreq is a functional option method for configuring the KclReader's checkpoint frequency
6868
func KclReaderAutoCheckpointFreq(freq time.Duration) KclReaderOptionsFn {
69-
return func(o *kclReaderOptions) error {
69+
return func(o *KclReader) error {
7070
o.autoCheckpointFreq = freq
7171
return nil
7272
}
@@ -75,7 +75,7 @@ func KclReaderAutoCheckpointFreq(freq time.Duration) KclReaderOptionsFn {
7575
// KclReaderUpdateCheckpointSizeFreq is a functional option method for configuring the KclReader's
7676
// update checkpoint size stats frequency
7777
func KclReaderUpdateCheckpointSizeFreq(freq time.Duration) KclReaderOptionsFn {
78-
return func(o *kclReaderOptions) error {
78+
return func(o *KclReader) error {
7979
o.updateCheckpointSizeFreq = freq
8080
return nil
8181
}
@@ -84,7 +84,7 @@ func KclReaderUpdateCheckpointSizeFreq(freq time.Duration) KclReaderOptionsFn {
8484
// KclReaderOnInitCallbackFn is a functional option method for configuring the KclReader's
8585
// onInitCallbackFn.
8686
func KclReaderOnInitCallbackFn(fn func() error) KclReaderOptionsFn {
87-
return func(o *kclReaderOptions) error {
87+
return func(o *KclReader) error {
8888
o.onInitCallbackFn = fn
8989
return nil
9090
}
@@ -93,7 +93,7 @@ func KclReaderOnInitCallbackFn(fn func() error) KclReaderOptionsFn {
9393
// KclReaderOnCheckpointCallbackFn is a functional option method for configuring the KclReader's
9494
// onCheckpointCallbackFn.
9595
func KclReaderOnCheckpointCallbackFn(fn func(seqNum string, err string) error) KclReaderOptionsFn {
96-
return func(o *kclReaderOptions) error {
96+
return func(o *KclReader) error {
9797
o.onCheckpointCallbackFn = fn
9898
return nil
9999
}
@@ -102,23 +102,23 @@ func KclReaderOnCheckpointCallbackFn(fn func(seqNum string, err string) error) K
102102
// KclReaderOnShutdownCallbackFn is a functional option method for configuring the KclReader's
103103
// onShutdownCallbackFn.
104104
func KclReaderOnShutdownCallbackFn(fn func() error) KclReaderOptionsFn {
105-
return func(o *kclReaderOptions) error {
105+
return func(o *KclReader) error {
106106
o.onShutdownCallbackFn = fn
107107
return nil
108108
}
109109
}
110110

111111
// KclReaderLogLevel is a functional option method for configuring the KclReader's log level.
112112
func KclReaderLogLevel(ll aws.LogLevelType) KclReaderOptionsFn {
113-
return func(o *kclReaderOptions) error {
113+
return func(o *KclReader) error {
114114
o.logLevel = ll
115115
return nil
116116
}
117117
}
118118

119119
// KclReaderStats is a functional option method for configuring the KclReader's stats collector.
120120
func KclReaderStats(sc ConsumerStatsCollector) KclReaderOptionsFn {
121-
return func(o *kclReaderOptions) error {
121+
return func(o *KclReader) error {
122122
o.Stats = sc
123123
return nil
124124
}
@@ -140,17 +140,17 @@ type KclReader struct {
140140

141141
// NewKclReader creates a new stream reader to read records from KCL
142142
func NewKclReader(c *aws.Config, optionFns ...KclReaderOptionsFn) (*KclReader, error) {
143-
kclReaderOptions := defaultKclReaderOptions()
143+
kclReader := &KclReader{kclReaderOptions: defaultKclReaderOptions()}
144144
for _, optionFn := range optionFns {
145-
optionFn(kclReaderOptions)
145+
optionFn(kclReader)
146146
}
147-
return &KclReader{
148-
kclReaderOptions: kclReaderOptions,
149-
LogHelper: &LogHelper{
150-
LogLevel: kclReaderOptions.logLevel,
151-
Logger: c.Logger,
152-
},
153-
}, nil
147+
148+
kclReader.LogHelper = &LogHelper{
149+
LogLevel: kclReader.logLevel,
150+
Logger: c.Logger,
151+
}
152+
153+
return kclReader, nil
154154
}
155155

156156
func (r *KclReader) process(ctx context.Context) {

0 commit comments

Comments
 (0)