File tree Expand file tree Collapse file tree 1 file changed +19
-5
lines changed
lib/redis_client/cluster/concurrent_worker Expand file tree Collapse file tree 1 file changed +19
-5
lines changed Original file line number Diff line number Diff line change 1
1
# frozen_string_literal: true
2
2
3
+ require 'redis_client/pid_cache'
4
+
3
5
class RedisClient
4
6
class Cluster
5
7
module ConcurrentWorker
6
- # This class is just an experimental implementation. There are some bugs for race condition.
8
+ # This class is just an experimental implementation.
7
9
# Ruby VM allocates 1 MB memory as a stack for a thread.
8
10
# It is a fixed size but we can modify the size with some environment variables.
9
11
# So it consumes memory 1 MB multiplied a number of workers.
10
12
class Pooled
11
13
def initialize
12
- @q = Queue . new
13
- size = ::RedisClient ::Cluster ::ConcurrentWorker ::MAX_WORKERS
14
- size = size . positive? ? size : 5
15
- @workers = Array . new ( size )
14
+ setup
16
15
end
17
16
18
17
def new_group ( size :)
19
18
raise ArgumentError , "size must be positive: #{ size } given" unless size . positive?
20
19
20
+ reset if @pid != ::RedisClient ::PIDCache . pid
21
21
ensure_workers if @workers . first . nil?
22
22
::RedisClient ::Cluster ::ConcurrentWorker ::Group . new ( worker : self , size : size )
23
23
end
@@ -31,6 +31,7 @@ def close
31
31
@workers . each { |t | t &.exit }
32
32
@workers . clear
33
33
@q . close
34
+ @pid = nil
34
35
nil
35
36
end
36
37
@@ -47,6 +48,19 @@ def spawn_worker
47
48
loop { q . pop . exec }
48
49
end
49
50
end
51
+
52
+ def setup
53
+ @q = Queue . new
54
+ size = ::RedisClient ::Cluster ::ConcurrentWorker ::MAX_WORKERS
55
+ size = size . positive? ? size : 5
56
+ @workers = Array . new ( size )
57
+ @pid = ::RedisClient ::PIDCache . pid
58
+ end
59
+
60
+ def reset
61
+ close
62
+ setup
63
+ end
50
64
end
51
65
end
52
66
end
You can’t perform that action at this time.
0 commit comments