@@ -3,7 +3,10 @@ package servers
33import (
44 "encoding/json"
55 "fmt"
6+ "html"
67 "net/http"
8+ "path"
9+ "regexp"
710 "sync"
811
912 "github.com/getsumio/getsum/internal/config"
@@ -12,22 +15,32 @@ import (
1215 "github.com/getsumio/getsum/internal/status"
1316 . "github.com/getsumio/getsum/internal/supplier"
1417 "github.com/getsumio/getsum/internal/validation"
18+ "github.com/google/uuid"
1519)
1620
1721//server instance to run in server mode
1822type OnPremiseServer struct {
1923 StoragePath string
20- Supplier Supplier
21- mux sync. Mutex
24+ mux * sync. RWMutex
25+ suppliers map [ string ] Supplier
2226}
2327
2428var factory ISupplierFactory
2529
30+ const uuidPattern = "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[8|9|aA|bB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$"
31+
32+ var regex * regexp.Regexp = regexp .MustCompile (uuidPattern )
33+
34+ const default_capacity = 250
35+ const threshold = 150
36+
2637//start server in given config listen address and port or tls details
2738//TODO add interface support
2839func (s * OnPremiseServer ) Start (config * config.Config ) error {
2940 logger .Level = logger .LevelInfo
3041 factory = new (SupplierFactory )
42+ s .suppliers = make (map [string ]Supplier )
43+ s .mux = & sync.RWMutex {}
3144 http .HandleFunc ("/" , s .handle )
3245 listenAddress := fmt .Sprintf ("%s:%d" , * config .Listen , * config .Port )
3346 var err error
@@ -45,14 +58,18 @@ func (s *OnPremiseServer) Start(config *config.Config) error {
4558
4659//get executed to reach status
4760//so collect status if there is any runner
48- func handleGet (s * OnPremiseServer , w http.ResponseWriter , r * http.Request ) {
61+ func handleGet (s * OnPremiseServer , w http.ResponseWriter , r * http.Request , id string ) {
4962 //check if any runner
50- if s .Supplier == nil {
63+ s .mux .RLock ()
64+ defer s .mux .RUnlock ()
65+ logger .Info ("Checking if there is a process with id : %s" , id )
66+ supplier , ok := s .suppliers [id ]
67+ if ! ok {
5168 handleError ("There is no running process" , w )
5269 return
5370 }
5471 //collect status and return
55- stat := s . Supplier .Status ()
72+ stat := supplier .Status ()
5673 status , err := json .Marshal (stat )
5774 if err != nil {
5875 handleError ("System can not parse given status %s" , w , err .Error ())
@@ -64,14 +81,8 @@ func handleGet(s *OnPremiseServer, w http.ResponseWriter, r *http.Request) {
6481
6582//post executed to Run a new calculation
6683func handlePost (s * OnPremiseServer , w http.ResponseWriter , r * http.Request ) {
67- //check if any runner
68- if s .Supplier != nil {
69- stat := s .Supplier .Status ()
70- if stat .Type <= status .RUNNING {
71- handleError ("Server already running another process" , w )
72- return
73- }
74- }
84+ s .mux .Lock ()
85+ defer s .mux .Unlock ()
7586 //read the config from request
7687 jsonDecoder := json .NewDecoder (r .Body )
7788 config := & Config {}
@@ -85,49 +96,73 @@ func handlePost(s *OnPremiseServer, w http.ResponseWriter, r *http.Request) {
8596 if err != nil {
8697 handleError (err .Error (), w )
8798 return
88-
8999 }
90100
91101 //get supplier instance, only single algo supported on server mode
92102 var algorithm = ValueOf (& config .Algorithm [0 ])
93103 config .Dir = & s .StoragePath
94- s . Supplier , err = factory .GetSupplierByAlgo (config , & algorithm )
104+ supplier , err : = factory .GetSupplierByAlgo (config , & algorithm )
95105 //something went wrong, TODO add error handler
96106 if err != nil {
97107 handleError ("Can not create algorithm runner instance: " + err .Error (), w )
98108 return
99109 }
100- go s .Supplier .Run ()
101- handleGet (s , w , r )
110+ processId := uuid .New ().String ()
111+ stat := supplier .Status ()
112+ stat .Type = status .STARTED
113+ stat .Value = processId
114+ jsonStat , err := json .Marshal (stat )
115+ if err != nil {
116+ handleError ("Can not parse status" + err .Error (), w )
117+ return
118+ }
119+ w .Write (jsonStat )
120+ go supplier .Run ()
121+ s .suppliers [processId ] = supplier
102122 logger .Info ("Process started" )
123+ s .ensureCapacity ()
103124}
104125
105126//terminates running calculation
106- func handleDelete (s * OnPremiseServer , w http.ResponseWriter , r * http.Request ) {
107- if s .Supplier == nil {
127+ func handleDelete (s * OnPremiseServer , w http.ResponseWriter , r * http.Request , id string ) {
128+ s .mux .Lock ()
129+ defer s .mux .Unlock ()
130+ supplier , ok := s .suppliers [id ]
131+ if ! ok {
108132 handleError ("There is no running process" , w )
109133 return
110134 }
111- s .Supplier .Terminate ()
112- s .Supplier .Delete ()
135+
136+ supplier .Terminate ()
137+ supplier .Delete ()
113138 w .WriteHeader (http .StatusOK )
114- s . Supplier = nil
139+ delete ( s . suppliers , id )
115140 logger .Info ("Process terminated" )
116141}
117142
118143//delegates GET POST DELETE main server listener
119144func (s * OnPremiseServer ) handle (w http.ResponseWriter , r * http.Request ) {
120- s .mux .Lock ()
121- defer s .mux .Unlock ()
122-
123145 logger .LogRequest (r )
124146 switch r .Method {
125147 case "GET" :
126- handleGet (s , w , r )
148+ requestId := path .Base (html .EscapeString (r .URL .Path ))
149+ if ! regex .MatchString (requestId ) {
150+ logger .Error ("Request is not a valid request id! %s" , requestId )
151+ w .WriteHeader (http .StatusNotFound )
152+ return
153+ }
154+ handleGet (s , w , r , requestId )
127155 case "POST" :
128156 handlePost (s , w , r )
129157 case "DELETE" :
130- handleDelete (s , w , r )
158+ requestId := path .Base (html .EscapeString (r .URL .Path ))
159+ if ! regex .MatchString (requestId ) {
160+ logger .Error ("Request is not a valid request id! %s" , requestId )
161+ w .WriteHeader (http .StatusNotFound )
162+ return
163+ }
164+
165+ handleDelete (s , w , r , requestId )
131166 default :
132167 w .WriteHeader (http .StatusMethodNotAllowed )
133168 logger .Error ("Can not handle request method rejecting request" )
@@ -147,3 +182,18 @@ func handleError(message string, w http.ResponseWriter, params ...interface{}) {
147182 w .Write (jsonStatus )
148183 return
149184}
185+
186+ func (s * OnPremiseServer ) ensureCapacity () {
187+ if len (s .suppliers ) >= default_capacity {
188+ var i int = 0
189+ for k := range s .suppliers {
190+ if i >= threshold {
191+ supplier := s .suppliers [k ]
192+ if supplier .Status ().Type >= status .RUNNING {
193+ delete (s .suppliers , k )
194+ }
195+ }
196+ i ++
197+ }
198+ }
199+ }
0 commit comments