Skip to content

Commit e891786

Browse files
feat: Redis key_prefix field (#437)
* Redis uploader key_prefix field
* Changelog and version bump: Redis uploader key_prefix field
* Make dev version
* Revert "Make dev version" (this reverts commit 2b67e23)
1 parent 898d642 commit e891786

File tree

4 files changed

+50
-19
lines changed

4 files changed

+50
-19
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 0.5.19
2+
3+
### Features
4+
5+
* **Add `key_prefix` field to Redis Uploader** - Allow users to specify a custom prefix for the keys that the Redis connector writes
6+
17
## 0.5.18
28

39
### Fixes

test/integration/connectors/test_redis.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,22 @@
2323
)
2424

2525

26-
async def delete_record(client: Redis, element_id: str) -> None:
27-
await client.delete(element_id)
26+
async def delete_record(client: Redis, element_id: str, key_prefix: str) -> None:
27+
key_with_prefix = f"{key_prefix}{element_id}"
28+
await client.delete(key_with_prefix)
2829

2930

30-
async def validate_upload(client: Redis, first_element: dict):
31+
async def validate_upload(client: Redis, first_element: dict, key_prefix: str) -> None:
3132
element_id = first_element["element_id"]
33+
key_with_prefix = f"{key_prefix}{element_id}"
3234
expected_text = first_element["text"]
3335
expected_embeddings = first_element["embeddings"]
3436
async with client.pipeline(transaction=True) as pipe:
3537
try:
36-
response = await pipe.json().get(element_id, "$").execute()
38+
response = await pipe.json().get(key_with_prefix, "$").execute()
3739
response = response[0][0]
3840
except redis_exceptions.ResponseError:
39-
response = await pipe.get(element_id).execute()
41+
response = await pipe.get(key_with_prefix).execute()
4042
response = json.loads(response[0])
4143

