Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions blocks/bricks/recurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from blocks.bricks import Initializable, Logistic, Tanh, Linear
from blocks.bricks.base import Application, application, Brick, lazy
from blocks.initialization import NdarrayInitialization
from blocks.roles import add_role, WEIGHT, INITIAL_STATE
from blocks.roles import add_role, WEIGHT, BIAS, INITIAL_STATE
from blocks.utils import (pack, shared_floatx_nans, shared_floatx_zeros,
dict_union, dict_subset, is_shared_variable)
from blocks.bricks.parallel import Fork
Expand Down Expand Up @@ -412,10 +412,20 @@ def _allocate(self):
self.W_state, self.W_cell_to_in, self.W_cell_to_forget,
self.W_cell_to_out, self.initial_state_, self.initial_cells]

if self.use_bias:
self.b_cell_to_forget = shared_floatx_nans((self.dim,),
name='b_cell_to_forget')
add_role(self.b_cell_to_forget, BIAS)
self.parameters.append(self.b_cell_to_forget)

# Fill the allocated parameters with their initial values: the first four
# entries of self.parameters are handed to weights_init and, when use_bias
# is set, the last entry is handed to biases_init.
# NOTE(review): this depends on the ordering of self.parameters -- presumably
# the four weight parameters come first and b_cell_to_forget is appended
# last by _allocate; confirm before reordering that list.
def _initialize(self):
for weights in self.parameters[:4]:
self.weights_init.initialize(weights, self.rng)

if self.use_bias:
# One-element slice: only the final parameter receives biases_init.
for biases in self.parameters[-1:]:
self.biases_init.initialize(biases, self.rng)

@recurrent(sequences=['inputs', 'mask'], states=['states', 'cells'],
contexts=[], outputs=['states', 'cells'])
def apply(self, inputs, states, cells, mask=None):
Expand Down Expand Up @@ -458,8 +468,12 @@ def slice_last(x, no):
activation = tensor.dot(states, self.W_state) + inputs
in_gate = tensor.nnet.sigmoid(slice_last(activation, 0) +
cells * self.W_cell_to_in)
forget_gate = tensor.nnet.sigmoid(slice_last(activation, 1) +
cells * self.W_cell_to_forget)

forget_gate = slice_last(activation, 1) + cells * self.W_cell_to_forget
if self.use_bias:
forget_gate += self.b_cell_to_forget
forget_gate = tensor.nnet.sigmoid(forget_gate)

