Skip to content

Commit 8007350

Browse files
author
Senthil Nathan
committed
Changes done for v4.1.6.
1 parent 93cbc06 commit 8007350

26 files changed

+980
-148
lines changed

com.ibm.streamsx.dps/com.ibm.streamsx/store/distributed/native.function/function.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ it does safety checks and is therefore slower.</description>
433433
</description>
434434
<prototype>public stateful void dpsEndIteration(uint64 store, uint64 iterator, mutable uint64 err)</prototype>
435435
</function>
436+
436437
<function>
437438
<description>This function can be called to get multiple keys present in a given store.
438439
@param store The handle of the store.
@@ -445,6 +446,31 @@ it does safety checks and is therefore slower.</description>
445446
</description>
446447
<prototype>&lt;any T1> public stateful void dpsGetKeys(uint64 store, mutable list&lt;T1&gt; keys, int32 keyStartPosition, int32 numberOfKeysNeeded, rstring keyExpression, rstring valueExpression, mutable uint64 err)</prototype>
447448
</function>
449+
450+
<function>
451+
<description>This function can be called to get values for a given list of multiple keys present in a given store.
452+
@param store The handle of the store.
453+
@param keys User provided list variable that contains keys for which values need to be fetched.
454+
@param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appears in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
455+
@param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
456+
@return It returns true if value fetch worked for all given keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
457+
</description>
458+
<prototype>&lt;any T1, any T2> public stateful boolean dpsGetValues(uint64 store, list&lt;T1&gt; keys, mutable list&lt;T2&gt; values, mutable list&lt;uint64&gt; errors)</prototype>
459+
</function>
460+
461+
<function>
462+
<description>This function can be called to get multiple Key/Value (KV) pairs present in a given store.
463+
@param store The handle of the store.
464+
@param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
465+
@param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
466+
@param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
467+
@param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
468+
@param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
469+
@return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
470+
</description>
471+
<prototype>&lt;any T1, any T2> public stateful boolean dpsGetKVPairs(uint64 store, mutable list&lt;T1&gt; keys, mutable list&lt;T2&gt; values, int32 keyStartPosition, int32 numberOfPairsNeeded, mutable list&lt;uint64&gt; errors)</prototype>
472+
</function>
473+
448474
<function>
449475
<description> This function serializes all the key-value pairs in a given store id into a blob. The blob can be used to recreate all the key-value pairs into another store. This is a useful technique for copying an entire store into a different store. See `dpsDeserialize()` for a detailed example on how to use the serialization functions to copy a store.
450476
@param store the handle of the store to serialize.

com.ibm.streamsx.dps/impl/include/CassandraDBLayer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ namespace distributed
159159
bool removeLock(uint64_t lock, PersistenceError & lkError);
160160
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
161161
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
162+
void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
162163

163164
};
164165
} } } } }

com.ibm.streamsx.dps/impl/include/CloudantDBLayer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ namespace distributed
198198
bool removeLock(uint64_t lock, PersistenceError & lkError);
199199
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
200200
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
201+
void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
201202

202203
};
203204
} } } } }

com.ibm.streamsx.dps/impl/include/CouchbaseDBLayer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ namespace distributed
204204
bool removeLock(uint64_t lock, PersistenceError & lkError);
205205
uint32_t getPidForLock(std::string const & name, PersistenceError & lkError);
206206
void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError);
207+
void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error);
207208

208209
};
209210
} } } } }

