@@ -1,5 +1,5 @@
 def create_communicator(
-        communicator_name='flat', mpi_comm=None, dynamic=False, debug=False):
+        communicator_name='flat', mpi_comm=None, debug=False):

     if mpi_comm is None:
         import mpi4py.MPI
@@ -8,8 +8,7 @@ def create_communicator(
     if communicator_name == 'flat':
         from dlframeworks.chainer.communicators.kfac_communicators\
             .flat_communicator import FlatCommunicator
-        return FlatCommunicator(
-            mpi_comm=mpi_comm, dynamic=dynamic, debug=debug)
+        return FlatCommunicator(mpi_comm, debug)
     else:
         raise ValueError(
             'Unrecognized communicator: "{}"'.format(communicator_name))
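For illustration only, a minimal usage sketch of the updated factory. The import path is an assumption based on the package layout shown in this diff, and the COMM_WORLD default is presumed (the actual assignment is outside the shown hunk):

    # Hypothetical usage sketch; the import path and COMM_WORLD default are assumptions.
    from dlframeworks.chainer.communicators.kfac_communicators import create_communicator

    comm = create_communicator(communicator_name='flat')  # 'dynamic' is no longer accepted
    # Any other name, e.g. create_communicator(communicator_name='tree'), raises ValueError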

This file was deleted.

99 changes: 99 additions & 0 deletions dlframeworks/chainer/communicators/kfac_communicators/_utility.py
@@ -0,0 +1,99 @@
import itertools

from dlframeworks.chainer.utils import create_mpi_print


def extract(fisher_blocks, indices, extractors):
    arrays = []
    for local_indices in indices:
        if len(local_indices) == 0:
            arrays.append([])
        else:
            local_arrays = []
            for index in local_indices:
                for extractor in extractors:
                    for array in extractor(fisher_blocks[index]):
                        local_arrays.append(array)
            arrays.append(local_arrays)
    return arrays


def extract_cov_emas(fisher_block):
    ret = []
    if fisher_block.cov_emas is not None:
        for cov_ema in fisher_block.cov_emas:
            ret.append(cov_ema)
    return ret


def extract_grads(fisher_block):
    ret = []
    for _, param in sorted(fisher_block.link.namedparams()):
        if param.grad is not None:
            ret.append(param.grad)
    return ret


def extract_kfgrads(fisher_block):
    ret = []
    for _, param in sorted(fisher_block.link.namedparams()):
        if hasattr(param, 'kfgrad') and param.kfgrad is not None:
            ret.append(param.kfgrad)
    return ret


def get_nelems(arrays):
    nelems = 0
    for array in list(itertools.chain(*arrays)):  # flatten arrays
        nelems += array.size
    return nelems


def get_sendcounts_and_displs(arrays):
    sendcounts = []
    displs = []
    sendcount_offset = 0
    for local_arrays in arrays:
        sendcount = 0
        for array in local_arrays:
            sendcount += array.size
        sendcounts.append(sendcount)
        displs.append(sendcount_offset)
        sendcount_offset += sendcount
    return sendcounts, displs


def pack(arrays, gpu_buf, sizeof_dtype):
    buf_offset = 0
    for array in list(itertools.chain(*arrays)):  # flatten arrays
        nbytes = array.size * sizeof_dtype
        gpu_buf.from_device(array, nbytes, buf_offset)
        buf_offset += nbytes


def unpack(arrays, gpu_buf, sizeof_dtype):
    buf_offset = 0
    for array in list(itertools.chain(*arrays)):  # flatten arrays
        nbytes = array.size * sizeof_dtype
        gpu_buf.to_device(array, nbytes, buf_offset)
        buf_offset += nbytes


def allocate_kfgrads(fisher_blocks):
    for fisher_block in fisher_blocks:
        for _, param in sorted(fisher_block.link.namedparams()):
            if param.grad is None:
                continue
            if not hasattr(param, 'kfgrad'):
                kfgrad = param.grad.copy()
                kfgrad.fill(0.)
                setattr(param, 'kfgrad', kfgrad)


def print_debug_message(mpi_comm, arrays, prefix):
    mpi_print = create_mpi_print(mpi_comm)
    idx = 0
    for array in list(itertools.chain(*arrays)):  # flatten arrays
        mpi_print('{} {} MEAN {}'.format(
            prefix, idx, array.mean()))
        idx += 1
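For illustration only, a minimal self-contained numpy sketch of the flat-buffer layout that get_sendcounts_and_displs, pack, and unpack implement. The sample arrays and the two-rank grouping are invented, and a plain numpy buffer stands in for the DeviceMemory GPU buffer:

    import itertools
    import numpy as np

    # Invented per-rank groups of arrays, shaped like the output of extract()
    arrays = [
        [np.ones((2, 3), dtype=np.float32), np.full(4, 2.0, dtype=np.float32)],  # rank 0
        [np.full(5, 3.0, dtype=np.float32)],                                      # rank 1
    ]

    # sendcounts: elements destined for each rank; displs: where each segment starts
    sendcounts = [sum(a.size for a in local) for local in arrays]
    displs = [sum(sendcounts[:i]) for i in range(len(sendcounts))]
    print(sendcounts, displs)  # [10, 5] [0, 10]

    # "pack": copy every array into one contiguous buffer (stand-in for GPU buffer A)
    flat = np.concatenate([a.ravel() for a in itertools.chain(*arrays)])

    # "unpack": copy each segment of the buffer back into the original arrays
    offset = 0
    for a in itertools.chain(*arrays):
        a[...] = flat[offset:offset + a.size].reshape(a.shape)
        offset += a.size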
dlframeworks/chainer/communicators/kfac_communicators/flat_communicator.py
@@ -5,14 +5,13 @@
 from dlframeworks.chainer.communicators.kfac_communicators \
     import kfac_communicator_base
 from dlframeworks.chainer.communicators.kfac_communicators \
-    import _memory_utility
+    import _utility


 class FlatCommunicator(kfac_communicator_base.KFACCommunicatorBase):

-    def __init__(self, mpi_comm, dynamic=False, debug=False):
-        super(FlatCommunicator, self).__init__(
-            mpi_comm, False, dynamic, debug)
+    def __init__(self, mpi_comm, debug=False):
+        super(FlatCommunicator, self).__init__(mpi_comm, debug)

         # GPU buffers
         self.gpu_buffer_a = DeviceMemory()
@@ -22,98 +21,113 @@ def __init__(self, mpi_comm, dynamic=False, debug=False):
         self.mpi_dtype = mpi4py.MPI.FLOAT
         self.sizeof_dtype = 4

-    def reduce_scatterv(self, model, covs, root=0):
+    def reduce_scatterv_grad(self, fisher_blocks, root=0):
         """Reduce and Scatterv grads and covs

-        1. Extract (by reference)
-            model, covs -> dictionary
+        1. Extract
+            grads, cov_emas -> arrays
         2. Pack
-            dictionary -> GPU buffer A
+            arrays -> GPU buffer A
         3. Reduce
             GPU buffer A -> GPU buffer B
         4. Scatterv
             GPU buffer B -> GPU buffer A
         5. Unpack
-            GPU buffer A -> dictionary
+            GPU buffer A -> arrays

         """
-        self.setup(model)
+        self.setup(fisher_blocks)
         cuda_stream = chainer.cuda.Stream.null

-        dictionary = self.reduce_scatterv_extract(model, covs)
-        nelems = self.reduce_scatterv_get_nelems(dictionary)
+        # We extract cov_emas and grads from fisher_blocks
+        extractors = [_utility.extract_cov_emas, _utility.extract_grads]
+        arrays = _utility.extract(fisher_blocks, self.indices, extractors)
+
+        # Get total number of elements
+        nelems = _utility.get_nelems(arrays)
         nbytes = nelems * self.sizeof_dtype

         self.gpu_buffer_a.assign(nbytes)
         self.gpu_buffer_b.assign(nbytes)

-        # Pack the elements in a single buffer, calculate sendcounts, and
-        # calculate displs
+        # Calculate sendcounts and displs
         # - sendcounts: the number of elements to send to each process
         # - displs: the displacements where each segment begins
-        sendcounts, displs = _memory_utility.reduce_scatterv_pack(
-            dictionary, self.divided_linknames, self.gpu_buffer_a,
-            self.sizeof_dtype)
+        sendcounts, displs = _utility.get_sendcounts_and_displs(arrays)
+
+        # Pack the elements in a single buffer
+        _utility.pack(arrays, self.gpu_buffer_a, self.sizeof_dtype)

         # Buffers for Reduce
         sendbuf = [self.gpu_buffer_a.buffer(nbytes), self.mpi_dtype]
         recvbuf = [self.gpu_buffer_b.buffer(nbytes), self.mpi_dtype] if \
             self.rank == root else None

         if self.debug:
-            self.reduce_scatterv_debug(dictionary, 'BEFORE')
+            _utility.print_debug_message(self.mpi_comm, arrays,
+                                         'BEFORE REDUCE_SCATTERV')

         # We must sync before communication
         cuda_stream.synchronize()
         self.mpi_comm.Reduce(sendbuf, recvbuf, root=root)

-        if not self.is_worker:
+        if not self.is_inv_worker:
             return

         # Buffers for Scatterv
-        nbytes_local = sendcounts[self.invcomm.rank] * self.sizeof_dtype
+        nbytes_local = sendcounts[self.inv_comm.rank] * self.sizeof_dtype
         sendbuf = [self.gpu_buffer_b.buffer(nbytes), sendcounts, displs,
                    self.mpi_dtype] if self.rank == root else None
         recvbuf = self.gpu_buffer_a.buffer(nbytes_local)

         # We must sync before communication
         cuda_stream.synchronize()
-        self.invcomm.mpi_comm.Scatterv(sendbuf, recvbuf, root=root)
+        self.inv_comm.mpi_comm.Scatterv(sendbuf, recvbuf, root=root)

         # Unpack all the elements
-        _memory_utility.reduce_scatterv_unpack(
-            dictionary, self.divided_linknames[self.invcomm.rank],
-            self.gpu_buffer_a, self.sizeof_dtype)
+        _utility.unpack(arrays[self.inv_comm.rank], self.gpu_buffer_a,
+                        self.sizeof_dtype)

         if self.debug:
-            self.reduce_scatterv_debug(dictionary, 'AFTER')
+            _utility.print_debug_message(self.mpi_comm, arrays,
+                                         'AFTER REDUCE_SCATTERV')

-    def allgatherv(self, model):
+    def allgatherv_kfgrad(self, fisher_blocks):
         """Allgatherv kfgrads

+        1. Extract
+            kfgrads -> arrays
         1. Pack
-            kfgrads -> GPU buffer A
+            arrays -> GPU buffer A
         2. Allgatherv
             GPU buffer A -> GPU buffer B
         3. Unpack
-            GPU buffer B -> kfgrads
+            GPU buffer B -> arrays

         """
+        # Allocate memory space for receiving kfgrads
+        _utility.allocate_kfgrads(fisher_blocks)
+
         cuda_stream = chainer.cuda.Stream.null

-        nelems = self.allgatherv_get_nelems(model)
+        # We extract kfgrads from fisher_blocks
+        extractors = [_utility.extract_kfgrads]
+        arrays = _utility.extract(fisher_blocks, self.indices, extractors)
+
+        # Get total number of elements
+        nelems = _utility.get_nelems(arrays)
         nbytes = nelems * self.sizeof_dtype

         self.gpu_buffer_a.assign(nbytes)
         self.gpu_buffer_b.assign(nbytes)

-        # Pack the elements in a single buffer, calculate sendcounts, and
-        # calculate displs
+        # Calculate sendcounts and displs
         # - sendcounts: the number of elements to send to each process
         # - displs: the displacements where each segment begins
-        sendcounts, displs = _memory_utility.allgatherv_pack(
-            model, self.divided_linknames, self.gpu_buffer_a,
-            self.sizeof_dtype, self.rank)
+        sendcounts, displs = _utility.get_sendcounts_and_displs(arrays)
+
+        # Pack the elements in a single buffer
+        _utility.pack(arrays[self.rank], self.gpu_buffer_a, self.sizeof_dtype)

         # Buffers for Allgatherv
         nbytes_local = sendcounts[self.rank] * self.sizeof_dtype
@@ -122,15 +136,16 @@ def allgatherv(self, model):
                    self.mpi_dtype]

         if self.debug:
-            self.allgatherv_debug(model, 'BEFORE')
+            _utility.print_debug_message(self.mpi_comm, arrays,
+                                         'BEFORE ALLGATHERV')

         # We must sync before communication
         cuda_stream.synchronize()
         self.mpi_comm.Allgatherv(sendbuf, recvbuf)

         # Unpack all the elements
-        _memory_utility.allgatherv_unpack(
-            model, self.linknames, self.gpu_buffer_b, self.sizeof_dtype)
+        _utility.unpack(arrays, self.gpu_buffer_b, self.sizeof_dtype)

         if self.debug:
-            self.allgatherv_debug(model, 'AFTER')
+            _utility.print_debug_message(self.mpi_comm, arrays,
+                                         'AFTER ALLGATHERV')
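For illustration only, a host-memory sketch of the collective pattern used above, written with mpi4py and numpy instead of packed GPU buffers (no Chainer, no inv_comm split). The per-rank counts, the buffer contents, and the "per-rank work" step are invented:

    # Run with e.g.: mpirun -np 2 python flat_pattern_demo.py
    from mpi4py import MPI
    import numpy as np

    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()

    # Invented layout: every rank "owns" 4 elements of the packed buffer
    sendcounts = [4] * size
    displs = [4 * r for r in range(size)]
    nelems = sum(sendcounts)

    packed = np.arange(nelems, dtype=np.float32)  # stands in for GPU buffer A
    reduced = np.empty(nelems, dtype=np.float32) if rank == 0 else None

    # Reduce: sum every rank's packed buffer onto root (buffer A -> buffer B)
    recvbuf = [reduced, MPI.FLOAT] if rank == 0 else None
    comm.Reduce([packed, MPI.FLOAT], recvbuf, op=MPI.SUM, root=0)

    # Scatterv: root hands each rank only its own segment (buffer B -> buffer A)
    local = np.empty(sendcounts[rank], dtype=np.float32)
    sendbuf = [reduced, sendcounts, displs, MPI.FLOAT] if rank == 0 else None
    comm.Scatterv(sendbuf, [local, MPI.FLOAT], root=0)

    local /= size  # stand-in for the per-rank work (e.g. computing kfgrads)

    # Allgatherv: every rank receives every segment back (buffer A -> buffer B)
    gathered = np.empty(nelems, dtype=np.float32)
    comm.Allgatherv([local, MPI.FLOAT], [gathered, sendcounts, displs, MPI.FLOAT])
    print(rank, gathered)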