66const redis = require ( "redis" ) ;
77const VError = require ( "verror" ) ;
88const { Logger } = require ( "./logger" ) ;
9- const { isOnCF , cfEnv } = require ( "./env" ) ;
9+ const { cfEnv } = require ( "./env" ) ;
1010const { HandlerCollection } = require ( "./shared/handlerCollection" ) ;
1111const { Semaphore } = require ( "./shared/semaphore" ) ;
1212
1313const COMPONENT_NAME = "/RedisWrapper" ;
1414const VERROR_CLUSTER_NAME = "RedisWrapperError" ;
1515
1616const INTEGRATION_MODE = Object . freeze ( {
17+ CF_REDIS_CLUSTER : "CF_REDIS_CLUSTER" ,
1718 CF_REDIS : "CF_REDIS" ,
1819 LOCAL_REDIS : "LOCAL_REDIS" ,
1920 NO_REDIS : "NO_REDIS" ,
2021} ) ;
22+ const CF_REDIS_SERVICE_LABEL = "redis-cache" ;
2123
2224const logger = new Logger ( COMPONENT_NAME ) ;
25+ const watchedGetSetSemaphore = new Semaphore ( ) ;
2326
2427const MODE = Object . freeze ( {
2528 RAW : "raw" ,
2629 OBJECT : "object" ,
2730} ) ;
2831
29- let redisIsOnCF = isOnCF ;
30- let mainClient = null ;
31- let subscriberClient = null ;
32- let messageHandlers = new HandlerCollection ( ) ;
33- let integrationMode = null ;
34-
35- const watchedGetSetSemaphore = new Semaphore ( ) ;
36-
32+ let messageHandlers ;
33+ let mainClient ;
34+ let subscriberClient ;
35+ let integrationMode ;
3736const _reset = ( ) => {
38- redisIsOnCF = isOnCF ;
37+ messageHandlers = new HandlerCollection ( ) ;
3938 mainClient = null ;
4039 subscriberClient = null ;
41- messageHandlers = new HandlerCollection ( ) ;
40+ integrationMode = null ;
4241} ;
42+ _reset ( ) ;
4343
4444const _logErrorOnEvent = ( err ) =>
45- redisIsOnCF ? logger . error ( err ) : logger . warning ( "%s | %O" , err . message , VError . info ( err ) ) ;
45+ cfEnv . isOnCf ? logger . error ( err ) : logger . warning ( "%s | %O" , err . message , VError . info ( err ) ) ;
4646
4747const _subscribedMessageHandler = async ( message , channel ) => {
4848 const handlers = messageHandlers . getHandlers ( channel ) ;
@@ -78,22 +78,30 @@ const _localReconnectStrategy = () =>
7878 * Lazily create a new redis client. Client creation transparently handles both the Cloud Foundry "redis-cache" service
7979 * (hyperscaler option) and a local redis-server.
8080 *
81- * @returns {RedisClient }
81+ * @returns {RedisClient|RedisCluster }
8282 * @private
8383 */
8484const _createClientBase = ( ) => {
85- if ( redisIsOnCF ) {
85+ if ( cfEnv . isOnCf ) {
8686 try {
87- const credentials = cfEnv . cfServiceCredentialsForLabel ( "redis-cache" ) ;
8887 // NOTE: settings the user explicitly to empty resolves auth problems, see
8988 // https://github.com/go-redis/redis/issues/1343
90- const url = credentials . uri . replace ( / (?< = r e d i s s : \/ \/ ) [ \w - ] + ?(? = : ) / , "" ) ;
89+ const redisCredentials = cfEnv . cfServiceCredentialsForLabel ( CF_REDIS_SERVICE_LABEL ) ;
90+ const redisIsCluster = redisCredentials . cluster_mode ;
91+ const url = redisCredentials . uri . replace ( / (?< = r e d i s s : \/ \/ ) [ \w - ] + ?(? = : ) / , "" ) ;
92+ if ( redisIsCluster ) {
93+ return redis . createCluster ( {
94+ rootNodes : [ { url } ] ,
95+ // https://github.com/redis/node-redis/issues/1782
96+ defaults : {
97+ password : redisCredentials . password ,
98+ socket : { tls : redisCredentials . tls } ,
99+ } ,
100+ } ) ;
101+ }
91102 return redis . createClient ( { url } ) ;
92103 } catch ( err ) {
93- throw new VError (
94- { name : VERROR_CLUSTER_NAME , cause : err } ,
95- "error during create client with redis-cache service"
96- ) ;
104+ throw new VError ( { name : VERROR_CLUSTER_NAME , cause : err } , "error during create client with redis service" ) ;
97105 }
98106 } else {
99107 // NOTE: documentation is buried here https://github.com/redis/node-redis/blob/master/docs/client-configuration.md
@@ -144,7 +152,7 @@ const _clientErrorHandlerBase = async (client, err, clientName) => {
144152 *
145153 * Only one publisher is necessary for any number of channels.
146154 *
147- * @returns {RedisClient }
155+ * @returns {RedisClient|RedisCluster }
148156 * @private
149157 */
150158const getMainClient = async ( ) => {
@@ -163,7 +171,7 @@ const getMainClient = async () => {
163171 *
164172 * Only one subscriber is necessary for any number of channels.
165173 *
166- * @returns {RedisClient }
174+ * @returns {RedisClient|RedisCluster }
167175 * @private
168176 */
169177const getSubscriberClient = async ( ) => {
@@ -205,6 +213,12 @@ const sendCommand = async (command) => {
205213 }
206214
207215 try {
216+ const redisIsCluster = cfEnv . cfServiceCredentialsForLabel ( CF_REDIS_SERVICE_LABEL ) . cluster_mode ;
217+ if ( redisIsCluster ) {
218+ // NOTE: the cluster sendCommand API has a different signature, where it takes two optional args: firstKey and
219+ // isReadonly before the command
220+ return await mainClient . sendCommand ( undefined , undefined , command ) ;
221+ }
208222 return await mainClient . sendCommand ( command ) ;
209223 } catch ( err ) {
210224 throw new VError (
@@ -476,8 +490,9 @@ const removeMessageHandler = (channel, handler) => messageHandlers.removeHandler
476490const removeAllMessageHandlers = ( channel ) => messageHandlers . removeAllHandlers ( channel ) ;
477491
478492const _getIntegrationMode = async ( ) => {
479- if ( redisIsOnCF ) {
480- return INTEGRATION_MODE . CF_REDIS ;
493+ if ( cfEnv . isOnCf ) {
494+ const redisIsCluster = cfEnv . cfServiceCredentialsForLabel ( CF_REDIS_SERVICE_LABEL ) . cluster_mode ;
495+ return redisIsCluster ? INTEGRATION_MODE . CF_REDIS_CLUSTER : INTEGRATION_MODE . CF_REDIS ;
481496 }
482497 try {
483498 await getMainClient ( ) ;
@@ -521,7 +536,6 @@ module.exports = {
521536 _reset,
522537 _getMessageHandlers : ( ) => messageHandlers ,
523538 _getLogger : ( ) => logger ,
524- _setRedisIsOnCF : ( value ) => ( redisIsOnCF = value ) ,
525539 _getMainClient : ( ) => mainClient ,
526540 _setMainClient : ( value ) => ( mainClient = value ) ,
527541 _getSubscriberClient : ( ) => subscriberClient ,
0 commit comments