diff --git a/README.md b/README.md index 7b01d3a..68618eb 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,15 @@ ## WHAT? -A set of state based CRDTs implemented in Erlang and on the paper - +A set of state-based CRDTs implemented in Erlang, and based on the paper - [A Comprehensive study of Convergent and Commutative Replicated Data Types] (http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf) - which you may find an interesting read. -### What happend to riak_dt, the database? +### What happened to riak_dt, the database? Riak is getting CRDT support built in, so we've archived the old riak_dt in the branch `prototype`. No further work will be done on -it. This repo is now a resuable library of Quickcheck tested +it. This repo is now a reusable library of QuickCheck tested implementations of CRDTs. diff --git a/include/riak_dt_tags.hrl b/include/riak_dt_tags.hrl index 9f04f10..12693ab 100644 --- a/include/riak_dt_tags.hrl +++ b/include/riak_dt_tags.hrl @@ -30,6 +30,7 @@ -define(DT_GSET_TAG, 82). -define(DT_ORSET_TAG, 76). -define(DT_ORSWOT_TAG, 75). +-define(DT_LWWSET_TAG, 78). %% Maps -define(DT_MAP_TAG, 77). diff --git a/src/riak_dt.app.src b/src/riak_dt.app.src index 09146a0..4e0a847 100644 --- a/src/riak_dt.app.src +++ b/src/riak_dt.app.src @@ -1,7 +1,7 @@ %% -*- erlang -*- {application, riak_dt, [ - {description, ""}, + {description, "riak CRDT datatypes"}, {vsn, git}, {registered, []}, {applications, [ @@ -14,5 +14,8 @@ %% indicates the level of compression. Higher number means more %% compression, but more time to compress. In tests so far 1 has %% been enough for CRDTs - {env, [{binary_compression, 1}]} + {env, [{binary_compression, 1}]}, + {maintainers, ["Basho", "Heinz N. Gies"]}, + {licenses, ["Apache"]}, + {links, [{"Github", "https://github.com/basho/riak_dt"}]} ]}. diff --git a/src/riak_dt_lwwset.erl b/src/riak_dt_lwwset.erl new file mode 100644 index 0000000..10f5039 --- /dev/null +++ b/src/riak_dt_lwwset.erl @@ -0,0 +1,287 @@ +%% ------------------------------------------------------------------- +%% +%% riak_dt_lwwset: LWW-Element-Set +%% +%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Erlang DT implemntation of Roshi's LWW-Element-Set +%% +%% +%% @reference Marc Shapiro, Nuno Preguiça, Carlos Baquero, Marek +%% Zawirski (2011) A comprehensive study of Convergent and Commutative +%% Replicated Data Types. http://hal.upmc.fr/inria-00555588/ +%% +%% @reference Roshi, https://github.com/soundcloud/roshi +%% +%% @end +-module(riak_dt_lwwset). + +-behaviour(riak_dt). + +-ifdef(EQC). +-include_lib("eqc/include/eqc.hrl"). +-define(QC_OUT(P), + eqc:on_output(fun(Str, Args) -> + io:format(user, Str, Args) end, P)). +-define(NUMTESTS, 1000). +-endif. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([new/0, value/1, value/2]). +-export([update/3, update/4, merge/2, equal/2]). +-export([to_binary/1, from_binary/1]). +-export([to_binary/2]). +-export([stats/1, stat/2]). +-export([parent_clock/2]). +-export([to_version/2]). + +%% EQC API +-ifdef(EQC). +-export([gen_op/0, gen_op/1, update_expected/3, eqc_state_value/1]). +-export([init_state/0, generate/0, size/1]). + +-endif. + +-define(ADD, 1). +-define(REM, 0). + +-export_type([lwwset/0, lwwset_op/0, binary_lwwset/0]). + +-type lwwset() :: dict(member(), {ts(), status()}). + +-type binary_lwwset() :: binary(). %% A binary that from_binary/1 will operate on. + +-type lwwset_op() :: {add, member(), ts()} | {remove, member(), ts()}. + +-type member() :: term(). +-type ts() :: pos_integer(). +-type status() :: add() | remove(). +-type add() :: ?ADD. +-type remove() :: ?REM. + +-ifdef(namespaced_types). +-type dict(A, B) :: dict:dict(A, B). +-else. +-type dict(_A, _B) :: dict(). +-endif. + +-ifdef(EQC). +-define(DICT, orddict). +-else. +-define(DICT, dict). +-endif. + +-spec new() -> lwwset(). +new() -> + ?DICT:new(). + +-spec parent_clock(riak_dt_vclock:vclock(), lwwset()) -> lwwset(). +parent_clock(_Clock, LWWSet) -> + LWWSet. + +-spec value(lwwset()) -> [member()]. +value(LWWSet) -> + [K || {K, {_TS, Status}} <- ?DICT:to_list(LWWSet), Status == 1]. + +value(size, LWWSet) -> + length(value(LWWSet)); +value({contains, Elem}, LWWSet) -> + lists:member(Elem, value(LWWSet)). + +-spec update(lwwset_op(), riak_dt:actor() | riak_dt:dot(), lwwset()) -> {ok, lwwset()}. +update({add, Elem, TS}, _Actor, LWWSet) -> + {ok, add_elem(Elem, TS, LWWSet)}; +update({remove, Elem, TS}, _Actor, LWWSet) -> + {ok, remove_elem(Elem, TS, LWWSet)}. + +update(Op, Actor, Set, _Ctx) -> + update(Op, Actor, Set). + +%% Private +-spec add_elem(member(), ts(), lwwset()) -> lwwset(). +add_elem(Elem, TS, LWWSet) -> + case ?DICT:find(Elem, LWWSet) of + error -> + ?DICT:store(Elem, {TS, ?ADD}, LWWSet); + {ok, {TS, ?REM}} -> + ?DICT:store(Elem, {TS, ?ADD}, LWWSet); + {ok, {TS0, _}} when TS0 < TS -> + ?DICT:store(Elem, {TS, ?ADD}, LWWSet); + _ -> + LWWSet + end. + +%% @doc warning, allows doomstoning. +-spec remove_elem(member(), ts(), lwwset()) -> lwwset(). +remove_elem(Elem, TS, LWWSet) -> + case ?DICT:find(Elem, LWWSet) of + error -> + ?DICT:store(Elem, {TS, ?REM}, LWWSet); + {ok, {TS, ?ADD}} -> + LWWSet; + {ok, {TS0, _}} when TS0 < TS -> + ?DICT:store(Elem, {TS, ?REM}, LWWSet); + _ -> + LWWSet + end. + +-spec merge(lwwset(), lwwset()) -> lwwset(). +merge(LWWSet, LWWSet) -> + LWWSet; +merge(LWWSet1, LWWSet2) -> + ?DICT:merge(fun lww/3, LWWSet1, LWWSet2). + +lww(_Key, {TS, ?ADD}, {TS, ?REM}) -> + {TS, ?ADD}; +lww(_Key, {TS, ?REM}, {TS, ?ADD}) -> + {TS, ?ADD}; +lww(_Key, {TS, Op}, {TS, Op}) -> + {TS, Op}; +lww(_Key, {TS1, _}=V1, {TS2, _}) when TS1 > TS2 -> + V1; +lww(_Key, {TS1, _}, {TS2, _}=V2) when TS1 < TS2 -> + V2. + +-spec equal(lwwset(), lwwset()) -> boolean(). +equal(LWWSet1, LWWSet2) -> + LWWSet1 == LWWSet2. + +-spec stats(lwwset()) -> [{atom(), number()}]. +stats(LWWSet) -> + [{S, stat(S, LWWSet)} || S <- [element_count]]. + +-spec stat(atom(), lwwset()) -> number() | undefined. +stat(element_count, LWWSet) -> + ?DICT:size(LWWSet); +stat(_,_) -> undefined. + +-include("riak_dt_tags.hrl"). +-define(TAG, ?DT_LWWSET_TAG). +-define(V1_VERS, 1). + +%% @doc returns a binary representation of the provided +%% `orswot()'. The resulting binary is tagged and versioned for ease +%% of future upgrade. Calling `from_binary/1' with the result of this +%% function will return the original set. Use the application env var +%% `binary_compression' to turn t2b compression on (`true') and off +%% (`false') +%% +%% @see `from_binary/1' +-spec to_binary(lwwset()) -> binary_lwwset(). +to_binary(S) -> + {ok, B} = to_binary(?V1_VERS, S), + B. + +%% @doc encode set to target version. The first argument is the target +%% binary type. +-spec to_binary(Vers :: pos_integer(), lwwset()) -> {ok, binary_lwwset()} | ?UNSUPPORTED_VERSION. +to_binary(?V1_VERS, S) -> + {ok, <>}; +to_binary(Vers, _S) -> + ?UNSUPPORTED_VERSION(Vers). + +%% @doc When the argument is a `binary_lwwset()' produced by +%% `to_binary/1' will return the original `lwwset()'. +%% +%% @see `to_binary/1' +-spec from_binary(binary_lwwset()) -> {ok, lwwset()} | ?UNSUPPORTED_VERSION | ?INVALID_BINARY. +from_binary(<>) -> + {ok, riak_dt:from_binary(B)}; +from_binary(<>) -> + ?UNSUPPORTED_VERSION(Vers); +from_binary(_B) -> + ?INVALID_BINARY. + +-spec to_version(pos_integer(), lwwset()) -> lwwset(). +to_version(_Version, Set) -> + Set. + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). + +-ifdef(EQC). + +bin_roundtrip_test_() -> + crdt_statem_eqc:run_binary_rt(?MODULE, ?NUMTESTS). + +eqc_value_test_() -> + crdt_statem_eqc:run(?MODULE, ?NUMTESTS). + +generate() -> + new(). + +size(Set) -> + [{element_count, Cnt}] = stats(Set), + Cnt. + +%% EQC generator +gen_op() -> + ?SIZED(Size, gen_op(Size)). + +gen_op(_Size) -> + oneof([{add, int(), nat()}, {remove, int(), nat()}]). + +init_state() -> + {orddict:new(), orddict:new()}. + +update_expected(_ID, {add, Elem, TS}, {A0, R}) -> + A = update_element(A0, Elem, TS), + {A, R}; +update_expected(_ID, {remove, Elem, TS}, {A, R0}) -> + R = update_element(R0, Elem, TS), + {A, R}; +update_expected(_, _, S) -> + S. + +update_element(Dict, Elem, TS) -> + orddict:update(Elem, fun(T) when T >= TS-> T; + (_T) -> TS end, + TS, + Dict). + +eqc_state_value({A, R}) -> + orddict:fold(fun(Elem, TS, Acc) -> + case orddict:find(Elem, R) of + error -> + [Elem | Acc]; + {ok, T} when T > TS -> + Acc; + _ -> + [Elem | Acc] + end + end, + [], + A). +-endif. + +stat_test() -> + Set = new(), + {ok, Set1} = update({add, <<"foo">>, 1}, 1, Set), + {ok, Set2} = update({add, <<"foo">>, 2}, 2, Set1), + {ok, Set3} = update({add, <<"bar">>, 3}, 3, Set2), + {ok, Set4} = update({remove, <<"foo">>, 4}, 1, Set3), + ?assertEqual([{element_count, 2}], stats(Set4)). + +-endif. diff --git a/test/crdt_statem_eqc.erl b/test/crdt_statem_eqc.erl index 37b95ad..8ecbfff 100644 --- a/test/crdt_statem_eqc.erl +++ b/test/crdt_statem_eqc.erl @@ -164,7 +164,7 @@ crdt_equals(Mod, {_IDS, CS}, {_IDD, CD}) -> %% list equality expects lists in order sort(Mod, L) when Mod == riak_dt_orset; Mod == riak_dt_gset; Mod == riak_dt_orswot; Mod == riak_dt_map; - Mod == riak_dt_tsmap -> + Mod == riak_dt_tsmap; Mod == riak_dt_lwwset -> lists:sort(L); sort(_, Other) -> Other.