Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 109 additions & 4 deletions redisvl/index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import warnings
import weakref
from pathlib import Path
from math import ceil
from typing import (
TYPE_CHECKING,
Expand All @@ -28,6 +29,7 @@
from redis.asyncio import Redis as AsyncRedis
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
from redis.cluster import RedisCluster
import yaml

from redisvl.query.hybrid import HybridQuery
from redisvl.query.query import VectorQuery
Expand Down Expand Up @@ -335,6 +337,10 @@ def storage_type(self) -> StorageType:
def from_yaml(cls, schema_path: str, **kwargs):
"""Create a SearchIndex from a YAML schema file.

If the YAML file contains ``_redis_url`` or ``_connection_kwargs``
keys (produced by ``to_yaml(include_connection=True)``), they are
automatically passed to the constructor.

Args:
schema_path (str): Path to the YAML schema file.

Expand All @@ -347,15 +353,26 @@ def from_yaml(cls, schema_path: str, **kwargs):

index = SearchIndex.from_yaml("schemas/schema.yaml", redis_url="redis://localhost:6379")
"""
schema = IndexSchema.from_yaml(schema_path)
return cls(schema=schema, **kwargs)
path_obj = Path(schema_path)
try:
resolved_path = path_obj.resolve()
except Exception as exc:
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from_yaml catches a broad Exception around Path.resolve(), while other from_yaml helpers in this repo catch OSError specifically (e.g., IndexSchema.from_yaml and SemanticRouter.from_yaml). Narrowing this to except OSError would avoid masking unexpected errors and keep error-handling consistent.

Suggested change
except Exception as exc:
except OSError as exc:

Copilot uses AI. Check for mistakes.
raise ValueError(f"Invalid schema path: {schema_path}") from exc
try:
with resolved_path.open() as f:
data = yaml.safe_load(f) or {}
except FileNotFoundError as exc:
raise FileNotFoundError(f"Schema file not found: {resolved_path}") from exc
return cls.from_dict(data, **kwargs)

@classmethod
def from_dict(cls, schema_dict: Dict[str, Any], **kwargs):
"""Create a SearchIndex from a dictionary.

Args:
schema_dict (Dict[str, Any]): A dictionary containing the schema.
schema_dict (Dict[str, Any]): A dictionary containing the schema
and optionally ``_redis_url`` and ``_connection_kwargs``
(produced by ``to_dict(include_connection=True)``).

Returns:
SearchIndex: A RedisVL SearchIndex object.
Expand All @@ -376,9 +393,97 @@ def from_dict(cls, schema_dict: Dict[str, Any], **kwargs):
}, redis_url="redis://localhost:6379")

"""
schema = IndexSchema.from_dict(schema_dict)
# Extract connection metadata so it reaches the constructor
# instead of being silently dropped by IndexSchema validation.
copy = dict(schema_dict)
if "_redis_url" in copy:
url = copy.pop("_redis_url")
# Skip sanitized URLs (contain ****) to avoid silent auth
# failures — these are for inspection only, not round-trip.
if url and "****" not in url:
kwargs.setdefault("redis_url", url)
if "_connection_kwargs" in copy:
kwargs.setdefault("connection_kwargs", copy.pop("_connection_kwargs"))

schema = IndexSchema.from_dict(copy)
return cls(schema=schema, **kwargs)

def _sanitize_redis_url(self) -> Optional[str]:
"""Sanitize a Redis URL by masking passwords.

Handles both ``user:password@`` and ``:password@`` URL patterns.

Returns:
Optional[str]: The sanitized URL, or None if ``_redis_url``
is not set.
"""
if not self._redis_url:
return None
from urllib.parse import urlparse

parsed = urlparse(self._redis_url)
# Only mask when a password component is present (including
# empty-string passwords). Username-only URLs like
# ``redis://user@host:6379`` are left unchanged.
if parsed.password is not None:
host_info = parsed.hostname or ""
if parsed.port:
host_info += f":{parsed.port}"
user_part = f"{parsed.username}:" if parsed.username is not None else ":"
netloc = f"{user_part}****@{host_info}"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IPv6 addresses produce malformed URLs after sanitization

Low Severity

_sanitize_redis_url reconstructs the netloc using parsed.hostname, which strips square brackets from IPv6 addresses. For a URL like redis://:pass@[::1]:6379, parsed.hostname returns ::1 instead of [::1], so the reconstructed host_info becomes ::1:6379 — an ambiguous string where the port is indistinguishable from part of the IPv6 address. The resulting URL redis://:****@::1:6379 is malformed; it needs to be redis://:****@[::1]:6379.

Fix in Cursor Fix in Web

return parsed._replace(netloc=netloc).geturl()
return self._redis_url

Comment on lines +426 to +436
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_sanitize_redis_url() reconstructs the URL netloc using parsed.hostname/parsed.port, which can produce invalid or lossy URLs for valid Redis URLs that don’t fit the single-host pattern (e.g., IPv6 hosts that require brackets, or redis+sentinel:// URLs with comma-separated hosts). Consider sanitizing by operating on parsed.netloc (split on @, then mask only the password portion of the userinfo) so the host segment is preserved verbatim while still redacting credentials.

Suggested change
# empty-string passwords). Username-only URLs like
# ``redis://user@host:6379`` are left unchanged.
if parsed.password is not None:
host_info = parsed.hostname or ""
if parsed.port:
host_info += f":{parsed.port}"
user_part = f"{parsed.username}:" if parsed.username is not None else ":"
netloc = f"{user_part}****@{host_info}"
return parsed._replace(netloc=netloc).geturl()
return self._redis_url
# empty-string passwords). Username-only URLs like
# ``redis://user@host:6379`` are left unchanged.
if parsed.password is None:
return self._redis_url
netloc = parsed.netloc
# If there is no userinfo/host separator, leave unchanged.
if "@" not in netloc:
return self._redis_url
userinfo, host_part = netloc.rsplit("@", 1)
# userinfo may be "user:password", ":password", or just "password".
if ":" in userinfo:
username, _ = userinfo.split(":", 1)
if username:
masked_userinfo = f"{username}:****"
else:
# Password-only form ":password"
masked_userinfo = ":****"
else:
# No ":" present; treat entire userinfo as password.
masked_userinfo = "****"
masked_netloc = f"{masked_userinfo}@{host_part}"
return parsed._replace(netloc=masked_netloc).geturl()

Copilot uses AI. Check for mistakes.
def to_dict(self, include_connection: bool = False) -> Dict[str, Any]:
"""Serialize the index configuration to a dictionary.

Args:
include_connection: Whether to include connection parameters
(Redis URL and connection kwargs). Passwords are masked
(``****``). The serialized URL is for **inspection
only** — ``from_dict`` will not restore sanitized URLs.
Defaults to False.

Returns:
A dictionary representation of the index configuration.
"""
config = self.schema.to_dict()

if include_connection:
if self._redis_url is not None:
config["_redis_url"] = self._sanitize_redis_url()
safe_keys = {"ssl_cert_reqs"}
if self._connection_kwargs:
config["_connection_kwargs"] = {
k: v for k, v in self._connection_kwargs.items()
if k in safe_keys
}

return config

def to_yaml(
self, file_path: str, include_connection: bool = False, overwrite: bool = True
) -> None:
"""Serialize the index configuration to a YAML file.

Args:
file_path: Destination path for the YAML file.
include_connection: Whether to include connection parameters.
Passwords are masked (``****``) and the URL is for
**inspection only**. Defaults to False.
overwrite: Whether to overwrite an existing file. If False,
raises FileExistsError when the file exists. Defaults to True.

Raises:
FileExistsError: If *file_path* already exists and
*overwrite* is False.
"""
fp = Path(file_path).resolve()
if fp.exists() and not overwrite:
raise FileExistsError(f"File {file_path} already exists.")
with open(fp, "w") as f:
yaml.dump(self.to_dict(include_connection=include_connection), f, sort_keys=False)

def disconnect(self):
"""Disconnect from the Redis database."""
raise NotImplementedError("This method should be implemented by subclasses.")
Expand Down
208 changes: 208 additions & 0 deletions tests/unit/test_index_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import os
import tempfile

import pytest
import yaml

from redisvl.index.index import AsyncSearchIndex, SearchIndex
from redisvl.schema.schema import IndexSchema


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

SAMPLE_SCHEMA = {
"index": {
"name": "test-index",
"prefix": "doc",
"storage_type": "hash",
},
"fields": [
{"name": "title", "type": "tag"},
{"name": "body", "type": "text"},
{
"name": "vector",
"type": "vector",
"attrs": {
"dims": 3,
"algorithm": "flat",
"datatype": "float32",
},
},
],
}


def _make_index(cls=SearchIndex, **conn):
schema = IndexSchema.from_dict(SAMPLE_SCHEMA)
return cls(schema=schema, **conn)


# ---------------------------------------------------------------------------
# to_dict tests
# ---------------------------------------------------------------------------


class TestToDict:
def test_schema_only(self):
idx = _make_index()
d = idx.to_dict()
assert d["index"]["name"] == "test-index"
assert d["index"]["prefix"] == "doc"
assert len(d["fields"]) == 3
assert "_redis_url" not in d
assert "_connection_kwargs" not in d

def test_include_connection_with_url(self):
idx = _make_index(redis_url="redis://:secret@localhost:6379")
d = idx.to_dict(include_connection=True)
assert d["_redis_url"] == "redis://:****@localhost:6379"

def test_include_connection_password_only_url(self):
"""Password-only URLs (:pass@host) are sanitized correctly."""
idx = _make_index(redis_url="redis://:mysecret@localhost:6379")
d = idx.to_dict(include_connection=True)
assert d["_redis_url"] == "redis://:****@localhost:6379"

def test_include_connection_user_pass_url(self):
idx = _make_index(redis_url="redis://admin:secret@localhost:6379")
d = idx.to_dict(include_connection=True)
assert d["_redis_url"] == "redis://admin:****@localhost:6379"

def test_include_connection_username_only_url(self):
"""Username-only URLs (no password) are left unchanged."""
idx = _make_index(redis_url="redis://readonly@localhost:6379")
d = idx.to_dict(include_connection=True)
assert d["_redis_url"] == "redis://readonly@localhost:6379"

def test_include_connection_no_url(self):
"""When initialized with a client, _redis_url is None — omit it."""
idx = _make_index()
d = idx.to_dict(include_connection=True)
assert "_redis_url" not in d

def test_include_connection_filters_sensitive_kwargs(self):
idx = _make_index(
redis_url="redis://localhost:6379",
connection_kwargs={"password": "s3cret", "ssl_cert_reqs": "required"},
)
d = idx.to_dict(include_connection=True)
# password should NOT leak
assert "s3cret" not in d["_connection_kwargs"]
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion checks dict key membership, not whether the secret value leaked (i.e., it would still pass if {"password": "s3cret"} were present). Since this test is specifically about preventing credential leaks, consider asserting on the values (or on yaml.dump(d) / str(d)) so the intent is validated directly.

Suggested change
assert "s3cret" not in d["_connection_kwargs"]
assert "s3cret" not in yaml.dump(d["_connection_kwargs"])

Copilot uses AI. Check for mistakes.
assert d["_connection_kwargs"] == {"ssl_cert_reqs": "required"}

def test_async_index_to_dict(self):
idx = _make_index(cls=AsyncSearchIndex, redis_url="redis://localhost:6379")
d = idx.to_dict(include_connection=True)
assert d["_redis_url"] == "redis://localhost:6379"


# ---------------------------------------------------------------------------
# to_yaml tests
# ---------------------------------------------------------------------------


class TestToYaml:
def test_writes_yaml(self):
idx = _make_index()
with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f:
path = f.name
try:
idx.to_yaml(path)
with open(path) as f:
data = yaml.safe_load(f)
assert data["index"]["name"] == "test-index"
finally:
os.unlink(path)

def test_overwrite_false_raises(self):
idx = _make_index()
with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f:
path = f.name
try:
idx.to_yaml(path)
with pytest.raises(FileExistsError):
idx.to_yaml(path, overwrite=False)
finally:
os.unlink(path)

def test_overwrite_true(self):
idx = _make_index()
with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f:
path = f.name
try:
idx.to_yaml(path)
idx.to_yaml(path, overwrite=True) # should not raise
finally:
os.unlink(path)

def test_async_to_yaml(self):
idx = _make_index(cls=AsyncSearchIndex)
with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f:
path = f.name
try:
idx.to_yaml(path)
with open(path) as f:
data = yaml.safe_load(f)
assert data["index"]["name"] == "test-index"
finally:
os.unlink(path)


# ---------------------------------------------------------------------------
# Round-trip tests
# ---------------------------------------------------------------------------


class TestRoundTrip:
def test_roundtrip_dict_schema_only(self):
idx = _make_index()
d = idx.to_dict()
restored = SearchIndex.from_dict(d)
assert restored.schema.index.name == "test-index"

def test_roundtrip_dict_with_connection(self):
idx = _make_index(redis_url="redis://localhost:6379")
d = idx.to_dict(include_connection=True)
restored = SearchIndex.from_dict(d)
assert restored._redis_url == "redis://localhost:6379"
assert restored.schema.index.name == "test-index"

def test_roundtrip_dict_async_with_connection(self):
idx = _make_index(cls=AsyncSearchIndex, redis_url="redis://localhost:6379")
d = idx.to_dict(include_connection=True)
restored = AsyncSearchIndex.from_dict(d)
assert restored._redis_url == "redis://localhost:6379"

def test_roundtrip_yaml(self):
idx = _make_index(redis_url="redis://localhost:6379")
with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f:
path = f.name
try:
idx.to_yaml(path, include_connection=True)
restored = SearchIndex.from_yaml(path)
assert restored.schema.index.name == "test-index"
assert restored._redis_url == "redis://localhost:6379"
finally:
os.unlink(path)

def test_roundtrip_dict_with_connection_kwargs(self):
idx = _make_index(
redis_url="redis://localhost:6379",
connection_kwargs={"ssl_cert_reqs": "optional"},
)
d = idx.to_dict(include_connection=True)
restored = SearchIndex.from_dict(d)
assert restored._redis_url == "redis://localhost:6379"
assert restored._connection_kwargs == {"ssl_cert_reqs": "optional"}

def test_roundtrip_dict_sanitized_url_skipped(self):
"""Sanitized URLs (containing ****) should not be restored — they'd
cause auth failures."""
idx = _make_index(redis_url="redis://:secret@localhost:6379")
d = idx.to_dict(include_connection=True)
assert d["_redis_url"] == "redis://:****@localhost:6379"
restored = SearchIndex.from_dict(d)
# The sanitized URL should be silently skipped, not restored
assert restored._redis_url is None
Loading