Skip to content

Commit e680e8e

Browse files
MoFHekarhdong
authored and committed
[feat] Add redis_hash_tags_hypodispersion to redis backend config.
Distribution of the storage_slice tags will be hypodispersed across the 16384 hash slots regardless of cluster slot, but it still depends on redis_hash_tags_import/runtime if they aren't empty. [feat] Add using_hash_storage_slice to the redis config. If True, a hash (CRC32) value is calculated for each ID and then MOD is applied to decide which bucket number it belongs to. If False, only the remainder is calculated. [feat] Change the redis_connection_mode config: redis_connection_mode = 2 is now standalone mode. [fix] Add the missing redis sentinel password option to the redis backend config.
1 parent b02d738 commit e680e8e

File tree

11 files changed

+394
-220
lines changed

11 files changed

+394
-220
lines changed

docs/api_docs/tfra/dynamic_embedding/RedisBackend.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Below is an example of a JSON file, along with comments on the corresponding pro
2424
**Attention! Json files cannot contain comments when actually used!**
2525
```json
2626
{
27-
"redis_connection_mode": 1, // ClusterMode = 0, SentinelMode = 1, StreamMode = 2
27+
"redis_connection_mode": 2, // ClusterMode = 0, SentinelMode = 1, StandaloneMode = 2
2828
"redis_master_name": "master",
2929

3030
// connection_options
@@ -44,14 +44,20 @@ Below is an example of a JSON file, along with comments on the corresponding pro
4444
"redis_connection_lifetime": 100, // minutes
4545

4646
// sentinel_connection_options
47+
"redis_sentinel_user": "default",
48+
"redis_sentinel_password": "",
4749
"redis_sentinel_connect_timeout": 1000, // milliseconds
4850
"redis_sentinel_socket_timeout": 1000, // milliseconds
4951

5052
// Below are user-defined parameters of this custom op, not Redis setting parameters
5153
"storage_slice_import": 2, // If storage_slice_import is not equal to storage_slice, rehash will happen. Equaling -1 means same as storage_slice.
5254
"storage_slice": 2, // For deciding bucket number, which usually is how many Redis instances may be used in the training.
55+
"using_hash_storage_slice":
56+
False, // If True, a hash (CRC32) value is calculated for each ID and then MOD is applied to decide which bucket number it belongs to. If False, only the remainder is calculated.
5357
"keys_sending_size": 1024, // Determines how many keys to send at a time for performance tuning
5458
"using_md5_prefix_name": False, // 1=true, 0=false
59+
"redis_hash_tags_hypodispersion":
60+
True, // The distribution of storage_slice will be hypodispersed across the 16384 hash slots regardless of cluster slot, but it still depends on redis_hash_tags_import/runtime if they aren't empty.
5561
"model_tag_import": "test", // model_tag_import for version and any other information from last time.
5662
"redis_hash_tags_import": ["{6379}","{26379}"], // Deciding hash tag for every bucket from last time, Note that the hash tag must be wrapped in curly braces {}.
5763
"model_tag_runtime": "test", // model_tag_runtime for version and any other information for now.

docs/api_docs/tfra/dynamic_embedding/RedisTableConfig.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ assign the embedding table storage properties.
3434
An example of a configuration file is shown below:
3535
```python
3636
{
37-
"redis_connection_mode": 1,
37+
"redis_connection_mode": 2,
3838
"redis_master_name": "master",
3939
"redis_host_ip": [
4040
"127.0.0.1"
@@ -50,12 +50,16 @@ An example of a configuration file is shown below:
5050
"redis_conn_pool_size": 20,
5151
"redis_wait_timeout": 100000000,
5252
"redis_connection_lifetime": 100,
53+
"redis_sentinel_user": "default",
54+
"redis_sentinel_password": "",
5355
"redis_sentinel_connect_timeout": 1000,
5456
"redis_sentinel_socket_timeout": 1000,
5557
"storage_slice_import": 1,
5658
"storage_slice": 1,
59+
"using_hash_storage_slice": False,
5760
"keys_sending_size": 1024,
5861
"using_md5_prefix_name": False,
62+
"redis_hash_tags_hypodispersion": False,
5963
"model_tag_import": "test",
6064
"redis_hash_tags_import": [],
6165
"model_tag_runtime": "test",

tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Below is an example of a JSON file, along with comments on the corresponding pro
2424
**Attention! Json files cannot contain comments when actually used!**
2525
```json
2626
{
27-
"redis_connection_mode": 1, // ClusterMode = 0, SentinelMode = 1, StreamMode = 2
27+
"redis_connection_mode": 2, // ClusterMode = 0, SentinelMode = 1, StandaloneMode = 2
2828
"redis_master_name": "master",
2929

3030
// connection_options
@@ -44,14 +44,20 @@ Below is an example of a JSON file, along with comments on the corresponding pro
4444
"redis_connection_lifetime": 100, // minutes
4545

4646
// sentinel_connection_options
47+
"redis_sentinel_user": "default",
48+
"redis_sentinel_password": "",
4749
"redis_sentinel_connect_timeout": 1000, // milliseconds
4850
"redis_sentinel_socket_timeout": 1000, // milliseconds
4951

5052
// Below are user-defined parameters of this custom op, not Redis setting parameters
5153
"storage_slice_import": 2, // If storage_slice_import is not equal to storage_slice, rehash will happen. Equaling -1 means same as storage_slice.
5254
"storage_slice": 2, // For deciding bucket number, which usually is how many Redis instances may be used in the training.
55+
"using_hash_storage_slice":
56+
False, // If True, a hash (CRC32) value is calculated for each ID and then MOD is applied to decide which bucket number it belongs to. If False, only the remainder is calculated.
5357
"keys_sending_size": 1024, // Determines how many keys to send at a time for performance tuning
5458
"using_md5_prefix_name": False, // 1=true, 0=false
59+
"redis_hash_tags_hypodispersion":
60+
False, // The distribution of storage_slice will be hypodispersed across the 16384 hash slots regardless of cluster slot, but it still depends on redis_hash_tags_import/runtime if they aren't empty.
5561
"model_tag_import": "test", // model_tag_import for version and any other information from last time.
5662
"redis_hash_tags_import": ["{6379}","{26379}"], // Deciding hash tag for every bucket from last time, Note that the hash tag must be wrapped in curly braces {}.
5763
"model_tag_runtime": "test", // model_tag_runtime for version and any other information for now.

tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/redis_cluster_connection_pool.hpp

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ class RedisWrapper<RedisInstance, K, V,
497497
}
498498

499499
virtual std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> MgetInBucket(
500-
const Tensor &keys, const int64 begin, const int64 max_i,
500+
const Tensor &keys, const int64_t begin, const int64_t max_i,
501501
const std::string &keys_prefix_name_slice) override {
502502
std::unique_ptr<BucketContext> bucket_context_temp(new BucketContext());
503503
const static char *redis_command = "HMGET";
@@ -937,8 +937,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
937937
*/
938938
virtual std::vector<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>
939939
MgetCommand(
940-
const Tensor &keys, ThreadContext *thread_context, const int64 begin,
941-
const int64 max_i,
940+
const Tensor &keys, ThreadContext *thread_context, const int64_t begin,
941+
const int64_t max_i,
942942
const std::vector<std::string> &keys_prefix_name_slices) override {
943943
const int &&total = max_i - begin;
944944
const int &&argc = total + 2;
@@ -953,7 +953,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
953953

954954
const unsigned &storage_slice = redis_connection_params.storage_slice;
955955
const unsigned &&vector_len =
956-
(static_cast<int64>(reinterpret_cast<int>(argc)) /
956+
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
957957
redis_connection_params.storage_slice) +
958958
2;
959959

@@ -968,7 +968,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
968968
unsigned *pbucket_loc = thread_context->bucket_locs->data();
969969
unsigned key_bucket_locs = 0;
970970
for (; pk_raw != pk_raw_end; ++pk_raw) {
971-
key_bucket_locs = KBucketNum<K>(pk_raw, storage_slice);
971+
key_bucket_locs =
972+
KBucketNum<K>(this->K_bucket_num_handle, pk_raw, storage_slice);
972973
// The bucket to which the key belongs is recorded to facilitate future
973974
// memory writes that do not recompute the redis hash
974975
*pbucket_loc = key_bucket_locs;
@@ -1019,7 +1020,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
10191020
inline void CopyDefaultToTensor(const bool is_full_default, const V *pv_raw,
10201021
const V *dft_raw,
10211022
const V *const dft_raw_begin,
1022-
const int64 Velems_per_dim0) {
1023+
const int64_t Velems_per_dim0) {
10231024
if (is_full_default) {
10241025
DefaultMemcpyToTensor<V>(
10251026
pv_raw, dft_raw,
@@ -1036,8 +1037,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
10361037
ThreadContext *thread_context,
10371038
std::vector<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>
10381039
&reply,
1039-
const int64 begin, const int64 max_i,
1040-
const int64 Velems_per_dim0) override {
1040+
const int64_t begin, const int64_t max_i,
1041+
const int64_t Velems_per_dim0) override {
10411042
const V *pv_raw =
10421043
reinterpret_cast<const V *>(values->tensor_data().data()) +
10431044
begin * Velems_per_dim0;
@@ -1096,8 +1097,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
10961097
const bool is_full_default, ThreadContext *thread_context,
10971098
std::vector<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>
10981099
&reply,
1099-
const int64 begin, const int64 max_i,
1100-
const int64 Velems_per_dim0) override {
1100+
const int64_t begin, const int64_t max_i,
1101+
const int64_t Velems_per_dim0) override {
11011102
const V *pv_raw =
11021103
reinterpret_cast<const V *>(values->tensor_data().data()) +
11031104
begin * Velems_per_dim0;
@@ -1117,7 +1118,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
11171118
redisReply *temp_reply;
11181119
bool print_once[storage_slice];
11191120
memset(print_once, false, sizeof(print_once));
1120-
for (int64 i = 0, j = begin; i < (max_i - begin);
1121+
for (int64_t i = 0, j = begin; i < (max_i - begin);
11211122
++i, ++j, pv_raw += Velems_per_dim0, dft_raw += Velems_per_dim0) {
11221123
bucket_loc = (*bucket_locs)[i];
11231124
if (reply[bucket_loc] != nullptr) {
@@ -1157,7 +1158,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
11571158

11581159
virtual Status MsetCommand(
11591160
const Tensor &keys, const Tensor &values, ThreadContext *thread_context,
1160-
const int64 begin, const int64 max_i, const int64 Velems_per_dim0,
1161+
const int64_t begin, const int64_t max_i, const int64_t Velems_per_dim0,
11611162
const std::vector<std::string> &keys_prefix_name_slices) override {
11621163
const int &&total = max_i - begin;
11631164
const int &&argc = total * 2 + 2;
@@ -1177,7 +1178,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
11771178

11781179
const unsigned &storage_slice = redis_connection_params.storage_slice;
11791180
const unsigned &&vector_len =
1180-
(static_cast<int64>(reinterpret_cast<int>(argc)) /
1181+
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
11811182
redis_connection_params.storage_slice) +
11821183
2;
11831184

@@ -1198,7 +1199,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
11981199
VCATS_temp = VContentAndTypeSize<V>(VCATS_temp, Velems_per_dim0,
11991200
V_byte_size, pv_raw, buff_temp[i]);
12001201
key_bucket_locs =
1201-
KBucketNum<K>(pk_raw, storage_slice); // TODO: change it to AVX512
1202+
KBucketNum<K>(this->K_bucket_num_handle, pk_raw,
1203+
storage_slice); // TODO: change it to AVX512
12021204

12031205
// Direct access to Tensor data in TensorFlow
12041206
thread_context->HandlePushBack(
@@ -1246,8 +1248,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
12461248

12471249
virtual Status MaccumCommand(
12481250
const Tensor &keys, const Tensor &values_or_delta, const Tensor &exists,
1249-
ThreadContext *thread_context, const int64 begin, const int64 max_i,
1250-
const int64 Velems_per_dim0,
1251+
ThreadContext *thread_context, const int64_t begin, const int64_t max_i,
1252+
const int64_t Velems_per_dim0,
12511253
const std::vector<std::string> &keys_prefix_name_slices) override {
12521254
const int &&total = max_i - begin;
12531255
const int &&argc = total * 2 + 4;
@@ -1270,7 +1272,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
12701272

12711273
const unsigned &storage_slice = redis_connection_params.storage_slice;
12721274
const unsigned &&vector_len =
1273-
(static_cast<int64>(reinterpret_cast<int>(argc)) /
1275+
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
12741276
redis_connection_params.storage_slice) +
12751277
4;
12761278

@@ -1292,7 +1294,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
12921294
VCATS_temp = VContentAndTypeSize<V>(VCATS_temp, Velems_per_dim0,
12931295
V_byte_size, pv_raw, buff_temp[i]);
12941296
key_bucket_locs =
1295-
KBucketNum<K>(pk_raw, storage_slice); // TODO: change it to AVX512
1297+
KBucketNum<K>(this->K_bucket_num_handle, pk_raw,
1298+
storage_slice); // TODO: change it to AVX512
12961299

12971300
// Direct access to Tensor data in TensorFlow
12981301
thread_context->HandlePushBack(
@@ -1346,8 +1349,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
13461349
}
13471350

13481351
virtual Status DelCommand(
1349-
const Tensor &keys, ThreadContext *thread_context, const int64 begin,
1350-
const int64 max_i,
1352+
const Tensor &keys, ThreadContext *thread_context, const int64_t begin,
1353+
const int64_t max_i,
13511354
const std::vector<std::string> &keys_prefix_name_slices) override {
13521355
const int &&total = max_i - begin;
13531356
const int &&argc = total + 2;
@@ -1362,7 +1365,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
13621365

13631366
const unsigned &storage_slice = redis_connection_params.storage_slice;
13641367
const unsigned &&vector_len =
1365-
(static_cast<int64>(reinterpret_cast<int>(argc)) /
1368+
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
13661369
redis_connection_params.storage_slice) +
13671370
2;
13681371

@@ -1377,7 +1380,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
13771380
unsigned *pbucket_loc = thread_context->bucket_locs->data();
13781381
unsigned key_bucket_locs = 0;
13791382
for (; pk_raw != pk_raw_end; ++pk_raw) {
1380-
key_bucket_locs = KBucketNum<K>(pk_raw, storage_slice);
1383+
key_bucket_locs =
1384+
KBucketNum<K>(this->K_bucket_num_handle, pk_raw, storage_slice);
13811385
// The bucket to which the key belongs is recorded to facilitate future
13821386
// memory writes that do not recompute the redis hash
13831387
*pbucket_loc = key_bucket_locs;

0 commit comments

Comments
 (0)