4244
embedding_similarity = np.linalg.norm(
@@ -53,15 +55,17 @@ async def redis_destination_test(
5355
upload_file: Path,
5456
tmp_path: Path,
5557
connection_kwargs: dict,
58+
uploader_config: dict,
5659
uri: Optional[str] = None,
5760
password: Optional[str] = None,
5861
):
5962
uploader = RedisUploader(
6063
connection_config=RedisConnectionConfig(
6164
**connection_kwargs, access_config=RedisAccessConfig(uri=uri, password=password)
6265
),
63-
upload_config=RedisUploaderConfig(batch_size=10),
66+
upload_config=RedisUploaderConfig(batch_size=10, **uploader_config),
6467
)
68+
key_prefix = uploader.upload_config.key_prefix
6569

6670
file_data = FileData(
6771
source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
@@ -78,20 +82,32 @@ async def redis_destination_test(
7882

7983
if uri:
8084
async with from_url(uri) as client:
81-
await validate_upload(client=client, first_element=first_element)
85+
await validate_upload(
86+
client=client,
87+
first_element=first_element,
88+
key_prefix=key_prefix,
89+
)
8290
else:
8391
async with Redis(**connection_kwargs, password=password) as client:
84-
await validate_upload(client=client, first_element=first_element)
92+
await validate_upload(
93+
client=client,
94+
first_element=first_element,
95+
key_prefix=key_prefix,
96+
)
8597
except Exception as e:
8698
raise e
8799
finally:
88100
if uri:
89101
async with from_url(uri) as client:
90-
tasks = [delete_record(client, element["element_id"]) for element in elements]
102+
tasks = [
103+
delete_record(client, element["element_id"], key_prefix) for element in elements
104+
]
91105
await asyncio.gather(*tasks)
92106
else:
93107
async with Redis(**connection_kwargs, password=password) as client:
94-
tasks = [delete_record(client, element["element_id"]) for element in elements]
108+
tasks = [
109+
delete_record(client, element["element_id"], key_prefix) for element in elements
110+
]
95111
await asyncio.gather(*tasks)
96112

97113

@@ -105,15 +121,23 @@ async def test_redis_destination_azure_with_password(upload_file: Path, tmp_path
105121
"db": 0,
106122
"ssl": True,
107123
}
124+
uploader_config = {
125+
"key_prefix": "test_ingest:",
126+
}
108127
redis_pw = os.environ["AZURE_REDIS_INGEST_TEST_PASSWORD"]
109-
await redis_destination_test(upload_file, tmp_path, connection_kwargs, password=redis_pw)
128+
await redis_destination_test(
129+
upload_file, tmp_path, connection_kwargs, uploader_config, password=redis_pw
130+
)
110131

111132

112133
@pytest.mark.asyncio
113134
@pytest.mark.tags(REDIS_CONNECTOR_TYPE, DESTINATION_TAG, "redis", NOSQL_TAG)
114135
@requires_env("AZURE_REDIS_INGEST_TEST_PASSWORD")
115136
async def test_redis_destination_azure_with_uri(upload_file: Path, tmp_path: Path):
116137
connection_kwargs = {}
138+
uploader_config = {
139+
"key_prefix": "test_ingest:",
140+
}
117141
redis_pw = os.environ["AZURE_REDIS_INGEST_TEST_PASSWORD"]
118142
uri = f"rediss://:{redis_pw}@utic-dashboard-dev.redis.cache.windows.net:6380/0"
119-
await redis_destination_test(upload_file, tmp_path, connection_kwargs, uri=uri)
143+
await redis_destination_test(upload_file, tmp_path, connection_kwargs, uploader_config, uri=uri)

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.5.18" # pragma: no cover
1+
__version__ = "0.5.19" # pragma: no cover

unstructured_ingest/v2/processes/connectors/redisdb.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def create_client(self) -> Generator["Redis", None, None]:
110110

111111
class RedisUploaderConfig(UploaderConfig):
112112
batch_size: int = Field(default=100, description="Number of records per batch")
113+
key_prefix: str = Field(default="", description="Prefix for Redis keys")
113114

114115

115116
@dataclass
@@ -145,11 +146,11 @@ async def _write_batch(self, batch: list[dict], redis_stack: bool) -> None:
145146
async with self.connection_config.create_async_client() as async_client:
146147
async with async_client.pipeline(transaction=True) as pipe:
147148
for element in batch:
148-
element_id = element["element_id"]
149+
key_with_prefix = f"{self.upload_config.key_prefix}{element['element_id']}"
149150
if redis_stack:
150-
pipe.json().set(element_id, "$", element)
151+
pipe.json().set(key_with_prefix, "$", element)
151152
else:
152-
pipe.set(element_id, json.dumps(element))
153+
pipe.set(key_with_prefix, json.dumps(element))
153154
await pipe.execute()
154155

155156
@requires_dependencies(["redis"], extras="redis")
@@ -159,16 +160,16 @@ async def _check_redis_stack(self, element: dict) -> bool:
159160
redis_stack = True
160161
async with self.connection_config.create_async_client() as async_client:
161162
async with async_client.pipeline(transaction=True) as pipe:
162-
element_id = element["element_id"]
163+
key_with_prefix = f"{self.upload_config.key_prefix}{element['element_id']}"
163164
try:
164165
# Redis with stack extension supports JSON type
165-
await pipe.json().set(element_id, "$", element).execute()
166+
await pipe.json().set(key_with_prefix, "$", element).execute()
166167
except redis_exceptions.ResponseError as e:
167168
message = str(e)
168169
if "unknown command `JSON.SET`" in message:
169170
# if this error occurs, Redis server doesn't support JSON type,
170171
# so save as string type instead
171-
await pipe.set(element_id, json.dumps(element)).execute()
172+
await pipe.set(key_with_prefix, json.dumps(element)).execute()
172173
redis_stack = False
173174
else:
174175
raise e

0 commit comments

Comments
 (0)