Skip to content

Commit 40f081d

Browse files
committed
Changes done for the DPS support of the official Redis cluster v3.
1 parent d6a9bcd commit 40f081d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+6095
-329
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ DPS toolkit for the purpose of sharing application state in a distributed manner
1010

1111
<ol>
1212
<li>Memcached</li>
13-
<li>Redis</li>
13+
<li>Redis [version 2.x that doesn't have a built-in cluster feature]</li>
1414
<li>Cassandra</li>
1515
<li>IBM Cloudant</li>
1616
<li>HBase</li>
1717
<li>Mongo</li>
1818
<li>Couchbase</li>
1919
<li>Aerospike</li>
20+
<li>Redis-Cluster [New cluster feature is available in Redis version 3 and above]</li>
2021
</ol>
2122

2223
There are plenty of details available about the installation, configuration, API description,

com.ibm.streamsx.dps/.toolkitList

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
com.ibm.streamsx.dps/1.0.8
1+
com.ibm.streamsx.dps/1.0.9
22
spl/1.1.0
33
058_data_sharing_between_non_fused_spl_custom_and_cpp_primitive_operators/1.0.0
44
061_data_sharing_between_non_fused_spl_custom_operators_and_a_native_function/1.0.0
55
062_data_sharing_between_non_fused_spl_custom_and_java_primitive_operators/1.0.0
6-
com.ibm.streams.db/1.4.0
76
dps_test_1/1.0.0

com.ibm.streamsx.dps/com.ibm.streamsx/lock/distributed/native.function/function.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
<cmn:managedLibrary>
4747
<cmn:lib>DistributedProcessStoreLib</cmn:lib>
4848
<cmn:lib>memcached</cmn:lib>
49+
<cmn:lib>sds</cmn:lib>
4950
<cmn:lib>hiredis</cmn:lib>
5051
<cmn:lib>uv</cmn:lib>
5152
<cmn:lib>crypto</cmn:lib>

com.ibm.streamsx.dps/com.ibm.streamsx/store/distributed/native.function/function.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@
188188
<cmn:managedLibrary>
189189
<cmn:lib>DistributedProcessStoreLib</cmn:lib>
190190
<cmn:lib>memcached</cmn:lib>
191+
<cmn:lib>sds</cmn:lib>
191192
<cmn:lib>hiredis</cmn:lib>
192193
<cmn:lib>uv</cmn:lib>
193194
<cmn:lib>crypto</cmn:lib>

com.ibm.streamsx.dps/doc/dps-usage-tips.txt

Lines changed: 100 additions & 52 deletions
Large diffs are not rendered by default.
Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,72 @@
11
#!/bin/sh
2+
#set -x
3+
24
# Output appropriate libPath information for toolkits
35
# that support multiple platforms.
46

57
if [ "$1" == "libPath" ]; then
68

9+
here=$( cd ${0%/*} ; pwd )
710
OS=`uname -s`
811
MACH=`uname -i`
912
if [ "${MACH}" = "i386" ] ; then
1013
MACH=x86
1114
fi
1215

1316
if [ "${OS}" = "Linux" ] ; then
17+
18+
# choose a library path for RedHat-based distributions
1419
if [ -f /etc/redhat-release ] ; then
15-
OSDIST=`awk 'FNR == 1 {print substr($1, 0)}' /etc/redhat-release`
16-
if [ "$OSDIST" == 'Red' ]; then
17-
OSDIST='RHEL'
18-
fi
1920

21+
# parse the distribution name and version from its banner
22+
OSDIST=`awk 'FNR == 1 {print substr($1, 0)}' /etc/redhat-release`
2023
OSVER=`cat /etc/redhat-release | sed s/.*release\ // | sed s/\ .*//`
24+
25+
# adjust name and version as necessary for derivatives of RedHat distribution
26+
if [ "$OSDIST" == 'Red' ] ; then OSDIST='RHEL' ; fi
27+
if [ "$OSDIST" == 'BluVector' ] ; then OSDIST='CentOS' ; OSVER=6 ; fi
28+
29+
# concatenate the name and version for use below in constructing a library pathname
2130
OSLVL=$OSDIST${OSVER}
31+
2232
elif [ -f /etc/SuSE-release ] ; then
33+
34+
# parse the distribution name and version from its banner
2335
OSVER=`awk 'FNR == 2 {print $3}' /etc/SuSE-release`
36+
37+
# concatenate the name and version for use below in constructing a library pathname
2438
OSLVL=SLES${OSVER}
39+
2540
else
41+
42+
# this should cause a link error on unsupported distributions
2643
OSLVL="unsupported Linux distribution"
27-
fi
2844

29-
OSSTR="../../../../impl/lib/${MACH}.${OSLVL%%.*}"
45+
fi
3046

47+
# construct a library pathname
48+
OSSTR="$here/../lib/${MACH}.${OSLVL%%.*}"
49+
3150
else
51+
52+
# this should cause a link error on unsupported operating systems
3253
OSSTR="${MACH}.unsupported OS"
3354
fi
3455

56+
# return a library pathname, but warn if it does not exist
57+
[ -d $OSSTR ] || echo warning: non-existant library path $OSSTR 1>&2
3558
echo ${OSSTR}
3659

3760
elif [ "$1" == "includePath" ]; then
3861
# echo any computed include paths here, if needed
3962
echo ../../../../impl/include
63+
4064
elif [ "$1" == "lib" ]; then
4165
# echo any computed library names here, if needed
4266
:
67+
4368
else
4469
echo "unsupported option"
70+
4571
fi
4672
# end of archLevel script

com.ibm.streamsx.dps/impl/include/DpsConstants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ interface with many different back-end in-memory stores.
9191
#define MONGO_NO_SQL_DB_NAME "mongo"
9292
#define COUCHBASE_NO_SQL_DB_NAME "couchbase"
9393
#define AEROSPIKE_NO_SQL_DB_NAME "aerospike"
94+
#define REDIS_CLUSTER_NO_SQL_DB_NAME "redis-cluster"
9495
#define HTTP_GET "GET"
9596
#define HTTP_PUT "PUT"
9697
#define HTTP_POST "POST"
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
# Licensed Materials - Property of IBM
3+
# Copyright IBM Corp. 2011, 2015
4+
# US Government Users Restricted Rights - Use, duplication or
5+
# disclosure restricted by GSA ADP Schedule Contract with
6+
# IBM Corp.
7+
*/
8+
#ifndef REDIS_CLUSTER_DB_LAYER_H_
9+
#define REDIS_CLUSTER_DB_LAYER_H_
10+
/*
11+
=====================================================================
12+
Here is the copyright statement for our use of the hiredis APIs:
13+
14+
Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
15+
Pieter Noordhuis (pcnoordhuis at gmail) and is released under the
16+
BSD license.
17+
18+
Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
19+
Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
20+
21+
hiredis-cluster include files provide wrappers on top of the hiredis library.
22+
This wrapper allows us to use the familiar hiredis APIs in the context of
23+
a Redis cluster (Version 3 and higher) to provide HA facility for automatic
24+
fail-over when a Redis instance or the entire machine crashes.
25+
This wrapper carries the copyright as shown in the following line.
26+
27+
Copyright (c) 2015, Dmitrii Shinkevich <shinmail at gmail dot com>
28+
29+
All rights reserved.
30+
31+
Redistribution and use in source and binary forms, with or without
32+
modification, are permitted provided that the following conditions are met:
33+
34+
* Redistributions of source code must retain the above copyright notice,
35+
this list of conditions and the following disclaimer.
36+
37+
* Redistributions in binary form must reproduce the above copyright notice,
38+
this list of conditions and the following disclaimer in the documentation
39+
and/or other materials provided with the distribution.
40+
41+
* Neither the name of Redis nor the names of its contributors may be used
42+
to endorse or promote products derived from this software without specific
43+
prior written permission.
44+
45+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
46+
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
47+
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
48+
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
49+
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
50+
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
51+
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
52+
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
53+
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
54+
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
55+
=====================================================================
56+
*/
57+
#include "DBLayer.h"
58+
59+
#include "hiredis.h"
60+
#include "hiredis-cluster/cluster.h"
61+
#include "hiredis-cluster/hirediscommand.h"
62+
#include <tr1/memory>
63+
#include <set>
64+
#include <vector>
65+
66+
using namespace RedisCluster;
67+
68+
namespace com {
69+
namespace ibm {
70+
namespace streamsx {
71+
namespace store {
72+
namespace distributed
73+
{
74+
class RedisClusterDBLayer;
75+
76+
/// Class that implements the Iterator for Redis cluster
77+
class RedisClusterDBLayerIterator : public DBLayer::Iterator
78+
{
79+
public:
80+
uint64_t store;
81+
std::string storeName;
82+
std::vector<std::string> dataItemKeys;
83+
uint32_t sizeOfDataItemKeysVector;
84+
uint32_t currentIndex;
85+
bool hasData;
86+
Cluster<redisContext>::ptr_t redis_cluster;
87+
redisReply *redis_cluster_reply;
88+
RedisClusterDBLayer *redisClusterDBLayerPtr;
89+
90+
RedisClusterDBLayerIterator();
91+
~RedisClusterDBLayerIterator();
92+
bool getNext(uint64_t store, unsigned char * & keyData, uint32_t & keySize,
93+
unsigned char * & valueData, uint32_t & valueSize, PersistenceError & dbError);
94+
};
95+
96+
/// Class that implements the DBLayer for Redis
97+
class RedisClusterDBLayer : public DBLayer
98+
{
99+
private:
100+
Cluster<redisContext>::ptr_t redis_cluster;
101+
redisReply *redis_cluster_reply;
102+
103+
104+
bool readStoreInformation(std::string const & storeIdString, PersistenceError & dbError,
105+
uint32_t & dataItemCnt, std::string & storeName,
106+
std::string & keySplTypeName, std::string & valueSplTypeName);
107+
bool acquireStoreLock(std::string const & storeIdString);
108+
void releaseStoreLock(std::string const & storeIdString);
109+
bool readLockInformation(std::string const & storeIdString, PersistenceError & dbError, uint32_t & lockUsageCnt,
110+
int32_t & lockExpirationTime, pid_t & lockOwningPid, std::string & lockName);
111+
bool updateLockInformation(std::string const & lockIdString, PersistenceError & lkError,
112+
uint32_t const & lockUsageCnt, int32_t const & lockExpirationTime, pid_t const & lockOwningPid);
113+
bool lockIdExistsOrNot(std::string lockIdString, PersistenceError & lkError);
114+
bool acquireGeneralPurposeLock(std::string const & entityName);
115+
void releaseGeneralPurposeLock(std::string const & entityName);
116+
int32_t getRedisServerPartitionIndex(std::string const & key);
117+
118+
public:
119+
/// Constructor
120+
RedisClusterDBLayer();
121+
122+
/// Destructor
123+
~RedisClusterDBLayer();
124+
125+
// These are inherited from DBLayer, see DBLayer for descriptions
126+
void connectToDatabase(std::set<std::string> const & dbServers, PersistenceError & dbError);
127+
128+
uint64_t createStore(std::string const & name,
129+
std::string const & keySplTypeName,
130+
std::string const & valueSplTypeName,
131+
PersistenceError & dbError);
132+
uint64_t createOrGetStore(std::string const & name,
133+
std::string const & keySplTypeName,
134+
std::string const & valueSplTypeName,
135+
PersistenceError & dbError);
136+
uint64_t findStore(std::string const & name,
137+
PersistenceError & dbError);
138+
bool removeStore(uint64_t store, PersistenceError & dbError);
139+
140+
bool put(uint64_t store, char const * keyData, uint32_t keySize,
141+
unsigned char const * valueData, uint32_t valueSize, PersistenceError & dbError);
142+
bool putSafe(uint64_t store, char const * keyData, uint32_t keySize,
143+
unsigned char const * valueData, uint32_t valueSize, PersistenceError & dbError);
144+
bool putTTL(char const * keyData, uint32_t keySize,
145+
unsigned char const * valueData, uint32_t valueSize, uint32_t ttl, PersistenceError & dbError);
146+
bool get(uint64_t store, char const * keyData, uint32_t keySize,
147+
unsigned char * & valueData, uint32_t & valueSize,
148+
PersistenceError & dbError);
149+
bool getSafe(uint64_t store, char const * keyData, uint32_t keySize,
150+
unsigned char * & valueData, uint32_t & valueSize,
151+
PersistenceError & dbError);
152+
bool getTTL(char const * keyData, uint32_t keySize,
153+
unsigned char * & valueData, uint32_t & valueSize,
154+
PersistenceError & dbError);
155+
bool remove(uint64_t store, char const * keyData, uint32_t keySize, PersistenceError & dbError);
156+
bool removeTTL(char const * keyData, uint32_t keySize, PersistenceError & dbError);
157+
bool has(uint64_t store, char const * keyData, uint32_t keySize, PersistenceError & dbError);
158+
bool hasTTL(char const * keyData, uint32_t keySize, PersistenceError & dbError);
159+
void clear(uint64_t store, PersistenceError & dbError);
160+
uint64_t size(uint64_t store, PersistenceError & dbError);
161+
void base64_encode(std::string const & str, std::string & base64);
162+
void base64_decode(std::string & base64, std::string & result);
163+
RedisClusterDBLayerIterator * newIterator(uint64_t store, PersistenceError & dbError);
164+
void deleteIterator(uint64_t store, Iterator * iter, PersistenceError & dbError);
165+
bool storeIdExistsOrNot(std::string storeIdString, PersistenceError & dbError);
166+
bool getDataItemFromStore(std::string const & storeIdString,
167+
std::string const & keyDataString, bool const & checkOnlyForDataItemExistence,
168+
bool const & skipDataItemExistenceCheck, unsigned char * & valueData,
169+
uint32_t & valueSize, PersistenceError & dbError);
170+
std::string getStoreName(uint64_t store, PersistenceError & dbError);
171+
std::string getSplTypeNameForKey(uint64_t store, PersistenceError & dbError);
172+
std::string getSplTypeNameForValue(uint64_t store, PersistenceError & dbError);
173+
std::string getNoSqlDbProductName(void);
174+
void getDetailsAboutThisMachine(std::string & machineName, std::string & osVersion, std::string & cpuArchitecture);
175+
bool runDataStoreCommand(std::string const & cmd, PersistenceError & dbError);
176+
bool runDataStoreCommand(uint32_t const & cmdType, std::string const & httpVerb,
177+
std::string const & baseUrl, std::string const & apiEndpoint, std::string const & queryParams,
178+
std::string const & jsonRequest, std::string & jsonResponse, PersistenceError & dbError);
179+
180+
// Lock related methods.
181+
uint64_t createOrGetLock(std::string const & name, PersistenceError & lkError);
182+
void releaseLock(uint64_t lock, PersistenceError & lkError);
183+
bool acquireLock(uint64_t lock, double leaseTime, double maxWaitTimeToAcquireLock, PersistenceError & lkError);
184+
bool removeLock(uint64_t lock, PersistenceError & lkError);
185+
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
186+
187+
};
188+
} } } } }
189+
#endif /* REDIS_CLUSTER_DB_LAYER_H_ */

0 commit comments

Comments
 (0)