Skip to content

Commit 3d72334

Browse files
committed
apply concrete implementation
1 parent 6df2c37 commit 3d72334

File tree

5 files changed

+294
-271
lines changed

5 files changed

+294
-271
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# goInterLock
22
![Go Interval Lock](material/gointerlock_bg.png)
33

4-
_known as: ⏰ Interval (Cron / Job / Task / Scheduler) Go Distributed Lock ⏱️_
4+
_known as: ⏰ Interval (Cron / Job / Task / Scheduler) Go Centralized Lock ⏱️_
55

6-
## **Go** **Interval** job timer, with distributed **Lock**
6+
## Go Interval job timer, with centralized Lock for Distributed Systems
77

8-
`goInterLock` is go job/task scheduler with distributed locking mechanism. In distributed system locking is preventing task been executed in every instant that has the scheduler,
8+
`goInterLock` is go job/task scheduler with centralized locking mechanism. In distributed system locking is preventing task been executed in every instant that has the scheduler,
99

1010
**For example:** if your application has a task of calling some external APIs or doing some DB querying every 10 minutes, the lock prevents the process been run in every instance of that application, and you ended up running that task multiple time every 10 minutes. (let say in **kubernetes**)
1111

example/redis/basic/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"github.com/ehsaniara/gointerlock"
76
"log"
87
"time"
8+
9+
"github.com/ehsaniara/gointerlock"
910
)
1011

1112
func myJob() {
@@ -21,7 +22,7 @@ func main() {
2122
Interval: 2 * time.Second,
2223
Arg: myJob,
2324
RedisHost: "localhost:6379",
24-
RedisPassword: "MyRedisPassword",
25+
RedisPassword: "secret",
2526
}
2627

2728
//test cron

goInterval.go

Lines changed: 44 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,13 @@ package gointerlock
33
import (
44
"context"
55
"errors"
6-
"fmt"
7-
"github.com/aws/aws-sdk-go/aws"
8-
"github.com/aws/aws-sdk-go/aws/credentials"
9-
"github.com/go-redis/redis/v8"
106
"log"
117
"time"
128

13-
"github.com/aws/aws-sdk-go/aws/session"
14-
"github.com/aws/aws-sdk-go/service/dynamodb"
9+
"github.com/go-redis/redis/v8"
1510
)
1611

17-
var locker Locker
12+
var locker Lock
1813

1914
type LockVendor int32
2015

@@ -95,9 +90,35 @@ func (t *GoInterval) Run(ctx context.Context) error {
9590
}
9691
}
9792

98-
err := t.init(ctx)
99-
if err != nil {
100-
return err
93+
switch t.LockVendor {
94+
case RedisLock:
95+
r := &RedisLocker{
96+
redisConnector: t.RedisConnector,
97+
Name: t.Name,
98+
RedisHost: t.RedisHost,
99+
RedisPassword: t.RedisPassword,
100+
RedisDB: t.RedisDB,
101+
}
102+
err := r.SetClient()
103+
if err != nil {
104+
return err
105+
}
106+
107+
locker = r
108+
case AwsDynamoDbLock:
109+
d := &DynamoDbLocker{
110+
AwsDynamoDbRegion: t.AwsDynamoDbRegion,
111+
AwsDynamoDbEndpoint: t.AwsDynamoDbEndpoint,
112+
AwsDynamoDbAccessKeyID: t.AwsDynamoDbAccessKeyID,
113+
AwsDynamoDbSecretAccessKey: t.AwsDynamoDbSecretAccessKey,
114+
AwsDynamoDbSessionToken: t.AwsDynamoDbSessionToken,
115+
}
116+
err := d.SetClient()
117+
if err != nil {
118+
return err
119+
}
120+
121+
locker = d
101122
}
102123

103124
t.updateTimer()
@@ -128,168 +149,29 @@ func (t *GoInterval) Run(ctx context.Context) error {
128149
}
129150
}
130151

