Skip to content
Closed
42 changes: 21 additions & 21 deletions include/zim/archive.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ namespace zim
efficientOrder
};

/** Get the maximum size of the cluster cache.
*
* @return The maximum memory size used by the cluster cache.
*/
size_t LIBZIM_API getClusterCacheMaxSize();

/** Get the current size of the cluster cache.
*
* @return The current memory size used by the cluster cache.
*/
size_t LIBZIM_API getClusterCacheCurrentSize();

/** Set the size of the cluster cache.
*
* If the new size is lower than the memory currently used by the cache
* some clusters will be dropped from cache to respect the new size.
*
* @param sizeInB The memory limit (in bytes) for the cluster cache.
*/
void LIBZIM_API setClusterCacheMaxSize(size_t sizeInB);

/**
* The Archive class to access content in a zim file.
*
Expand Down Expand Up @@ -534,27 +555,6 @@ namespace zim
*/
std::shared_ptr<FileImpl> getImpl() const { return m_impl; }

/** Get the maximum size of the cluster cache.
*
* @return The maximum number of clusters stored in the cache.
*/
size_t getClusterCacheMaxSize() const;

/** Get the current size of the cluster cache.
*
* @return The number of clusters currently stored in the cache.
*/
size_t getClusterCacheCurrentSize() const;

/** Set the size of the cluster cache.
*
* If the new size is lower than the number of currently stored clusters
* some clusters will be dropped from cache to respect the new size.
*
* @param nbClusters The maximum number of clusters stored in the cache.
*/
void setClusterCacheMaxSize(size_t nbClusters);

