Skip to content

Commit 0ed1e02

Browse files
Started working on transprocessor and implementation of adaptable process for future upgrades
1 parent 2264c9d commit 0ed1e02

File tree

7 files changed

+144
-59
lines changed

7 files changed

+144
-59
lines changed

internal/scheduler/fcfs/fcfs.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package fcfs
22

33
import (
4+
"context"
45
"runtime"
56

7+
"go.uber.org/zap"
8+
69
"github.com/PythonHacker24/linux-acl-management-backend/config"
710
"github.com/PythonHacker24/linux-acl-management-backend/internal/session"
11+
"github.com/PythonHacker24/linux-acl-management-backend/internal/transprocessor"
812
)
913

1014
/* spawns a new FCFS scheduler */
11-
func NewFCFSScheduler(sm *session.Manager) *FCFSScheduler {
15+
func NewFCFSScheduler(sm *session.Manager, processor transprocessor.TransactionProcessor) *FCFSScheduler {
1216
/* calculate max workers */
1317
maxProcs := runtime.GOMAXPROCS(0)
1418
maxWorkers := config.BackendConfig.AppInfo.MaxWorkers
@@ -30,5 +34,61 @@ func NewFCFSScheduler(sm *session.Manager) *FCFSScheduler {
3034
curSessionManager: sm,
3135
maxWorkers: maxWorkers,
3236
semaphore: make(chan struct{}, maxWorkers),
37+
processor: processor,
3338
}
3439
}
40+
41+
/* run the fcfs scheduler with context */
42+
func (f *FCFSScheduler) Run(ctx context.Context) error {
43+
for {
44+
select {
45+
46+
/* check if ctx is done - catchable if default is not working hard (ideal scheduler) */
47+
case <-ctx.Done():
48+
return nil
49+
50+
/* in case default is working hard - ctx is passed here so it must attempt to quit */
51+
default:
52+
/* RULE: ctx is propogates all over the coming functions */
53+
54+
/* get next session in the queue (round robin manner) */
55+
curSession := f.curSessionManager.GetNextSession()
56+
if curSession == nil {
57+
/* might need a delay of 10 ms */
58+
continue
59+
}
60+
61+
/* check if transaction queue of the session is empty */
62+
curSession.Mutex.Lock()
63+
if curSession.TransactionQueue.Len() == 0 {
64+
curSession.Mutex.Unlock()
65+
continue
66+
}
67+
68+
/* get a transaction from the session to process */
69+
transaction := curSession.TransactionQueue.Remove(curSession.TransactionQueue.Front())
70+
curSession.Mutex.Unlock()
71+
72+
/* block if all workers are busy */
73+
f.semaphore <- struct{}{}
74+
75+
/* go routine is available to be spawned */
76+
go func(curSession *session.Session, transaction interface{}) {
77+
/* defer clearing the semaphore channel */
78+
defer func() { <-f.semaphore }()
79+
80+
/*
81+
process the transaction
82+
* processTransaction handles transaction processing completely
83+
* now it is responsible now responsible to execute it
84+
* role of scheduler in handling transactions ends here
85+
*/
86+
if err := f.processor.Process(ctx, curSession, transaction); err != nil {
87+
zap.L().Error("Faild to process transaction",
88+
zap.Error(err),
89+
)
90+
}
91+
}(curSession, transaction)
92+
}
93+
}
94+
}

internal/scheduler/fcfs/model.go

Lines changed: 2 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package fcfs
22

33
import (
4-
"context"
5-
64
"github.com/PythonHacker24/linux-acl-management-backend/internal/session"
7-
"go.uber.org/zap"
5+
"github.com/PythonHacker24/linux-acl-management-backend/internal/transprocessor"
86
)
97

108
/*
@@ -20,59 +18,5 @@ type FCFSScheduler struct {
2018

2119
/* for limiting spawning of goroutines */
2220
semaphore chan struct{}
23-
}
24-
25-
/* run the fcfs scheduler with context */
26-
func (f *FCFSScheduler) Run(ctx context.Context) error {
27-
for {
28-
select {
29-
30-
/* check if ctx is done - catchable if default is not working hard (ideal scheduler) */
31-
case <-ctx.Done():
32-
return nil
33-
34-
/* in case default is working hard - ctx is passed here so it must attempt to quit */
35-
default:
36-
/* RULE: ctx is propogates all over the coming functions */
37-
38-
/* get next session in the queue (round robin manner) */
39-
curSession := f.curSessionManager.GetNextSession()
40-
if curSession == nil {
41-
/* might need a delay of 10 ms */
42-
continue
43-
}
44-
45-
/* check if transaction queue of the session is empty */
46-
curSession.Mutex.Lock()
47-
if curSession.TransactionQueue.Len() == 0 {
48-
curSession.Mutex.Unlock()
49-
continue
50-
}
51-
52-
/* get a transaction from the session to process */
53-
transaction := curSession.TransactionQueue.Remove(curSession.TransactionQueue.Front())
54-
curSession.Mutex.Unlock()
55-
56-
/* block if all workers are busy */
57-
f.semaphore <- struct{}{}
58-
59-
/* go routine is available to be spawned */
60-
go func(curSession *session.Session, transaction interface{}) {
61-
/* defer clearing the semaphore channel */
62-
defer func() { <-f.semaphore }()
63-
64-
/*
65-
process the transaction
66-
* processTransaction handles transaction processing completely
67-
* now it is responsible now responsible to execute it
68-
* role of scheduler in handling transactions ends here
69-
*/
70-
if err := f.processTransaction(ctx, curSession, transaction); err != nil {
71-
zap.L().Error("Faild to process transaction",
72-
zap.Error(err),
73-
)
74-
}
75-
}(curSession, transaction)
76-
}
77-
}
21+
processor transprocessor.TransactionProcessor
7822
}

