@@ -150,6 +150,35 @@ class RedisProtocol {
150150 }
151151 return cmd;
152152 }
153+
154+ static std::string formatExpire (const std::string &key, int64_t seconds) {
155+ return " *3\r\n $6\r\n EXPIRE\r\n $" + std::to_string (key.length ()) + " \r\n " + key + " \r\n $" +
156+ std::to_string (std::to_string (seconds).length ()) + " \r\n " + std::to_string (seconds) + " \r\n " ;
157+ }
158+
159+ static std::string formatExpireAt (const std::string &key, int64_t timestamp) {
160+ return " *3\r\n $8\r\n EXPIREAT\r\n $" + std::to_string (key.length ()) + " \r\n " + key + " \r\n $" +
161+ std::to_string (std::to_string (timestamp).length ()) + " \r\n " + std::to_string (timestamp) + " \r\n " ;
162+ }
163+
164+ static std::string formatTtl (const std::string &key) {
165+ return " *2\r\n $3\r\n TTL\r\n $" + std::to_string (key.length ()) + " \r\n " + key + " \r\n " ;
166+ }
167+
168+ static int64_t parseIntegerResponse (const std::string &response) {
169+ if (response.empty () || response[0 ] != ' :' ) {
170+ throw InvalidInputException (" Invalid Redis integer response" );
171+ }
172+ size_t end = response.find (" \r\n " );
173+ if (end == std::string::npos) {
174+ throw InvalidInputException (" Invalid Redis integer response" );
175+ }
176+ try {
177+ return std::stoll (response.substr (1 , end - 1 ));
178+ } catch (const std::exception &e) {
179+ throw InvalidInputException (" Failed to parse Redis integer response: %s" , e.what ());
180+ }
181+ }
153182};
154183
155184// Redis connection class
@@ -845,6 +874,78 @@ static void RedisTypeFunction(DataChunk &args, ExpressionState &state, Vector &r
845874 });
846875}
847876
877+ static void RedisExpireFunction (DataChunk &args, ExpressionState &state, Vector &result) {
878+ auto &key_vector = args.data [0 ];
879+ auto &seconds_vector = args.data [1 ];
880+ auto &secret_vector = args.data [2 ];
881+
882+ // Extract secret once before executor loop (optimization)
883+ string host, port, password;
884+ if (!GetRedisSecret (state.GetContext (), secret_vector.GetValue (0 ).ToString (), host, port, password)) {
885+ throw InvalidInputException (" Redis secret not found" );
886+ }
887+ auto conn = ConnectionPool::getInstance ().getConnection (host, port, password);
888+
889+ BinaryExecutor::Execute<string_t , int64_t , bool >(
890+ key_vector, seconds_vector, result, args.size (), [&](string_t key, int64_t seconds) {
891+ try {
892+ auto response = conn->execute (RedisProtocol::formatExpire (key.GetString (), seconds));
893+ auto result_int = RedisProtocol::parseIntegerResponse (response);
894+ // Returns 1 if TTL was set (key exists), 0 if key doesn't exist
895+ return result_int == 1 ;
896+ } catch (std::exception &e) {
897+ throw InvalidInputException (" Redis EXPIRE error: %s" , e.what ());
898+ }
899+ });
900+ }
901+
902+ static void RedisTTLFunction (DataChunk &args, ExpressionState &state, Vector &result) {
903+ auto &key_vector = args.data [0 ];
904+ auto &secret_vector = args.data [1 ];
905+
906+ // Extract secret once before executor loop (optimization)
907+ string host, port, password;
908+ if (!GetRedisSecret (state.GetContext (), secret_vector.GetValue (0 ).ToString (), host, port, password)) {
909+ throw InvalidInputException (" Redis secret not found" );
910+ }
911+ auto conn = ConnectionPool::getInstance ().getConnection (host, port, password);
912+
913+ UnaryExecutor::Execute<string_t , int64_t >(key_vector, result, args.size (), [&](string_t key) {
914+ try {
915+ auto response = conn->execute (RedisProtocol::formatTtl (key.GetString ()));
916+ // Returns: -2 if key doesn't exist, -1 if key has no expiry, or positive TTL in seconds
917+ return RedisProtocol::parseIntegerResponse (response);
918+ } catch (std::exception &e) {
919+ throw InvalidInputException (" Redis TTL error: %s" , e.what ());
920+ }
921+ });
922+ }
923+
924+ static void RedisExpireAtFunction (DataChunk &args, ExpressionState &state, Vector &result) {
925+ auto &key_vector = args.data [0 ];
926+ auto ×tamp_vector = args.data [1 ];
927+ auto &secret_vector = args.data [2 ];
928+
929+ // Extract secret once before executor loop (optimization)
930+ string host, port, password;
931+ if (!GetRedisSecret (state.GetContext (), secret_vector.GetValue (0 ).ToString (), host, port, password)) {
932+ throw InvalidInputException (" Redis secret not found" );
933+ }
934+ auto conn = ConnectionPool::getInstance ().getConnection (host, port, password);
935+
936+ BinaryExecutor::Execute<string_t , int64_t , bool >(
937+ key_vector, timestamp_vector, result, args.size (), [&](string_t key, int64_t timestamp) {
938+ try {
939+ auto response = conn->execute (RedisProtocol::formatExpireAt (key.GetString (), timestamp));
940+ auto result_int = RedisProtocol::parseIntegerResponse (response);
941+ // Returns 1 if expiry was set (key exists), 0 if key doesn't exist
942+ return result_int == 1 ;
943+ } catch (std::exception &e) {
944+ throw InvalidInputException (" Redis EXPIREAT error: %s" , e.what ());
945+ }
946+ });
947+ }
948+
848949static void LoadInternal (ExtensionLoader &loader) {
849950 // Register the secret functions first!
850951 CreateRedisSecretFunctions::Register (loader);
@@ -972,6 +1073,36 @@ static void LoadInternal(ExtensionLoader &loader) {
9721073 " stream) or 'none' if the key does not exist." ,
9731074 {" key" , " secret_name" }, {" SELECT redis_type('mykey', 'my_redis_secret');" });
9741075
1076+ // Register redis_expire scalar function
1077+ add_scalar_function (
1078+ ScalarFunction (" redis_expire" , {LogicalType::VARCHAR, LogicalType::BIGINT, LogicalType::VARCHAR},
1079+ LogicalType::BOOLEAN, RedisExpireFunction),
1080+ " Set a time-to-live (TTL) in seconds for a key. Returns true if the TTL was set, false if the key does not exist." ,
1081+ {" key" , " seconds" , " secret_name" },
1082+ {" SELECT redis_expire('mykey', 3600, 'my_redis_secret');" ,
1083+ " SELECT redis_expire('session:' || id, 300, 'my_redis_secret') FROM users;" });
1084+
1085+ // Register redis_ttl scalar function
1086+ add_scalar_function (ScalarFunction (" redis_ttl" , {LogicalType::VARCHAR, LogicalType::VARCHAR}, LogicalType::BIGINT,
1087+ RedisTTLFunction),
1088+ " Get the remaining time-to-live (TTL) of a key in seconds. Returns -2 if the key does not exist, "
1089+ " -1 if the key exists but has no expiry set." ,
1090+ {" key" , " secret_name" },
1091+ {" SELECT redis_ttl('mykey', 'my_redis_secret');" ,
1092+ " SELECT key, redis_ttl(key, 'my_redis_secret') FROM (SELECT redis_get(key) as key FROM "
1093+ " redis_keys('*', 'my_redis_secret')); " });
1094+
1095+ // Register redis_expireat scalar function
1096+ add_scalar_function (
1097+ ScalarFunction (" redis_expireat" , {LogicalType::VARCHAR, LogicalType::BIGINT, LogicalType::VARCHAR},
1098+ LogicalType::BOOLEAN, RedisExpireAtFunction),
1099+ " Set an expiry time (Unix timestamp) for a key. Returns true if the expiry was set, false if the key does not "
1100+ " exist." ,
1101+ {" key" , " timestamp" , " secret_name" },
1102+ {" SELECT redis_expireat('mykey', 1736918400, 'my_redis_secret');" ,
1103+ " SELECT redis_expireat('event:' || id, EXTRACT(EPOCH FROM (event_time + INTERVAL '1 day'))::BIGINT, "
1104+ " 'my_redis_secret') FROM events;" });
1105+
9751106 // Register redis_keys table function
9761107 add_table_function (
9771108 TableFunction (" redis_keys" , {LogicalType::VARCHAR, LogicalType::VARCHAR}, RedisKeysFunction, RedisKeysBind),
@@ -1006,7 +1137,7 @@ static void LoadInternal(ExtensionLoader &loader) {
10061137 {" scan_pattern" , " hscan_pattern" , " count" , " secret_name" },
10071138 {" SELECT * FROM redis_hscan_over_scan('user:*', '*', 100, 'my_redis_secret');" });
10081139
1009- QueryFarmSendTelemetry (loader, " redis" , " 2025120401 " );
1140+ QueryFarmSendTelemetry (loader, " redis" , " 2026011401 " );
10101141}
10111142
10121143void RedisExtension::Load (ExtensionLoader &loader) {
@@ -1018,7 +1149,7 @@ std::string RedisExtension::Name() {
10181149}
10191150
10201151std::string RedisExtension::Version () const {
1021- return " 2025120401 " ;
1152+ return " 2026011401 " ;
10221153}
10231154
10241155} // namespace duckdb
0 commit comments