131-
func (t *GoInterval) init(ctx context.Context) error {
132-
// distributed mod is enabled
133-
switch t.LockVendor {
134-
case RedisLock:
135-
136-
//if given connection is null the use the built-in one
137-
if t.RedisConnector == nil {
138-
139-
log.Printf("Job %s started in distributed mode!", t.Name)
140-
141-
//if Redis host missed, use the default one
142-
if t.RedisHost == "" {
143-
t.RedisHost = "localhost:6379"
144-
}
145-
146-
locker.redisConnector = redis.NewClient(&redis.Options{
147-
Addr: t.RedisHost,
148-
Password: t.RedisPassword, // no password set
149-
DB: 0, // use default DB
150-
})
151-
152-
} else {
153-
// set the connection
154-
locker.redisConnector = t.RedisConnector
155-
}
156-
157-
//validate the connection
158-
if locker.redisConnector.Conn(ctx) == nil {
159-
return errors.New("`Redis Connection Failed!`")
160-
}
161-
162-
log.Printf("Job %s started in distributed mode by provided redis connection", t.Name)
163-
164-
case AwsDynamoDbLock:
165-
166-
// override the AWS profile credentials
167-
if aws.String(t.AwsDynamoDbEndpoint) == nil {
168-
// Initialize a session that the SDK will use to load
169-
// credentials from the shared credentials file ~/.aws/credentials
170-
// and region from the shared configuration file ~/.aws/config.
171-
sess := session.Must(session.NewSessionWithOptions(session.Options{
172-
SharedConfigState: session.SharedConfigEnable,
173-
}))
174-
// Create DynamoDB client
175-
locker.dynamoClient = dynamodb.New(sess)
176-
} else {
177-
178-
if aws.String(t.AwsDynamoDbRegion) == nil {
179-
return errors.New("`AwsDynamoDbRegion is missing (AWS Region)`")
180-
}
181-
182-
//setting StaticCredentials
183-
awsConfig := &aws.Config{
184-
Credentials: credentials.NewStaticCredentials(t.AwsDynamoDbAccessKeyID, t.AwsDynamoDbSecretAccessKey, t.AwsDynamoDbSessionToken),
185-
Region: aws.String(t.AwsDynamoDbRegion),
186-
Endpoint: aws.String(t.AwsDynamoDbEndpoint),
187-
}
188-
sess, err := session.NewSession(awsConfig)
189-
if err != nil {
190-
return err
191-
}
192-
// Create DynamoDB client
193-
locker.dynamoClient = dynamodb.New(sess)
194-
}
195-
196-
//sess, err := session.NewSession(&aws.Config{
197-
// Region: aws.String("us-west-2"),
198-
// Credentials: credentials.NewStaticCredentials(conf.AWS_ACCESS_KEY_ID, conf.AWS_SECRET_ACCESS_KEY, ""),
199-
//})
200-
201-
if locker.dynamoClient == nil {
202-
return errors.New("`DynamoDb Connection Failed!`")
203-
}
204-
205-
//check if table exist, if not create one
206-
tableInput := &dynamodb.CreateTableInput{
207-
AttributeDefinitions: []*dynamodb.AttributeDefinition{
208-
{
209-
AttributeName: aws.String("id"),
210-
AttributeType: aws.String("S"),
211-
},
212-
},
213-
KeySchema: []*dynamodb.KeySchemaElement{
214-
{
215-
AttributeName: aws.String("id"),
216-
KeyType: aws.String("HASH"),
217-
},
218-
},
219-
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
220-
ReadCapacityUnits: aws.Int64(10),
221-
WriteCapacityUnits: aws.Int64(10),
222-
},
223-
//TimeToLiveDescription: &dynamodb.TimeToLiveDescription{
224-
// AttributeName: aws.String("ttl"),
225-
// TimeToLiveStatus: aws.String("enable"),
226-
//},
227-
TableName: aws.String(Prefix),
228-
}
229-
230-
_, err := locker.dynamoClient.CreateTable(tableInput)
231-
if err != nil {
232-
log.Printf("Got error calling CreateTable: %s", err)
233-
} else {
234-
fmt.Println("Created the table", Prefix)
235-
}
236-
237-
default:
238-
239-
}
240-
return nil
241-
}
242-
243152
func (t *GoInterval) isNotLockThenLock(ctx context.Context) (bool, error) {
244-
245-
// distributed mod is enabled
246-
switch t.LockVendor {
247-
case RedisLock:
248-
249-
locked, err := locker.RedisLock(ctx, t.Name, t.Interval)
250-
251-
if err != nil {
252-
return false, err
253-
}
254-
return locked, nil
255-
256-
case AwsDynamoDbLock:
257-
258-
locked, err := locker.DynamoDbLock(ctx, t.Name, t.Interval)
259-
260-
if err != nil {
261-
return false, err
262-
}
263-
return locked, nil
264-
265-
default:
266-
267-
// no distributed lock
153+
//lock
154+
if t.LockVendor == SingleApp {
268155
return true, nil
156+
}
157+
locked, err := locker.Lock(ctx, t.Name, t.Interval)
269158

159+
if err != nil {
160+
log.Fatalf("err:%v", err)
161+
return false, err
270162
}
163+
return locked, nil
271164
}
272165

273166
func (t *GoInterval) UnLock(ctx context.Context) {
274167
//unlock
275-
switch t.LockVendor {
276-
case RedisLock:
277168

278-
err := locker.RedisUnlock(ctx, t.Name)
279-
if err != nil {
280-
return
281-
}
282-
283-
case AwsDynamoDbLock:
284-
285-
err := locker.DynamoDbUnlock(ctx, t.Name)
286-
if err != nil {
287-
return
288-
}
289-
290-
default:
169+
if t.LockVendor == SingleApp {
170+
return
171+
}
291172

292-
// no distributed lock
173+
err := locker.UnLock(ctx, t.Name)
174+
if err != nil {
293175
return
294176
}
295177
}

goIntervalLock.go

Lines changed: 0 additions & 104 deletions
This file was deleted.

0 commit comments

Comments
 (0)