next_cells = (forget_gate * cells +
in_gate * nonlinearity(slice_last(activation, 2)))
out_gate = tensor.nnet.sigmoid(slice_last(activation, 3) +
Expand Down
102 changes: 95 additions & 7 deletions tests/bricks/test_recurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,53 @@ def test_many_steps(self):

class TestLSTM(unittest.TestCase):
def setUp(self):
self.lstm = LSTM(dim=3, weights_init=Constant(2),
biases_init=Constant(0))
self.lstm.initialize()
self.lstm_with_bias = LSTM(dim=3, weights_init=Constant(2),
biases_init=Constant(1))
self.lstm_without_bias = LSTM(dim=3, weights_init=Constant(2),
biases_init=Constant(0), use_bias=False)
self.lstm_with_bias.initialize()
self.lstm_without_bias.initialize()

def test_one_step_with_bias(self):
h0 = tensor.matrix('h0')
c0 = tensor.matrix('c0')
x = tensor.matrix('x')
h1, c1 = self.lstm_with_bias.apply(x, h0, c0, iterate=False)
next_h = theano.function(inputs=[x, h0, c0], outputs=[h1])

def test_one_step(self):
h0_val = 0.1 * numpy.array([[1, 1, 0], [0, 1, 1]],
dtype=theano.config.floatX)
c0_val = 0.1 * numpy.array([[1, 1, 0], [0, 1, 1]],
dtype=theano.config.floatX)
x_val = 0.1 * numpy.array([range(12), range(12, 24)],
dtype=theano.config.floatX)
W_state_val = 2 * numpy.ones((3, 12), dtype=theano.config.floatX)
W_cell_to_in = 2 * numpy.ones((3,), dtype=theano.config.floatX)
W_cell_to_out = 2 * numpy.ones((3,), dtype=theano.config.floatX)
W_cell_to_forget = 2 * numpy.ones((3,), dtype=theano.config.floatX)
b_cell_to_forget = 1 * numpy.ones((3,), dtype=theano.config.floatX)

# the forget-gate bias is one (not zero) in this test, so it is added to f_t below
activation = numpy.dot(h0_val, W_state_val) + x_val

def sigmoid(x):
return 1. / (1. + numpy.exp(-x))

i_t = sigmoid(activation[:, :3] + c0_val * W_cell_to_in)
f_t = sigmoid(activation[:, 3:6] + c0_val * W_cell_to_forget +
b_cell_to_forget)
next_cells = f_t * c0_val + i_t * numpy.tanh(activation[:, 6:9])
o_t = sigmoid(activation[:, 9:12] +
next_cells * W_cell_to_out)
h1_val = o_t * numpy.tanh(next_cells)
assert_allclose(h1_val, next_h(x_val, h0_val, c0_val)[0],
rtol=1e-6)

def test_one_step_without_bias(self):
h0 = tensor.matrix('h0')
c0 = tensor.matrix('c0')
x = tensor.matrix('x')
h1, c1 = self.lstm.apply(x, h0, c0, iterate=False)
h1, c1 = self.lstm_without_bias.apply(x, h0, c0, iterate=False)
next_h = theano.function(inputs=[x, h0, c0], outputs=[h1])

h0_val = 0.1 * numpy.array([[1, 1, 0], [0, 1, 1]],
Expand Down Expand Up @@ -182,10 +220,59 @@ def sigmoid(x):
assert_allclose(h1_val, next_h(x_val, h0_val, c0_val)[0],
rtol=1e-6)

def test_many_steps(self):
# Run self.lstm_with_bias over a masked 24-step sequence and compare the
# hidden states with a step-by-step NumPy computation that includes the
# forget-gate bias; afterwards check that the initial states show up in the
# computation graph as shared variables.
def test_many_steps_with_bias(self):
x = tensor.tensor3('x')
mask = tensor.matrix('mask')
h, c = self.lstm_with_bias.apply(x, mask=mask, iterate=True)
calc_h = theano.function(inputs=[x, mask], outputs=[h])

# The first 24 permutations of range(12), scaled by 0.1 ...
x_val = (0.1 * numpy.asarray(
list(itertools.islice(itertools.permutations(range(12)), 0, 24)),
dtype=theano.config.floatX))
# ... repeated along a new middle axis of length 4, giving (24, 4, 12).
x_val = numpy.ones((24, 4, 12),
dtype=theano.config.floatX) * x_val[:, None, :]
mask_val = numpy.ones((24, 4), dtype=theano.config.floatX)
# Mask out index 3 of the second axis for steps 12..23
# (presumably the last sequence in the batch -- TODO confirm axis meaning).
mask_val[12:24, 3] = 0
# 25 rows: row 0 is the all-zero starting value, rows 1..24 are filled by
# the loop below. NOTE(review): assumes the brick's own initial state and
# cells are zero as well -- confirm.
h_val = numpy.zeros((25, 4, 3), dtype=theano.config.floatX)
c_val = numpy.zeros((25, 4, 3), dtype=theano.config.floatX)
# Reference parameter values: every weight is 2 and the forget bias is 1.
W_state_val = 2 * numpy.ones((3, 12), dtype=theano.config.floatX)
W_cell_to_in = 2 * numpy.ones((3,), dtype=theano.config.floatX)
W_cell_to_out = 2 * numpy.ones((3,), dtype=theano.config.floatX)
W_cell_to_forget = 2 * numpy.ones((3,), dtype=theano.config.floatX)
b_cell_to_forget = 1 * numpy.ones((3,), dtype=theano.config.floatX)

def sigmoid(x):
return 1. / (1. + numpy.exp(-x))

for i in range(1, 25):
# Columns 0:3, 3:6, 6:9 and 9:12 of the activation feed the input gate,
# forget gate, cell candidate and output gate respectively.
activation = numpy.dot(h_val[i-1], W_state_val) + x_val[i-1]
i_t = sigmoid(activation[:, :3] + c_val[i-1] * W_cell_to_in)
# Only the forget gate receives a bias term.
f_t = sigmoid(activation[:, 3:6] + c_val[i-1] * W_cell_to_forget +
b_cell_to_forget)
c_val[i] = f_t * c_val[i-1] + i_t * numpy.tanh(activation[:, 6:9])
# The output gate uses the freshly updated cells, not the previous ones.
o_t = sigmoid(activation[:, 9:12] +
c_val[i] * W_cell_to_out)
h_val[i] = o_t * numpy.tanh(c_val[i])
# Where the mask is 0 the previous state and cells are carried over.
h_val[i] = (mask_val[i - 1, :, None] * h_val[i] +
(1 - mask_val[i - 1, :, None]) * h_val[i - 1])
c_val[i] = (mask_val[i - 1, :, None] * c_val[i] +
(1 - mask_val[i - 1, :, None]) * c_val[i - 1])

# Drop the starting row so h_val holds one entry per input step.
h_val = h_val[1:]
assert_allclose(h_val, calc_h(x_val, mask_val)[0], rtol=1e-04)

# Also test that initial state is a parameter
# Exactly two INITIAL_STATE variables are expected in the graph of h
# (the tuple unpacking fails otherwise).
initial1, initial2 = VariableFilter(roles=[INITIAL_STATE])(
ComputationGraph(h))
assert is_shared_variable(initial1)
assert is_shared_variable(initial2)
assert {initial1.name, initial2.name} == {
'initial_state', 'initial_cells'}

def test_many_steps_without_bias(self):
x = tensor.tensor3('x')
mask = tensor.matrix('mask')
h, c = self.lstm.apply(x, mask=mask, iterate=True)
h, c = self.lstm_without_bias.apply(x, mask=mask, iterate=True)
calc_h = theano.function(inputs=[x, mask], outputs=[h])

x_val = (0.1 * numpy.asarray(
Expand All @@ -209,6 +296,7 @@ def sigmoid(x):
activation = numpy.dot(h_val[i-1], W_state_val) + x_val[i-1]
i_t = sigmoid(activation[:, :3] + c_val[i-1] * W_cell_to_in)
f_t = sigmoid(activation[:, 3:6] + c_val[i-1] * W_cell_to_forget)

c_val[i] = f_t * c_val[i-1] + i_t * numpy.tanh(activation[:, 6:9])
o_t = sigmoid(activation[:, 9:12] +
c_val[i] * W_cell_to_out)
Expand Down