Skip to content

Commit b6667be

Browse files
committed
TVar: initial global lock implementation.
1 parent 72615ee commit b6667be

File tree

4 files changed

+317
-0
lines changed

4 files changed

+317
-0
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
require 'concurrent/stoppable'
2323
require 'concurrent/supervisor'
2424
require 'concurrent/timer_task'
25+
require 'concurrent/tvar'
2526
require 'concurrent/utilities'
2627

2728
require 'concurrent/global_thread_pool'

lib/concurrent/tvar.rb

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
module Concurrent
2+
3+
ABORTED = Object.new
4+
5+
class TVar
6+
7+
def initialize(value)
8+
@value = value
9+
end
10+
11+
def value
12+
Concurrent::atomically do
13+
Transaction::current.read(self)
14+
end
15+
end
16+
17+
def value=(value)
18+
Concurrent::atomically do
19+
Transaction::current.write(self, value)
20+
end
21+
end
22+
23+
def unsafe_value
24+
@value
25+
end
26+
27+
def unsafe_value=(value)
28+
@value = value
29+
end
30+
31+
end
32+
33+
UndoLogEntry = Struct.new(:tvar, :value)
34+
35+
class Transaction
36+
37+
LOCK = Mutex.new
38+
39+
def initialize
40+
@undo_log = []
41+
42+
LOCK.lock
43+
end
44+
45+
def read(tvar)
46+
validate
47+
tvar.unsafe_value
48+
end
49+
50+
def write(tvar, value)
51+
@undo_log.push(UndoLogEntry.new(tvar, tvar.unsafe_value))
52+
tvar.unsafe_value = value
53+
end
54+
55+
def abort
56+
@undo_log.each do |entry|
57+
entry.tvar.unsafe_value = entry.value
58+
end
59+
60+
unlock
61+
end
62+
63+
def commit
64+
validate
65+
unlock
66+
true
67+
end
68+
69+
def validate
70+
end
71+
72+
def unlock
73+
LOCK.unlock
74+
end
75+
76+
def self.current
77+
Thread.current.thread_variable_get(:transaction)
78+
end
79+
80+
def self.current=(transaction)
81+
Thread.current.thread_variable_set(:transaction, transaction)
82+
end
83+
84+
end
85+
86+
class AbortException < Exception
87+
end
88+
89+
def atomically
90+
raise ArgumentError.new('no block given') unless block_given?
91+
92+
# Get the current transaction
93+
94+
transaction = Transaction::current
95+
96+
# Are we not already in a transaction (not nested)?
97+
98+
if transaction.nil?
99+
# New transaction
100+
101+
begin
102+
# Retry loop
103+
104+
loop do
105+
106+
# Create a new transaction
107+
108+
transaction = Transaction.new
109+
Transaction::current = transaction
110+
111+
# Run the block, aborting on exceptions
112+
113+
begin
114+
result = yield
115+
rescue AbortException => e
116+
transaction.abort
117+
result = ABORTED
118+
rescue => e
119+
transaction.abort
120+
throw e
121+
end
122+
# If we can commit, break out of the loop
123+
124+
if result != ABORTED
125+
if transaction.commit
126+
break result
127+
end
128+
end
129+
end
130+
ensure
131+
# Clear the current transaction
132+
133+
Transaction::current = nil
134+
end
135+
else
136+
# Nested transaction - flatten it and just run the block
137+
138+
yield
139+
end
140+
end
141+
142+
def abort_transaction
143+
raise AbortException.new
144+
end
145+
146+
module_function :atomically, :abort_transaction
147+
148+
end

