From 2691a0b74b3e4495ea9e9d1c146f39ed5946eabf Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Tue, 16 Feb 2016 17:11:32 -0500 Subject: [PATCH 01/14] Add description --- src/riak_dt.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_dt.app.src b/src/riak_dt.app.src index 09146a0..4e81214 100644 --- a/src/riak_dt.app.src +++ b/src/riak_dt.app.src @@ -1,7 +1,7 @@ %% -*- erlang -*- {application, riak_dt, [ - {description, ""}, + {description, "riak CTDT datatypes"}, {vsn, git}, {registered, []}, {applications, [ From ccfa6b28867316d7faf4a7d1c75db3c2fb59e40e Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Tue, 16 Feb 2016 17:14:48 -0500 Subject: [PATCH 02/14] Add missing app.src fields for hex --- src/riak_dt.app.src | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/riak_dt.app.src b/src/riak_dt.app.src index 4e81214..5cad961 100644 --- a/src/riak_dt.app.src +++ b/src/riak_dt.app.src @@ -14,5 +14,8 @@ %% indicates the level of compression. Higher number means more %% compression, but more time to compress. In tests so far 1 has %% been enough for CRDTs - {env, [{binary_compression, 1}]} + {env, [{binary_compression, 1}]}, + {maintainers, ["Basho", "Heinz N. Gies"]}, + {licenses, ["Apache"]}, + {links, [{"Github", "https://github.com/basho/riak_dt"}]} ]}. From 76254ef703a90697b7e8dc55b73d2c0e32ecfa18 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Wed, 17 Feb 2016 12:48:21 -0500 Subject: [PATCH 03/14] Fix CRDT typo --- src/riak_dt.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_dt.app.src b/src/riak_dt.app.src index 5cad961..4e0a847 100644 --- a/src/riak_dt.app.src +++ b/src/riak_dt.app.src @@ -1,7 +1,7 @@ %% -*- erlang -*- {application, riak_dt, [ - {description, "riak CTDT datatypes"}, + {description, "riak CRDT datatypes"}, {vsn, git}, {registered, []}, {applications, [ From 2a76da2071dd22d0331ea28d8ac2e61babf2beb0 Mon Sep 17 00:00:00 2001 From: Julian Pistorius Date: Sat, 23 Jul 2016 15:00:43 -0700 Subject: [PATCH 04/14] Typos and clarifications --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7b01d3a..2b87f53 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## WHAT? -A set of state based CRDTs implemented in Erlang and on the paper - +A set of state-based CRDTs implemented in Erlang, and based on the paper - [A Comprehensive study of Convergent and Commutative Replicated Data Types] (http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf) - which you may find an interesting read. @@ -11,6 +11,6 @@ may find an interesting read. Riak is getting CRDT support built in, so we've archived the old riak_dt in the branch `prototype`. No further work will be done on -it. This repo is now a resuable library of Quickcheck tested +it. This repo is now a reusable library of QuickCheck tested implementations of CRDTs. From 89d8f87ac86f2d0f99a6029608f1ff960747968c Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 27 Jan 2016 17:29:18 -0500 Subject: [PATCH 05/14] move from erlang sets to ordsets (sets->ordsets) for set operations across datatypes for small perf gain --- include/riak_dt.hrl | 4 +- src/riak_dt_map.erl | 18 ++++----- src/riak_dt_od_flag.erl | 8 ++-- src/riak_dt_orswot.erl | 86 ++++++++++++++++++++++------------------- 4 files changed, 61 insertions(+), 55 deletions(-) diff --git a/include/riak_dt.hrl b/include/riak_dt.hrl index ec07097..708941d 100644 --- a/include/riak_dt.hrl +++ b/include/riak_dt.hrl @@ -1,7 +1,7 @@ -ifdef(namespaced_types). -type riak_dt_dict() :: dict:dict(). --type riak_dt_set() :: sets:set(). -else. -type riak_dt_dict() :: dict(). --type riak_dt_set() :: set(). -endif. + +-type riak_dt_set() :: ordsets:ordset(_). diff --git a/src/riak_dt_map.erl b/src/riak_dt_map.erl index 82162b6..7b1ec5b 100644 --- a/src/riak_dt_map.erl +++ b/src/riak_dt_map.erl @@ -483,7 +483,7 @@ merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) %% only. -spec filter_unique(riak_dt_set(), entries(), riak_dt_vclock:vclock(), entries()) -> entries(). filter_unique(FieldSet, Entries, Clock, Acc) -> - sets:fold(fun({_Name, Type}=Field, Keep) -> + ordsets:fold(fun({_Name, Type}=Field, Keep) -> {Dots, TS} = ?DICT:fetch(Field, Entries), KeepDots = ?DICT:filter(fun(Dot, _CRDT) -> is_dot_unseen(Dot, Clock) @@ -518,7 +518,7 @@ is_dot_unseen(Dot, Clock) -> %% @doc Get the keys from an ?DICT as a ?SET -spec key_set(riak_dt_dict()) -> riak_dt_set(). key_set(Dict) -> - sets:from_list(?DICT:fetch_keys(Dict)). + ordsets:from_list(?DICT:fetch_keys(Dict)). %% @doc break the keys from an two ?DICTs out into three ?SETs, the %% common keys, those unique to one, and those unique to the other. @@ -526,22 +526,22 @@ key_set(Dict) -> key_sets(LHS, RHS) -> LHSet = key_set(LHS), RHSet = key_set(RHS), - {sets:intersection(LHSet, RHSet), - sets:subtract(LHSet, RHSet), - sets:subtract(RHSet, LHSet)}. + {ordsets:intersection(LHSet, RHSet), + ordsets:subtract(LHSet, RHSet), + ordsets:subtract(RHSet, LHSet)}. %% @private for a set of dots (that are unique to one side) decide %% whether to keep, or drop each. -spec filter_dots(riak_dt_set(), riak_dt_dict(), riak_dt_vclock:vclock()) -> entries(). filter_dots(Dots, CRDTs, Clock) -> - DotsToKeep = sets:filter(fun(Dot) -> + DotsToKeep = ordsets:filter(fun(Dot) -> is_dot_unseen(Dot, Clock) end, Dots), ?DICT:filter(fun(Dot, _CRDT) -> - sets:is_element(Dot, DotsToKeep) + ordsets:is_element(Dot, DotsToKeep) end, CRDTs). @@ -549,13 +549,13 @@ filter_dots(Dots, CRDTs, Clock) -> %% tombstone per field. If a dot is on both sides, keep it. If it is %% only on one side, drop it if dominated by the otherside's clock. merge_common(FieldSet, LHS, RHS, LHSClock, RHSClock, Acc) -> - sets:fold(fun({_, Type}=Field, Keep) -> + ordsets:fold(fun({_, Type}=Field, Keep) -> {LHSDots, LHTS} = ?DICT:fetch(Field, LHS), {RHSDots, RHTS} = ?DICT:fetch(Field, RHS), {CommonDots, LHSUniqe, RHSUnique} = key_sets(LHSDots, RHSDots), TS = Type:merge(RHTS, LHTS), - CommonSurviving = sets:fold(fun(Dot, Common) -> + CommonSurviving = ordsets:fold(fun(Dot, Common) -> L = ?DICT:fetch(Dot, LHSDots), ?DICT:store(Dot, L, Common) end, diff --git a/src/riak_dt_od_flag.erl b/src/riak_dt_od_flag.erl index b028f99..4a02754 100644 --- a/src/riak_dt_od_flag.erl +++ b/src/riak_dt_od_flag.erl @@ -128,12 +128,12 @@ merge({LHSClock, LHSDots, LHSDeferred}, {RHSClock, RHSDots, RHSDeferred}) -> %% drop all the RHS dots that dominated by the LHS clock %% keep all the dots that are in both %% save value as value of flag - CommonDots = sets:intersection(sets:from_list(LHSDots), sets:from_list(RHSDots)), - LHSUnique = sets:to_list(sets:subtract(sets:from_list(LHSDots), CommonDots)), - RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)), + CommonDots = ordsets:intersection(ordsets:from_list(LHSDots), ordsets:from_list(RHSDots)), + LHSUnique = ordsets:to_list(ordsets:subtract(ordsets:from_list(LHSDots), CommonDots)), + RHSUnique = ordsets:to_list(ordsets:subtract(ordsets:from_list(RHSDots), CommonDots)), LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - Flag = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), + Flag = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]), Deferred = ordsets:union(LHSDeferred, RHSDeferred), apply_deferred(NewClock, Flag, Deferred). diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index 90e0b4a..d7ca388 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -294,50 +294,56 @@ merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) -> {Clock, Entries, Deferred}; merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) -> Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), - {Keep, RHSElems} = ?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) -> - case ?DICT:find(Elem, RHSEntries) of - error -> - %% Only on left, trim dots and keep - %% surviving - case riak_dt_vclock:subtract_dots(Dots, RHSClock) of + {Keep, RHSElems} = + ?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) -> + case ?DICT:find(Elem, RHSEntries) of + error -> + %% Only on left, trim dots and keep surviving + case riak_dt_vclock:subtract_dots(Dots, RHSClock) of + [] -> + %% Removed + {Acc, RHSRemaining}; + NewDots -> + {?DICT:store(Elem, NewDots, Acc), RHSRemaining} + end; + {ok, RHSDots} -> + %% On both sides + CommonDots = ordsets:intersection( + ordsets:from_list(Dots), + ordsets:from_list(RHSDots)), + LHSUnique = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Dots), + CommonDots)), + RHSUnique = ordsets:to_list( + ordsets:subtract(ordsets:from_list(RHSDots), + CommonDots)), + LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), + RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), + V = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]), + %% Perfectly possible that an item in both sets should be dropped + case V of + [] -> + %% Removed from both sides + {Acc, ?DICT:erase(Elem, RHSRemaining)}; + _ -> + {?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)} + end + end + end, + {?DICT:new(), RHSEntries}, + LHSEntries), + %%Now what about the stuff left from the right hand side? Do the same to that! + Entries = ?DICT:fold(fun(Elem, Dots, Acc) -> + case riak_dt_vclock:subtract_dots(Dots, LHSClock) of [] -> %% Removed - {Acc, RHSRemaining}; + Acc; NewDots -> - {?DICT:store(Elem, NewDots, Acc), RHSRemaining} - end; - {ok, RHSDots} -> - %% On both sides - CommonDots = sets:intersection(sets:from_list(Dots), sets:from_list(RHSDots)), - LHSUnique = sets:to_list(sets:subtract(sets:from_list(Dots), CommonDots)), - RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)), - LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), - RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), - V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]), - %% Perfectly possible that an item in both sets should be dropped - case V of - [] -> - %% Removed from both sides - {Acc, ?DICT:erase(Elem, RHSRemaining)}; - _ -> - {?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)} + ?DICT:store(Elem, NewDots, Acc) end - end - end, - {?DICT:new(), RHSEntries}, - LHSEntries), - %%Now what about the stuff left from the right hand side? Do the same to that! - Entries = ?DICT:fold(fun(Elem, Dots, Acc) -> - case riak_dt_vclock:subtract_dots(Dots, LHSClock) of - [] -> - %% Removed - Acc; - NewDots -> - ?DICT:store(Elem, NewDots, Acc) - end - end, - Keep, - RHSElems), + end, + Keep, + RHSElems), Deffered = merge_deferred(LHSDeferred, RHSDeferred), apply_deferred(Clock, Entries, Deffered). From 00a92d34b978bb4d0fc4703f61dbfbcdf75fc769 Mon Sep 17 00:00:00 2001 From: Bryan Hunt Date: Tue, 12 Jul 2016 14:49:22 +0100 Subject: [PATCH 06/14] enable gset support --- src/riak_dt_gset.erl | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/src/riak_dt_gset.erl b/src/riak_dt_gset.erl index fb54ff6..3c84bcc 100644 --- a/src/riak_dt_gset.erl +++ b/src/riak_dt_gset.erl @@ -58,7 +58,7 @@ -type binary_gset() :: binary(). %% A binary that from_binary/1 will operate on. --type gset_op() :: {add, member()}. +-type gset_op() :: {add, member()} | {add_all, members()}. -type actor() :: riak_dt:actor(). @@ -78,10 +78,25 @@ value(GSet) -> value(_, GSet) -> value(GSet). +%%-spec apply_ops([gset_op()], actor() | dot(), orswot()) -> +%% {ok, orswot()} | precondition_error(). +apply_ops([], _Actor, ORSet) -> + {ok, ORSet}; +apply_ops([Op | Rest], Actor, ORSet) -> + case update(Op, Actor, ORSet) of + {ok, ORSet2} -> + apply_ops(Rest, Actor, ORSet2); + Error -> + Error + end. -spec update(gset_op(), actor(), gset()) -> {ok, gset()}. update({add, Elem}, _Actor, GSet) -> {ok, ordsets:add_element(Elem, GSet)}; + +update({update, Ops}, _Actor, GSet) -> +apply_ops(Ops,_Actor,GSet); + update({add_all, Elems}, _Actor, GSet) -> {ok, ordsets:union(GSet, ordsets:from_list(Elems))}. @@ -105,21 +120,27 @@ equal(GSet1, GSet2) -> -include("riak_dt_tags.hrl"). -define(TAG, ?DT_GSET_TAG). -define(V1_VERS, 1). +-define(V2_VERS, 2). -spec to_binary(gset()) -> binary_gset(). to_binary(GSet) -> - <>. + %%<>. + {ok, B} = to_binary(?V2_VERS, GSet), + B. -spec to_binary(Vers :: pos_integer(), gset()) -> {ok, binary()} | ?UNSUPPORTED_VERSION. -to_binary(1, S) -> - B = to_binary(S), - {ok, B}; -to_binary(Vers, _S) -> +to_binary(?V1_VERS, S) -> + {ok, <>}; +to_binary(?V2_VERS, S) -> + {ok, <>}; +to_binary(Vers, _S0) -> ?UNSUPPORTED_VERSION(Vers). -spec from_binary(binary()) -> {ok, gset()} | ?UNSUPPORTED_VERSION | ?INVALID_BINARY. from_binary(<>) -> {ok, riak_dt:from_binary(Bin)}; +from_binary(<>) -> + {ok, riak_dt:from_binary(Bin)}; from_binary(<>) -> ?UNSUPPORTED_VERSION(Vers); from_binary(_B) -> From ad518e82ac36cc54cb46c0ee04a38de86006ae78 Mon Sep 17 00:00:00 2001 From: Bryan Hunt Date: Mon, 25 Jul 2016 18:13:23 +0100 Subject: [PATCH 07/14] tests --- src/riak_dt_gset.erl | 5 +++++ test/riak_dt_gset_tests.erl | 39 +++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 test/riak_dt_gset_tests.erl diff --git a/src/riak_dt_gset.erl b/src/riak_dt_gset.erl index 3c84bcc..bfabe95 100644 --- a/src/riak_dt_gset.erl +++ b/src/riak_dt_gset.erl @@ -178,6 +178,11 @@ stat_test() -> ?assertEqual(15, stat(max_element_size, S1)), ?assertEqual(undefined, stat(actor_count, S1)). +to_binary_test() -> + GSet = update({add, <<"foo">>}, undefined_actor, riak_dt_gset:new()), + Bin = riak_dt_gset:to_binary(GSet), + ?assertMatch( <<82:8/integer, ?V2_VERS:8/integer, _/binary>> , Bin). + -ifdef(EQC). eqc_value_test_() -> crdt_statem_eqc:run(?MODULE, 1000). diff --git a/test/riak_dt_gset_tests.erl b/test/riak_dt_gset_tests.erl new file mode 100644 index 0000000..ffe9980 --- /dev/null +++ b/test/riak_dt_gset_tests.erl @@ -0,0 +1,39 @@ +%% ------------------------------------------------------------------- +%% +%% riak_dt_gset_test: trivial assertive tests to illustrate module behavior +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(riak_dt_gset_tests). + +-include_lib("eunit/include/eunit.hrl"). +-import(riak_dt_gset, [update/3]). + +-define(ACTOR_VAL, undefined). +-define(SINGLE_VAL, <<"binarytemple">>). +-define(FRANK_BOOTH, [<<"frank">>, <<"booth">>]). +-define(BOOTH_FRANK, [<<"booth">>, <<"frank">>]). + +update_add_test() -> + N = riak_dt_gset:new(), + ?assertEqual({ok, [?SINGLE_VAL]}, update({add, ?SINGLE_VAL}, ?ACTOR_VAL, N)) +. + +update_add_all_test() -> + ?assertEqual({ok, ?BOOTH_FRANK}, update({add_all, ?FRANK_BOOTH}, ?ACTOR_VAL, riak_dt_gset:new())), + ?assertNotEqual({ok, ?FRANK_BOOTH}, update({add_all, ?FRANK_BOOTH}, ?ACTOR_VAL, riak_dt_gset:new())) +. From e7c864702b0699fa7c20410905078648aa10ff72 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Tue, 1 Nov 2016 17:11:17 +0000 Subject: [PATCH 08/14] Update copyright dates --- src/riak_dt_gset.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_dt_gset.erl b/src/riak_dt_gset.erl index bfabe95..01a922b 100644 --- a/src/riak_dt_gset.erl +++ b/src/riak_dt_gset.erl @@ -3,7 +3,7 @@ %% %% riak_dt_gset: A convergent, replicated, state based grow only set %% -%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file From 4fa631ed174895e6dadda1ec1fcade932c09bdb0 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Wed, 2 Nov 2016 10:50:02 +0000 Subject: [PATCH 09/14] Fix copy-pasta from orswot and make dialyzer happy --- src/riak_dt_gset.erl | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/riak_dt_gset.erl b/src/riak_dt_gset.erl index 01a922b..dcb26e6 100644 --- a/src/riak_dt_gset.erl +++ b/src/riak_dt_gset.erl @@ -78,17 +78,13 @@ value(GSet) -> value(_, GSet) -> value(GSet). -%%-spec apply_ops([gset_op()], actor() | dot(), orswot()) -> -%% {ok, orswot()} | precondition_error(). -apply_ops([], _Actor, ORSet) -> - {ok, ORSet}; -apply_ops([Op | Rest], Actor, ORSet) -> - case update(Op, Actor, ORSet) of - {ok, ORSet2} -> - apply_ops(Rest, Actor, ORSet2); - Error -> - Error - end. +-spec apply_ops([gset_op()], actor(), gset()) -> + {ok, gset()}. +apply_ops([], _Actor, GSet) -> + {ok, GSet}; +apply_ops([Op | Rest], Actor, GSet) -> + {ok, GSet2} = update(Op, Actor, GSet), + apply_ops(Rest, Actor, GSet2). -spec update(gset_op(), actor(), gset()) -> {ok, gset()}. update({add, Elem}, _Actor, GSet) -> From c72a6e3afe0cddbefa7b81a0eae8e4337dbebee0 Mon Sep 17 00:00:00 2001 From: chenduo Date: Tue, 14 Nov 2017 10:20:37 -0600 Subject: [PATCH 10/14] Update README.md typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2b87f53..68618eb 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ A set of state-based CRDTs implemented in Erlang, and based on the paper - (http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf) - which you may find an interesting read. -### What happend to riak_dt, the database? +### What happened to riak_dt, the database? Riak is getting CRDT support built in, so we've archived the old riak_dt in the branch `prototype`. No further work will be done on From 2ac749f47e54ce1f423ca5f2fe87d46cf4861d8a Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Sun, 3 Aug 2014 20:57:17 +0100 Subject: [PATCH 11/14] Add LWW-Element-Set with Add-Wins semantic for equal timestamp --- include/riak_dt_tags.hrl | 1 + src/riak_dt_lwwset.erl | 258 +++++++++++++++++++++++++++++++++++++++ test/crdt_statem_eqc.erl | 2 +- 3 files changed, 260 insertions(+), 1 deletion(-) create mode 100644 src/riak_dt_lwwset.erl diff --git a/include/riak_dt_tags.hrl b/include/riak_dt_tags.hrl index 9f04f10..12693ab 100644 --- a/include/riak_dt_tags.hrl +++ b/include/riak_dt_tags.hrl @@ -30,6 +30,7 @@ -define(DT_GSET_TAG, 82). -define(DT_ORSET_TAG, 76). -define(DT_ORSWOT_TAG, 75). +-define(DT_LWWSET_TAG, 78). %% Maps -define(DT_MAP_TAG, 77). diff --git a/src/riak_dt_lwwset.erl b/src/riak_dt_lwwset.erl new file mode 100644 index 0000000..a74dec5 --- /dev/null +++ b/src/riak_dt_lwwset.erl @@ -0,0 +1,258 @@ +%% ------------------------------------------------------------------- +%% +%% riak_dt_lwwset: LWW-Element-Set +%% +%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Erlang DT implemntation of Roshi's LWW-Element-Set +%% +%% +%% @reference Marc Shapiro, Nuno Preguiça, Carlos Baquero, Marek +%% Zawirski (2011) A comprehensive study of Convergent and Commutative +%% Replicated Data Types. http://hal.upmc.fr/inria-00555588/ +%% +%% @reference Roshi, https://github.com/soundcloud/roshi +%% +%% @end +-module(riak_dt_lwwset). + +-behaviour(riak_dt). + +-ifdef(EQC). +-include_lib("eqc/include/eqc.hrl"). +-define(QC_OUT(P), + eqc:on_output(fun(Str, Args) -> + io:format(user, Str, Args) end, P)). +-define(NUMTESTS, 1000). +-endif. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([new/0, value/1, value/2]). +-export([update/3, update/4, merge/2, equal/2]). +-export([to_binary/1, from_binary/1]). +-export([stats/1, stat/2]). +-export([parent_clock/2]). + +%% EQC API +-ifdef(EQC). +-export([gen_op/0, gen_op/1, update_expected/3, eqc_state_value/1]). +-export([init_state/0, generate/0, size/1]). + +-endif. + +-export_type([lwwset/0, lwwset_op/0, binary_lwwset/0]). + +-type lwwset() :: [entries()]. + +-type binary_lwwset() :: binary(). %% A binary that from_binary/1 will operate on. + +-type lwwset_op() :: {add, member(), ts()} | {remove, member(), ts()}. + +-type entries() :: [{member(), {ts(), status()}}]. + +-type member() :: term(). +-type ts() :: pos_integer(). +-type status() :: add() | remove(). +-type add() :: 1. +-type remove() :: 0. + +-define(ADD, 1). +-define(REM, 0). + +-spec new() -> lwwset(). +new() -> + orddict:new(). + +-spec parent_clock(riak_dt_vclock:vclock(), lwwset()) -> lwwset(). +parent_clock(_Clock, LWWSet) -> + LWWSet. + +-spec value(lwwset()) -> [member()]. +value(LWWSet) -> + [K || {K, {_TS, Status}} <- orddict:to_list(LWWSet), Status == 1]. + +value(size, LWWSet) -> + length(value(LWWSet)); +value({contains, Elem}, LWWSet) -> + lists:member(Elem, value(LWWSet)). + +-spec update(lwwset_op(), riak_dt:actor() | riak_dt:dot(), lwwset()) -> {ok, lwwset()}. +update({add, Elem, TS}, _Actor, LWWSet) -> + {ok, add_elem(Elem, TS, LWWSet)}; +update({remove, Elem, TS}, _Actor, LWWSet) -> + {ok, remove_elem(Elem, TS, LWWSet)}. + +update(Op, Actor, Set, _Ctx) -> + update(Op, Actor, Set). + +%% Private +-spec add_elem(member(), ts(), lwwset()) -> lwwset(). +add_elem(Elem, TS, LWWSet) -> + case orddict:find(Elem, LWWSet) of + error -> + orddict:store(Elem, {TS, ?ADD}, LWWSet); + {ok, {TS, ?REM}} -> + orddict:store(Elem, {TS, ?ADD}, LWWSet); + {ok, {TS0, _}} when TS0 < TS -> + orddict:store(Elem, {TS, ?ADD}, LWWSet); + _ -> + LWWSet + end. + +%% @doc warning, allows doomstoning. +-spec remove_elem(member(), ts(), lwwset()) -> lwwset(). +remove_elem(Elem, TS, LWWSet) -> + case orddict:find(Elem, LWWSet) of + error -> + orddict:store(Elem, {TS, ?REM}, LWWSet); + {ok, {TS, ?ADD}} -> + LWWSet; + {ok, {TS0, _}} when TS0 < TS -> + orddict:store(Elem, {TS, ?REM}, LWWSet); + _ -> + LWWSet + end. + +-spec merge(lwwset(), lwwset()) -> lwwset(). +merge(LWWSet, LWWSet) -> + LWWSet; +merge(LWWSet1, LWWSet2) -> + orddict:merge(fun lww/3, LWWSet1, LWWSet2). + +lww(_Key, {TS, ?ADD}, {TS, ?REM}) -> + {TS, ?ADD}; +lww(_Key, {TS, ?REM}, {TS, ?ADD}) -> + {TS, ?ADD}; +lww(_Key, {TS, Op}, {TS, Op}) -> + {TS, Op}; +lww(_Key, {TS1, _}=V1, {TS2, _}) when TS1 > TS2 -> + V1; +lww(_Key, {TS1, _}, {TS2, _}=V2) when TS1 < TS2 -> + V2. + +-spec equal(lwwset(), lwwset()) -> boolean(). +equal(LWWSet1, LWWSet2) -> + LWWSet1 == LWWSet2. + +-spec stats(lwwset()) -> [{atom(), number()}]. +stats(LWWSet) -> + [{S, stat(S, LWWSet)} || S <- [element_count]]. + +-spec stat(atom(), lwwset()) -> number() | undefined. +stat(element_count, LWWSet) -> + orddict:size(LWWSet); +stat(_,_) -> undefined. + +-include("riak_dt_tags.hrl"). +-define(TAG, ?DT_LWWSET_TAG). +-define(V1_VERS, 1). + +%% @doc returns a binary representation of the provided +%% `orswot()'. The resulting binary is tagged and versioned for ease +%% of future upgrade. Calling `from_binary/1' with the result of this +%% function will return the original set. Use the application env var +%% `binary_compression' to turn t2b compression on (`true') and off +%% (`false') +%% +%% @see `from_binary/1' +-spec to_binary(lwwset()) -> binary_lwwset(). +to_binary(S) -> + <>. + +%% @doc When the argument is a `binary_orswot()' produced by +%% `to_binary/1' will return the original `orswot()'. +%% +%% @see `to_binary/1' +-spec from_binary(binary_lwwset()) -> lwwset(). +from_binary(<>) -> + riak_dt:from_binary(B). + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). + +-ifdef(EQC). + +bin_roundtrip_test_() -> + crdt_statem_eqc:run_binary_rt(?MODULE, ?NUMTESTS). + +eqc_value_test_() -> + crdt_statem_eqc:run(?MODULE, ?NUMTESTS). + +generate() -> + new(). + +size(Set) -> + [{element_count, Cnt}] = stats(Set), + Cnt. + +%% EQC generator +gen_op() -> + ?SIZED(Size, gen_op(Size)). + +gen_op(_Size) -> + oneof([{add, int(), nat()}, {remove, int(), nat()}]). + +init_state() -> + {orddict:new(), orddict:new()}. + +update_expected(_ID, {add, Elem, TS}, {A0, R}) -> + A = update_element(A0, Elem, TS), + {A, R}; +update_expected(_ID, {remove, Elem, TS}, {A, R0}) -> + R = update_element(R0, Elem, TS), + {A, R}; +update_expected(_, _, S) -> + S. + +update_element(Dict, Elem, TS) -> + orddict:update(Elem, fun(T) when T >= TS-> T; + (_T) -> TS end, + TS, + Dict). + +eqc_state_value({A, R}) -> + orddict:fold(fun(Elem, TS, Acc) -> + case orddict:find(Elem, R) of + error -> + [Elem | Acc]; + {ok, T} when T > TS -> + Acc; + _ -> + [Elem | Acc] + end + end, + [], + A). +-endif. + +stat_test() -> + Set = new(), + {ok, Set1} = update({add, <<"foo">>, 1}, 1, Set), + {ok, Set2} = update({add, <<"foo">>, 2}, 2, Set1), + {ok, Set3} = update({add, <<"bar">>, 3}, 3, Set2), + {ok, Set4} = update({remove, <<"foo">>, 4}, 1, Set3), + ?assertEqual([{element_count, 2}], stats(Set4)). + +-endif. diff --git a/test/crdt_statem_eqc.erl b/test/crdt_statem_eqc.erl index 37b95ad..8ecbfff 100644 --- a/test/crdt_statem_eqc.erl +++ b/test/crdt_statem_eqc.erl @@ -164,7 +164,7 @@ crdt_equals(Mod, {_IDS, CS}, {_IDD, CD}) -> %% list equality expects lists in order sort(Mod, L) when Mod == riak_dt_orset; Mod == riak_dt_gset; Mod == riak_dt_orswot; Mod == riak_dt_map; - Mod == riak_dt_tsmap -> + Mod == riak_dt_tsmap; Mod == riak_dt_lwwset -> lists:sort(L); sort(_, Other) -> Other. From 5f175b215b89716d7876515a1d556c2bc0f771b4 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Mon, 4 Aug 2014 17:36:07 +0100 Subject: [PATCH 12/14] Fix type error in lwwset declaration --- src/riak_dt_lwwset.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/riak_dt_lwwset.erl b/src/riak_dt_lwwset.erl index a74dec5..eba0fd0 100644 --- a/src/riak_dt_lwwset.erl +++ b/src/riak_dt_lwwset.erl @@ -60,24 +60,24 @@ -endif. +-define(ADD, 1). +-define(REM, 0). + -export_type([lwwset/0, lwwset_op/0, binary_lwwset/0]). --type lwwset() :: [entries()]. +-type lwwset() :: [entry()]. -type binary_lwwset() :: binary(). %% A binary that from_binary/1 will operate on. -type lwwset_op() :: {add, member(), ts()} | {remove, member(), ts()}. --type entries() :: [{member(), {ts(), status()}}]. +-type entry() :: {member(), {ts(), status()}}. -type member() :: term(). -type ts() :: pos_integer(). -type status() :: add() | remove(). --type add() :: 1. --type remove() :: 0. - --define(ADD, 1). --define(REM, 0). +-type add() :: ?ADD. +-type remove() :: ?REM. -spec new() -> lwwset(). new() -> From 9bade06377b4b9dba99f8e858f9ddffa475e1aea Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Thu, 11 Jan 2018 12:00:32 +0000 Subject: [PATCH 13/14] Update to/from binary for latest Also move to `dict` from `orddict` for better performance in larger sets. Retain `orddict` for EQC for ease of reading. --- src/riak_dt_lwwset.erl | 59 ++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/src/riak_dt_lwwset.erl b/src/riak_dt_lwwset.erl index eba0fd0..6905756 100644 --- a/src/riak_dt_lwwset.erl +++ b/src/riak_dt_lwwset.erl @@ -50,8 +50,10 @@ -export([new/0, value/1, value/2]). -export([update/3, update/4, merge/2, equal/2]). -export([to_binary/1, from_binary/1]). +-export([to_binary/2]). -export([stats/1, stat/2]). -export([parent_clock/2]). +-export([to_version/2]). %% EQC API -ifdef(EQC). @@ -79,9 +81,15 @@ -type add() :: ?ADD. -type remove() :: ?REM. +-ifdef(EQC). +-define(DICT, orddict). +-else. +-define(DICT, dict). +-endif. + -spec new() -> lwwset(). new() -> - orddict:new(). + ?DICT:new(). -spec parent_clock(riak_dt_vclock:vclock(), lwwset()) -> lwwset(). parent_clock(_Clock, LWWSet) -> @@ -89,7 +97,7 @@ parent_clock(_Clock, LWWSet) -> -spec value(lwwset()) -> [member()]. value(LWWSet) -> - [K || {K, {_TS, Status}} <- orddict:to_list(LWWSet), Status == 1]. + [K || {K, {_TS, Status}} <- ?DICT:to_list(LWWSet), Status == 1]. value(size, LWWSet) -> length(value(LWWSet)); @@ -108,13 +116,13 @@ update(Op, Actor, Set, _Ctx) -> %% Private -spec add_elem(member(), ts(), lwwset()) -> lwwset(). add_elem(Elem, TS, LWWSet) -> - case orddict:find(Elem, LWWSet) of + case ?DICT:find(Elem, LWWSet) of error -> - orddict:store(Elem, {TS, ?ADD}, LWWSet); + ?DICT:store(Elem, {TS, ?ADD}, LWWSet); {ok, {TS, ?REM}} -> - orddict:store(Elem, {TS, ?ADD}, LWWSet); + ?DICT:store(Elem, {TS, ?ADD}, LWWSet); {ok, {TS0, _}} when TS0 < TS -> - orddict:store(Elem, {TS, ?ADD}, LWWSet); + ?DICT:store(Elem, {TS, ?ADD}, LWWSet); _ -> LWWSet end. @@ -122,13 +130,13 @@ add_elem(Elem, TS, LWWSet) -> %% @doc warning, allows doomstoning. -spec remove_elem(member(), ts(), lwwset()) -> lwwset(). remove_elem(Elem, TS, LWWSet) -> - case orddict:find(Elem, LWWSet) of + case ?DICT:find(Elem, LWWSet) of error -> - orddict:store(Elem, {TS, ?REM}, LWWSet); + ?DICT:store(Elem, {TS, ?REM}, LWWSet); {ok, {TS, ?ADD}} -> LWWSet; {ok, {TS0, _}} when TS0 < TS -> - orddict:store(Elem, {TS, ?REM}, LWWSet); + ?DICT:store(Elem, {TS, ?REM}, LWWSet); _ -> LWWSet end. @@ -137,7 +145,7 @@ remove_elem(Elem, TS, LWWSet) -> merge(LWWSet, LWWSet) -> LWWSet; merge(LWWSet1, LWWSet2) -> - orddict:merge(fun lww/3, LWWSet1, LWWSet2). + ?DICT:merge(fun lww/3, LWWSet1, LWWSet2). lww(_Key, {TS, ?ADD}, {TS, ?REM}) -> {TS, ?ADD}; @@ -160,7 +168,7 @@ stats(LWWSet) -> -spec stat(atom(), lwwset()) -> number() | undefined. stat(element_count, LWWSet) -> - orddict:size(LWWSet); + ?DICT:size(LWWSet); stat(_,_) -> undefined. -include("riak_dt_tags.hrl"). @@ -177,15 +185,32 @@ stat(_,_) -> undefined. %% @see `from_binary/1' -spec to_binary(lwwset()) -> binary_lwwset(). to_binary(S) -> - <>. - -%% @doc When the argument is a `binary_orswot()' produced by -%% `to_binary/1' will return the original `orswot()'. + {ok, B} = to_binary(?V1_VERS, S), + B. + +%% @doc encode set to target version. The first argument is the target +%% binary type. +-spec to_binary(Vers :: pos_integer(), lwwset()) -> {ok, binary_lwwset()} | ?UNSUPPORTED_VERSION. +to_binary(?V1_VERS, S) -> + {ok, <>}; +to_binary(Vers, _S) -> + ?UNSUPPORTED_VERSION(Vers). + +%% @doc When the argument is a `binary_lwwset()' produced by +%% `to_binary/1' will return the original `lwwset()'. %% %% @see `to_binary/1' --spec from_binary(binary_lwwset()) -> lwwset(). +-spec from_binary(binary_lwwset()) -> {ok, lwwset()} | ?UNSUPPORTED_VERSION | ?INVALID_BINARY. from_binary(<>) -> - riak_dt:from_binary(B). + {ok, riak_dt:from_binary(B)}; +from_binary(<>) -> + ?UNSUPPORTED_VERSION(Vers); +from_binary(_B) -> + ?INVALID_BINARY. + +-spec to_version(pos_integer(), lwwset()) -> lwwset(). +to_version(_Version, Set) -> + Set. %% =================================================================== %% EUnit tests From ea3a425e097500e6816f3919d09c8d61120d47cb Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Thu, 11 Jan 2018 12:17:11 +0000 Subject: [PATCH 14/14] Update dialyzer type for orddict->dict change --- src/riak_dt_lwwset.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/riak_dt_lwwset.erl b/src/riak_dt_lwwset.erl index 6905756..10f5039 100644 --- a/src/riak_dt_lwwset.erl +++ b/src/riak_dt_lwwset.erl @@ -67,20 +67,24 @@ -export_type([lwwset/0, lwwset_op/0, binary_lwwset/0]). --type lwwset() :: [entry()]. +-type lwwset() :: dict(member(), {ts(), status()}). -type binary_lwwset() :: binary(). %% A binary that from_binary/1 will operate on. -type lwwset_op() :: {add, member(), ts()} | {remove, member(), ts()}. --type entry() :: {member(), {ts(), status()}}. - -type member() :: term(). -type ts() :: pos_integer(). -type status() :: add() | remove(). -type add() :: ?ADD. -type remove() :: ?REM. +-ifdef(namespaced_types). +-type dict(A, B) :: dict:dict(A, B). +-else. +-type dict(_A, _B) :: dict(). +-endif. + -ifdef(EQC). -define(DICT, orddict). -else.