internal/scheduler/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
package scheduler
22

33
/* contains handlers related to monitoring the schedular */
4+
5+
/* TODO: Implementing a watchdog */

internal/session/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
package session
2+
3+
/* TODO: watchdog for session */

internal/transprocessor/model.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package transprocessor
2+
3+
/*
4+
transprocessor implements the transactions structure that whole project complies with
5+
*/
6+
7+
/* universal transaction structure */
8+
type Transaction struct {
9+
ID string
10+
/* command structure would be added later */
11+
}
12+
13+
/* permissions processor */
14+
type PermProcessor struct{}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package transprocessor
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/PythonHacker24/linux-acl-management-backend/internal/session"
8+
"go.uber.org/zap"
9+
)
10+
11+
/* instanciate new permission processor */
12+
func NewPermProcessor() *PermProcessor {
13+
return &PermProcessor{}
14+
}
15+
16+
/* processor for permissions manager */
17+
func (p *PermProcessor) Process(ctx context.Context, curSession *session.Session, tx interface{}) error {
18+
transaction, ok := tx.(*Transaction)
19+
if !ok {
20+
return fmt.Errorf("invalid transaction type")
21+
}
22+
23+
/* add complete information here + persistent logging in database */
24+
zap.L().Info("Processing Transaction",
25+
zap.String("user", curSession.Username),
26+
)
27+
28+
select {
29+
case <-ctx.Done():
30+
/*
31+
store this into persistent storage too!
32+
make sure database connections are closed after scheduler shutsdown
33+
*/
34+
zap.L().Warn("Transaction process stopped due to shutdown",
35+
zap.String("user", curSession.Username),
36+
)
37+
return ctx.Err()
38+
default:
39+
_ = transaction
40+
}
41+
42+
return nil
43+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package transprocessor
2+
3+
import (
4+
"context"
5+
6+
"github.com/PythonHacker24/linux-acl-management-backend/internal/session"
7+
)
8+
9+
/*
10+
the job of scheduler was to handle how transactions are allocated to executors
11+
transprocessor's job is to open the content of transactions and take care of them onwards
12+
so work of transactions was irrelevant to scheduler - transprocessor is responsible
13+
14+
also, this archirecture allows us to create mulitple processors in case we plan to extend in future
15+
*/
16+
17+
/* transaction processor - pluggable to any scheduler */
18+
type TransactionProcessor interface {
19+
Process(ctx context.Context, curSession *session.Session, transaction interface{}) error
20+
}

0 commit comments

Comments
 (0)