md/tvar.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# TVars
2+
3+
`TVar` and `atomically` implement a transactional memory. A `TVar` is a single
4+
item container that always contains exactly one value. The `atomically` method
5+
allows you to modify a set of `TVar` objects with the guarantee that all of the
6+
updates are collectively atomic - they either all happen or none of them do -
7+
consistent - a `TVar` will never enter an illegal state - and isolated - atomic
8+
blocks never interfere with each other when they are running. You may recognise
9+
these properties from database transactions.
10+
11+
There are some very important and unusual semantics that you must be aware of:
12+
13+
* Most importantly, the block that you pass to `atomically` may be executed more
14+
than once. In most cases your code should be free of side-effects, except for
15+
via `TVar`.
16+
17+
* If an exception escapes an `atomically` block it will abort the transaction.
18+
19+
* It is undefined behaviour to use `callcc` or `Fiber` with `atomically`.
20+
21+
* If you create a new thread within an `atomically`, it will not be part of
22+
the transaction. Creating a thread counts as a side-effect.
23+
24+
We implement nested transactions by flattening.
25+
26+
We only support strong isolation if you use the API correctly. In order words,
27+
we do not support strong isolation.
28+
29+
See:
30+
31+
1. T. Harris, J. Larus, and R. Rajwar. Transactional Memory. Morgan & Claypool, second edition, 2010.

spec/concurrent/tvar_spec.rb

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
require 'spec_helper'
2+
3+
module Concurrent
4+
5+
describe TVar do
6+
7+
context '#initialize' do
8+
9+
it 'accepts an initial value' do
10+
t = TVar.new(14)
11+
t.value.should eq 14
12+
end
13+
14+
end
15+
16+
context '#value' do
17+
18+
it 'gets the value' do
19+
t = TVar.new(14)
20+
t.value.should eq 14
21+
end
22+
23+
end
24+
25+
context '#value=' do
26+
27+
it 'sets the value' do
28+
t = TVar.new(14)
29+
t.value = 2
30+
t.value.should eq 2
31+
end
32+
33+
end
34+
35+
end
36+
37+
describe '#atomically' do
38+
39+
it 'raises an exception when no block given' do
40+
expect { Concurrent::atomically }.to raise_error(ArgumentError)
41+
end
42+
43+
it 'retries on abort' do
44+
count = 0
45+
46+
Concurrent::atomically do
47+
if count == 0
48+
count = 1
49+
Concurrent::abort_transaction
50+
else
51+
count = 2
52+
end
53+
end
54+
55+
count.should eq 2
56+
end
57+
58+
it 'commits writes if the transaction succeeds' do
59+
t = TVar.new(0)
60+
61+
Concurrent::atomically do
62+
t.value = 1
63+
end
64+
65+
t.value.should eq 1
66+
end
67+
68+
it 'undoes writes if the transaction is aborted' do
69+
t = TVar.new(0)
70+
71+
count = 0
72+
73+
Concurrent::atomically do
74+
if count == 0
75+
t.value = 1
76+
count = 1
77+
Concurrent::abort_transaction
78+
end
79+
end
80+
81+
t.value.should eq 0
82+
end
83+
84+
it 'provides atomicity' do
85+
t1 = TVar.new(0)
86+
t2 = TVar.new(0)
87+
88+
count = 0
89+
90+
Concurrent::atomically do
91+
if count == 0
92+
count = 1
93+
t1.value = 1
94+
Concurrent::abort_transaction
95+
t2.value = 2
96+
end
97+
end
98+
99+
t1.value.should eq 0
100+
t2.value.should eq 0
101+
end
102+
103+
it 'provides isolation' do
104+
t = TVar.new(0)
105+
106+
Thread.new do
107+
Concurrent::atomically do
108+
t1.value = 1
109+
sleep(1)
110+
end
111+
end
112+
113+
sleep(0.5)
114+
115+
t.value.should eq 0
116+
end
117+
118+
it 'nests' do
119+
Concurrent::atomically do
120+
Concurrent::atomically do
121+
Concurrent::atomically do
122+
end
123+
end
124+
end
125+
end
126+
127+
end
128+
129+
describe '#abort_transaction' do
130+
131+
it 'raises an exception outside an #atomically block' do
132+
expect { Concurrent::abort_transaction }.to raise_error(Concurrent::AbortException)
133+
end
134+
135+
end
136+
137+
end

0 commit comments

Comments
 (0)