/** Get the size of the dirent cache.
*
* @return The maximum number of dirents stored in the cache.
Expand Down
4 changes: 2 additions & 2 deletions meson_options.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
option('CLUSTER_CACHE_SIZE', type : 'string', value : '16',
description : 'set cluster cache size to number (default:16)')
option('CLUSTER_CACHE_SIZE', type : 'string', value : '536870912',
description : 'set cluster cache size to number (default:512MB)')
option('DIRENT_CACHE_SIZE', type : 'string', value : '512',
description : 'set dirent cache size to number (default:512)')
option('DIRENT_LOOKUP_CACHE_SIZE', type : 'string', value : '1024',
Expand Down
13 changes: 6 additions & 7 deletions src/archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,19 +504,19 @@ namespace zim
return m_impl->hasNewNamespaceScheme();
}

size_t Archive::getClusterCacheMaxSize() const
size_t getClusterCacheMaxSize()
{
return m_impl->getClusterCacheMaxSize();
return getClusterCache().getMaxCost();
}

size_t Archive::getClusterCacheCurrentSize() const
size_t getClusterCacheCurrentSize()
{
return m_impl->getClusterCacheCurrentSize();
return getClusterCache().getCurrentCost();
}

void Archive::setClusterCacheMaxSize(size_t nbClusters)
void setClusterCacheMaxSize(size_t sizeInB)
{
m_impl->setClusterCacheMaxSize(nbClusters);
getClusterCache().setMaxCost(sizeInB);
}

size_t Archive::getDirentCacheMaxSize() const
Expand All @@ -534,7 +534,6 @@ namespace zim
m_impl->setDirentCacheMaxSize(nbDirents);
}


size_t Archive::getDirentLookupCacheMaxSize() const
{
return m_impl->getDirentLookupCacheMaxSize();
Expand Down
5 changes: 5 additions & 0 deletions src/buffer_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ zsize_t BufferReader::size() const
return source.size();
}

// A BufferReader keeps its whole source buffer in memory, so its memory
// footprint is simply the size of that buffer (in bytes).
size_t BufferReader::getMemorySize() const
{
  const auto bufferSize = source.size();
  return bufferSize.v;
}

offset_t BufferReader::offset() const
{
return offset_t((offset_type)(static_cast<const void*>(source.data(offset_t(0)))));
Expand Down
1 change: 1 addition & 0 deletions src/buffer_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class LIBZIM_PRIVATE_API BufferReader : public Reader {
virtual ~BufferReader() {};

zsize_t size() const override;
size_t getMemorySize() const override;
offset_t offset() const override;

const Buffer get_buffer(offset_t offset, zsize_t size) const override;
Expand Down
45 changes: 42 additions & 3 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

#include "log.h"

#include "config.h"

log_define("zim.cluster")

#define log_debug1(e)
Expand Down Expand Up @@ -86,7 +84,8 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
Cluster::Cluster(std::unique_ptr<IStreamReader> reader_, Compression comp, bool isExtended)
: compression(comp),
isExtended(isExtended),
m_reader(std::move(reader_))
m_reader(std::move(reader_)),
m_memorySize(0)
{
if (isExtended) {
read_header<uint64_t>();
Expand Down Expand Up @@ -179,4 +178,44 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
}
}

// This function must return a constant size for a given cluster.
// This is important as we want to remove the same size that we added when we
// remove the cluster from the cache.
// However, because of partial decompression, this size can change:
// - As we advance in the decompression, we can create new blob readers in `m_blobReaders`
// - The stream itself may allocate memory.
// To solve this, we take the average and assume that half of a cluster's blob readers
// will be created, so we use a readers size of half the full uncompressed cluster data size.
// If the cluster is not compressed, we never store its content (a mmap is created on
// demand and not cached), so we use a size of 0 for the readers.
// It also appears that when we get the size of the stream, we reach a state where no
// further allocation will be done by it. Probably because:
// - We already started to decompress the stream to read the offsets
// - Cluster data size is smaller than the window size associated to the compression level (?)
// We check that anyway and print a warning if this is not the case, hoping that the user
// will open an issue allowing us to do further analysis.
// Note:
// - No need to protect this method from concurrent access as it will be called by the
//   concurrent_cache which holds a lock (on the lru cache) to ensure only one thread
//   accesses it at the same time.
size_t Cluster::getMemorySize() const {
  if (!m_memorySize) {
    // Memory used by the blob-offsets table itself.
    auto offsets_size = sizeof(offset_t) * m_blobOffsets.size();
    auto readers_size = 0;
    if (isCompressed()) {
      // m_blobOffsets.back() is the end offset of the last blob, i.e. the full
      // uncompressed data size; assume on average half of it is held by readers.
      readers_size = m_blobOffsets.back().v / 2;
    }
    m_streamSize = m_reader->getMemorySize();
    // The compression level defines a huge window and makes the decompression stream
    // allocate a huge memory region to store it.
    // However, the used memory will not be greater than the content itself, even if
    // the window is bigger.
    // On linux (at least), the real used memory will be the actual memory used, not
    // the one allocated.
    // So, let's clamp the stream size to the size of the content itself.
    m_memorySize = offsets_size + readers_size + std::min<size_type>(m_streamSize, m_blobOffsets.back().v);
  }
  // Sanity check: the stream should not have grown since we computed the cached
  // cost (see the comment above); warn loudly if that assumption is broken.
  auto streamSize = m_reader->getMemorySize();
  if (streamSize != m_streamSize) {
    std::cerr << "WARNING: stream size have changed from " << m_streamSize << " to " << streamSize << std::endl;
    std::cerr << "Please open an issue on https://github.com/openzim/libzim/issues with this message and the zim file you use" << std::endl;
  }
  return m_memorySize;
}
}
10 changes: 10 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ namespace zim

mutable std::mutex m_readerAccessMutex;
mutable BlobReaders m_blobReaders;
mutable size_t m_memorySize;
mutable size_t m_streamSize;


template<typename OFFSET_TYPE>
Expand All @@ -90,9 +92,17 @@ namespace zim
Blob getBlob(blob_index_t n) const;
Blob getBlob(blob_index_t n, offset_t offset, zsize_t size) const;

size_t getMemorySize() const;

static std::shared_ptr<Cluster> read(const Reader& zimReader, offset_t clusterOffset);
};

struct ClusterMemorySize {
static size_t cost(const std::shared_ptr<const Cluster>& cluster) {
return cluster->getMemorySize();
}
};

}

#endif // ZIM_CLUSTER_H
13 changes: 13 additions & 0 deletions src/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ void LZMA_INFO::stream_end_decode(stream_t* stream)
lzma_end(stream);
}

// Memory currently used by the internal state of the given lzma stream,
// as reported by liblzma itself.
size_t LZMA_INFO::state_size(const stream_t& stream)
{
  const auto memUsage = lzma_memusage(&stream);
  return memUsage;
}


const std::string ZSTD_INFO::name = "zstd";

Expand Down Expand Up @@ -170,3 +175,11 @@ void ZSTD_INFO::stream_end_decode(stream_t* stream)
void ZSTD_INFO::stream_end_encode(stream_t* stream)
{
  // Intentionally empty: no encoder-specific cleanup is performed here
  // (stream resources are presumably released elsewhere, e.g. by the
  // stream_t owner — TODO confirm).
}

/** Memory used by the internal state of a zstd (de)compression stream.
 *
 * Exactly one of `decoder_stream`/`encoder_stream` is in use; query the one
 * that is actually allocated. (The previous code had the branches swapped:
 * it measured the CStream when the decoder was set and vice versa, so it
 * always queried the unused, null stream.)
 *
 * @param stream The stream wrapper to measure.
 * @return The state size in bytes, as reported by zstd.
 */
size_t ZSTD_INFO::state_size(const stream_t& stream) {
  if (stream.decoder_stream) {
    return ZSTD_sizeof_DStream(stream.decoder_stream);
  } else {
    return ZSTD_sizeof_CStream(stream.encoder_stream);
  }
}
2 changes: 2 additions & 0 deletions src/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct LZMA_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static CompStatus stream_run(stream_t* stream, CompStep step);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