com.ibm.streamsx.dps/impl/include/DBLayer.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
# Licensed Materials - Property of IBM
3-
# Copyright IBM Corp. 2011, 2014
3+
# Copyright IBM Corp. 2011, 2022
44
# US Government Users Restricted Rights - Use, duplication or
55
# disclosure restricted by GSA ADP Schedule Contract with
66
# IBM Corp.
@@ -289,6 +289,10 @@ namespace store {
289289
/// @return a list containing multiple keys of a given data type.
290290
virtual void getKeys(uint64_t store, std::vector<unsigned char *> & keysBuffer, std::vector<uint32_t> & keysSize, int32_t keyStartPosition, int32_t numberOfKeysNeeded, PersistenceError & dbError) = 0;
291291

292+
/// Get value for a given key present in a given store.
293+
/// @return a value for a given key of a given data type.
294+
virtual void getValue(std::string const & storeIdString, char const * & key, uint32_t const & keySize, unsigned char * & value, uint32_t & valueSize, uint64_t & error) = 0;
295+
292296
/// A store iterator
293297
class Iterator
294298
{

com.ibm.streamsx.dps/impl/include/DistributedProcessStore.h

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,27 @@ namespace distributed
230230
template<class T1>
231231
void getKeys(SPL::uint64 store, SPL::list<T1> & keys, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfKeysNeeded, SPL::rstring const & keyExpression, SPL::rstring const & valueExpression, SPL::uint64 & err);
232232

233+
/// This function can be called to get values for a given list of multiple keys present in a given store.
234+
/// @param store The handle of the store.
235+
/// @param keys User provided list variable that contains keys for which values need to be fetched.
236+
/// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
237+
/// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
238+
///
239+
template<class T1, class T2>
240+
bool getValues(SPL::uint64 store, SPL::list<T1> const & keys, SPL::list<T2> & values, SPL::list<SPL::uint64> & errors);
241+
242+
/// This function can be called to get multiple Key/Value (KV) pairs present in a given store.
243+
/// @param store The handle of the store.
244+
/// @param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
245+
/// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
246+
/// @param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
247+
/// @param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
248+
/// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
249+
/// @return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
250+
///
251+
template<class T1, class T2>
252+
bool getKVPairs(SPL::uint64 store, SPL::list<T1> & keys, SPL::list<T2> & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::list<SPL::uint64> & errors);
253+
233254
/// Serialize the items from the serialized store
234255
/// @param store store handle
235256
/// @param data blob to serialize into
@@ -912,6 +933,7 @@ namespace distributed
912933
for (unsigned int i = 0; i < keysBuffer.size(); i++) {
913934
SPL::NativeByteBuffer nbf_key(keysBuffer.at(i), keysSize.at(i));
914935
T1 tempKey;
936+
// Deserialize the obtained keys from their NBF format to their native SPL type.
915937
nbf_key >> tempKey;
916938
keys.push_back(tempKey);
917939

@@ -920,7 +942,102 @@ namespace distributed
920942
free(keysBuffer.at(i));
921943
}
922944
} // End of for loop.
923-
}
945+
} // End of getKeys method.
946+
947+
/// This function can be called to get values for a given list of multiple keys present in a given store.
948+
/// @param store The handle of the store.
949+
/// @param keys User provided list variable that contains keys for which values need to be fetched.
950+
/// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the key appeared in the user provided keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
951+
/// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the user provided keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
952+
///
953+
template<class T1, class T2>
954+
bool DistributedProcessStore::getValues(SPL::uint64 store, SPL::list<T1> const & keys, SPL::list<T2> & values, SPL::list<SPL::uint64> & errors) {
955+
dbError_->reset();
956+
bool resultStatus = true;
957+
958+
// Clear the user provided mutable lists now.
959+
values.clear();
960+
errors.clear();
961+
962+
// Create a string form of the store id.
963+
std::ostringstream storeId;
964+
storeId << store;
965+
std::string storeIdString = storeId.str();
966+
967+
// Populate the key buffer with the serialized key.
968+
// i.e. Serialize it from the native SPL type to NBF format.
969+
// Then, call the method in the underlying DB implementation layer for every key that we have.
970+
for(int i=0; i<keys.size(); i++) {
971+
SPL::NativeByteBuffer key_nbf;
972+
key_nbf << keys.at(i);
973+
char const * keyData = (char const *)key_nbf.getPtr();
974+
uint32_t keySize = key_nbf.getSerializedDataSize();
975+
976+
unsigned char * valueData;
977+
uint32_t valueSize = 0;
978+
uint64_t error = 0;
979+
db_->getValue(storeIdString, keyData, keySize, valueData, valueSize, error);
980+
981+
// Collect the results in the user provided list.
982+
T2 tempValue;
983+
984+
if(error == 0) {
985+
SPL::NativeByteBuffer value_nbf(valueData, valueSize);
986+
value_nbf >> tempValue;
987+
} else {
988+
// At least one error seen while fetching the value for a key.
989+
resultStatus = false;
990+
}
991+
992+
values.push_back(tempValue);
993+
errors.push_back(error);
994+
995+
// We must free the memory allocated in the DB layer.
996+
if(valueSize > 0) {
997+
free(valueData);
998+
}
999+
} // End of for loop.
1000+
1001+
return(resultStatus);
1002+
} // End of getValues method.
1003+
1004+
/// This function can be called to get multiple Key/Value (KV) pairs present in a given store.
1005+
/// @param store The handle of the store.
1006+
/// @param keys User provided mutable list variable in which the keys found in the store will be returned. This list must be suitable for storing multiple keys found in a given store and it must be made of a given store's key data type.
1007+
/// @param values User provided mutabe list variable in which the fetched values will be returned. This list must be suitable for storing multiple values obtained from a given store and it must be made of a given store's value data type. Fetched values will be available in this list at the same index where the corresponding key appears in the keys list. Before using the individual values from this list, it is recommended to make sure that a given value fetch worked with no errors.
1008+
/// @param keyStartPosition User can indicate a start position from where the K/V pairs should be fetched and returned. It must be greater than or equal to zero. If not, this API will return back with an empty list of keys.
1009+
/// @param numberOfPairsNeeded User can indicate the total number of K/V pairs to be returned as available from the given key start position. It must be greater than or equal to 0 and less than or equal to 50000. If it is set to 0, then all the available K/V pairs upto a maximum of 50000 pairs from the given key start position will be returned.
1010+
/// @param errors User provided mutable list variable in which the individual success or failure value fetch result codes will be returned. This list must be of type uint64. Each list element will be 0 if no error occurs and a non-zero error code otherwise. Such value fetch result codes will be available in this list at the same index where the key appears in the keys list. If a given result code doesn't indicate a successful value fetch, it is better to skip the corresponding element at the same index in the mutable values list.
1011+
/// @return It returns true if value fetch worked for all the keys with no errors. Else, it returns false to indicate that value fetch encountered error for one or more keys.
1012+
///
1013+
template<class T1, class T2>
1014+
bool DistributedProcessStore::getKVPairs(SPL::uint64 store, SPL::list<T1> & keys, SPL::list<T2> & values, SPL::int32 const & keyStartPosition, SPL::int32 const & numberOfPairsNeeded, SPL::list<SPL::uint64> & errors) {
1015+
// The logic in this method is going to be a combination of the two previous methods shown above.
1016+
// We have to first get the keys and then the values for those keys.
1017+
dbError_->reset();
1018+
bool resultStatus = true;
1019+
uint64_t error = 0;
1020+
std::string keyExpression = "";
1021+
std::string valueExpression = "";
1022+
1023+
// Clear the user provided mutable lists now.
1024+
keys.clear();
1025+
values.clear();
1026+
errors.clear();
1027+
1028+
// Let us first get the keys available in the user specified range.
1029+
getKeys(store, keys, keyStartPosition, numberOfPairsNeeded, keyExpression, valueExpression, error);
1030+
1031+
// If we encountered an error in getting the keys, we can return back from here.
1032+
if(error != 0) {
1033+
resultStatus = false;
1034+
return(resultStatus);
1035+
}
1036+
1037+
// We got the keys. Let us now get the values stored for those keys.
1038+
resultStatus = getValues(store, keys, values, errors);
1039+
return(resultStatus);
1040+
} // End of getKVPairs method.
9241041

9251042
template<class T1, class T2>
9261043
void DistributedProcessStore::serialize(SPL::uint64 store, SPL::blob & data, SPL::uint64 & err)

0 commit comments

Comments
 (0)