Expand Down Expand Up @@ -94,6 +95,7 @@ struct LIBZIM_PRIVATE_API ZSTD_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static void stream_end_encode(stream_t* stream);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


Expand Down
80 changes: 66 additions & 14 deletions src/concurrent_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,38 @@

#include "lrucache.h"

#include <chrono>
#include <cstddef>
#include <future>
#include <mutex>

namespace zim
{

/** Adapt a value cost-estimation policy to std::shared_future<T> entries.
 *
 * ConcurrentCache stores std::shared_future<Value> in its underlying
 * lru_cache, while `CostEstimation` knows how to compute the cost of a
 * Value itself. This adapter bridges the two.
 */
template<typename CostEstimation>
struct FutureToValueCostEstimation {
  template<typename T>
  static size_t cost(const std::shared_future<T>& future) {
    // The future is the value in the cache.
    // When calling getOrPut, if the key is not in the cache,
    // we add a future and then we compute the value and set the future.
    // But lrucache calls us when we add the future, meaning before we have
    // computed the value. If we wait here (or use future.get), we will deadlock
    // as we need to exit before setting the value.
    // So in this case, we return 0. `ConcurrentCache::getOrPut` will correctly
    // increase the current cache size once it has an actual value.
    // We still need to compute the size of the value if the future is ready, as
    // this cost is also used to decrease the cache size when the value is dropped.
    std::future_status status = future.wait_for(std::chrono::nanoseconds::zero());
    if (status == std::future_status::ready) {
      return CostEstimation::cost(future.get());
    } else {
      return 0;
    }
  }
};

/**
ConcurrentCache implements a concurrent thread-safe cache

Expand All @@ -39,16 +64,16 @@
safe, and, in case of a cache miss, will block until that element becomes
available.
*/
template <typename Key, typename Value>
class ConcurrentCache
template <typename Key, typename Value, typename CostEstimation>
class ConcurrentCache: private lru_cache<Key, std::shared_future<Value>, FutureToValueCostEstimation<CostEstimation>>
{
private: // types
typedef std::shared_future<Value> ValuePlaceholder;
typedef lru_cache<Key, ValuePlaceholder> Impl;
typedef lru_cache<Key, ValuePlaceholder, FutureToValueCostEstimation<CostEstimation>> Impl;

public: // types
explicit ConcurrentCache(size_t maxEntries)
: impl_(maxEntries)
explicit ConcurrentCache(size_t maxCost)
: Impl(maxCost)
{}

// Gets the entry corresponding to the given key. If the entry is not in the
Expand All @@ -65,11 +90,33 @@
{
std::promise<Value> valuePromise;
std::unique_lock<std::mutex> l(lock_);
const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
auto shared_future = valuePromise.get_future().share();
const auto x = Impl::getOrPut(key, shared_future);
l.unlock();
if ( x.miss() ) {
try {
valuePromise.set_value(f());
auto cost = CostEstimation::cost(x.value().get());
// There is a small window when the valuePromise may be drop from lru cache after
// we set the value but before we increase the size of the cache.
// In this case we decrease the size of `cost` before increasing it.
// First of all it should be pretty rare as we have just put the future in the cache so it
// should not be the least used item.
// If it happens, this should not be a problem if current_size is bigger than `cost` (most of the time)
// For the really rare specific case of current cach size being lower than `cost` (if possible),
// `decreaseCost` will clamp the new size to 0.
{
std::unique_lock<std::mutex> l(lock_);
// There is a window when the shared_future is drop from the cache while we are computing the value.
// If this is the case, we readd the shared_future in the cache.
if (!Impl::exists(key)) {
// We don't have have to increase the cache as the future is already set, so the cost will be valid.
Impl::put(key, shared_future);
} else {
// We just have to increase the cost as we used 0 for unset future.
Impl::increaseCost(cost);
}
}
} catch (std::exception& e) {
drop(key);
throw;
Expand All @@ -82,26 +129,31 @@
bool drop(const Key& key)
{
std::unique_lock<std::mutex> l(lock_);
return impl_.drop(key);
return Impl::drop(key);
}

template<class F>
void dropAll(F f) {
std::unique_lock<std::mutex> l(lock_);
Impl::dropAll(f);
}

size_t getMaxSize() const {
size_t getMaxCost() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.getMaxSize();
return Impl::getMaxCost();
}

size_t getCurrentSize() const {
size_t getCurrentCost() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.size();
return Impl::cost();
}

void setMaxSize(size_t newSize) {
void setMaxCost(size_t newSize) {
std::unique_lock<std::mutex> l(lock_);
return impl_.setMaxSize(newSize);
return Impl::setMaxCost(newSize);
}

private: // data
Impl impl_;
mutable std::mutex lock_;
};

Expand Down
Loading
Loading