Skip to content

Commit 7382250

Browse files
authored
fix: pinecone index creation (#395)
* fix pinecone opinionated writes, fix potential weaviate name formatting bug, add identity to default index name for opinionated writes, add reference test script for validation, change step name due to potentially confusing log, change method name due to potentially confusing naming * tidy, cv * remove kwargs pass
1 parent e816a9e commit 7382250

File tree

9 files changed

+164
-17
lines changed

9 files changed

+164
-17
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 0.5.8
2+
3+
### Fixes
4+
5+
* **Fix on pinecone index creation functionality**
6+
17
## 0.5.7
28

39
### Fixes
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
You can run these to validate your development effort end-to-end. They are not intended to run in CI.
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
5+
DEST_PATH=$(dirname "$(realpath "$0")")
6+
SCRIPT_DIR=$(dirname "$DEST_PATH")
7+
cd "$SCRIPT_DIR"/.. || exit 1
8+
OUTPUT_FOLDER_NAME=s3-pinecone-dest
9+
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
10+
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
11+
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
12+
13+
if [ -z "$PINECONE_API_KEY" ]; then
14+
echo "Skipping Pinecone ingest test because PINECONE_API_KEY env var is not set."
15+
exit 0
16+
fi
17+
18+
RANDOM_SUFFIX=$((RANDOM % 100000 + 1))
19+
20+
# Set the variables with default values if they're not set in the environment
21+
PINECONE_INDEX=${PINECONE_INDEX:-"ingest-test-$RANDOM_SUFFIX"}
22+
PINECONE_HOST_POSTFIX=${PINECONE_HOST_POSTFIX:-"4627-b74a"}
23+
PINECONE_PROJECT_ID=${PINECONE_PROJECT_ID:-"art8iaj"}
24+
25+
# shellcheck disable=SC1091
26+
source "$SCRIPT_DIR"/cleanup.sh
27+
function cleanup {
28+
29+
# Get response code to check if index exists
30+
response_code=$(curl \
31+
-s -o /dev/null \
32+
-w "%{http_code}" \
33+
--request GET \
34+
--url "https://api.pinecone.io/indexes/$PINECONE_INDEX" \
35+
--header 'accept: application/json' \
36+
--header "Api-Key: $PINECONE_API_KEY")
37+
38+
# Cleanup (delete) index if it exists
39+
if [ "$response_code" == "200" ]; then
40+
echo ""
41+
echo "deleting index $PINECONE_INDEX"
42+
curl --request DELETE \
43+
"https://api.pinecone.io/indexes/$PINECONE_INDEX" \
44+
--header "Api-Key: $PINECONE_API_KEY" \
45+
--header 'content-type: application/json'
46+
47+
else
48+
echo "There was an error during index deletion for index $PINECONE_INDEX, with response code: $response_code. It might be that index $PINECONE_INDEX does not exist, so there is nothing to delete."
49+
fi
50+
51+
# Local file cleanup
52+
cleanup_dir "$WORK_DIR"
53+
cleanup_dir "$OUTPUT_DIR"
54+
}
55+
56+
trap cleanup EXIT
57+
58+
echo "Creating index $PINECONE_INDEX"
59+
response_code=$(curl \
60+
-s -o /dev/null \
61+
-w "%{http_code}" \
62+
--request POST \
63+
--url "https://api.pinecone.io/indexes" \
64+
--header "accept: application/json" \
65+
--header "content-type: application/json" \
66+
--header "Api-Key: $PINECONE_API_KEY" \
67+
--data '
68+
{
69+
"name": "'"$PINECONE_INDEX"'",
70+
"dimension": 384,
71+
"metric": "cosine",
72+
"spec": {
73+
"serverless": {
74+
"cloud": "aws",
75+
"region": "us-east-1"
76+
}
77+
}
78+
}
79+
')
80+
81+
if [ "$response_code" -lt 400 ]; then
82+
echo "Index creation success: $response_code"
83+
else
84+
echo "Index creation failure: $response_code"
85+
exit 1
86+
fi
87+
88+
PYTHONPATH=. ./unstructured_ingest/main.py \
89+
local \
90+
--num-processes "$max_processes" \
91+
--output-dir "$OUTPUT_DIR" \
92+
--strategy fast \
93+
--verbose \
94+
--reprocess \
95+
--input-path example-docs/book-war-and-peace-1225p.txt \
96+
--work-dir "$WORK_DIR" \
97+
--chunking-strategy by_title \
98+
--chunk-combine-text-under-n-chars 150 --chunk-new-after-n-chars 1500 --chunk-max-characters 2500 --chunk-multipage-sections \
99+
--embedding-provider "huggingface" \
100+
pinecone \
101+
--api-key "$PINECONE_API_KEY" \
102+
--index-name "$PINECONE_INDEX" \
103+
--batch-size 80
104+
105+
# It can take some time for the index to catch up with the content that was written, this check between 10s sleeps
106+
# to give it that time process the writes. Will timeout after checking for a minute.
107+
num_of_vectors_remote=0
108+
attempt=1
109+
sleep_amount=8
110+
while [ "$num_of_vectors_remote" -eq 0 ] && [ "$attempt" -lt 4 ]; do
111+
echo "attempt $attempt: sleeping $sleep_amount seconds to let index finish catching up after writes"
112+
sleep $sleep_amount
113+
114+
num_of_vectors_remote=$(curl --request POST \
115+
-s \
116+
--url "https://$PINECONE_INDEX-$PINECONE_PROJECT_ID.svc.aped-$PINECONE_HOST_POSTFIX.pinecone.io/describe_index_stats" \
117+
--header "accept: application/json" \
118+
--header "content-type: application/json" \
119+
--header "Api-Key: $PINECONE_API_KEY" | jq -r '.totalVectorCount')
120+
121+
echo "vector count in Pinecone: $num_of_vectors_remote"
122+
attempt=$((attempt + 1))
123+
done
124+
125+
EXPECTED=1825
126+
127+
if [ "$num_of_vectors_remote" -ne $EXPECTED ]; then
128+
echo "Number of vectors in Pinecone are $num_of_vectors_remote when the expected number is $EXPECTED. Test failed."
129+
exit 1
130+
fi

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.5.7" # pragma: no cover
1+
__version__ = "0.5.8" # pragma: no cover

unstructured_ingest/connector/pinecone.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ def to_dict(self, **kwargs):
6363
@property
6464
def pinecone_index(self):
6565
if self._index is None:
66-
self._index = self.create_index()
66+
self._index = self.get_index()
6767
return self._index
6868

6969
def initialize(self):
7070
pass
7171

7272
@requires_dependencies(["pinecone"], extras="pinecone")
73-
def create_index(self) -> "PineconeIndex":
73+
def get_index(self) -> "PineconeIndex":
7474
from pinecone import Pinecone
7575
from unstructured import __version__ as unstructured_version
7676

@@ -83,6 +83,16 @@ def create_index(self) -> "PineconeIndex":
8383
logger.debug(f"connected to index: {pc.describe_index(self.connector_config.index_name)}")
8484
return index
8585

86+
@requires_dependencies(["pinecone"], extras="pinecone")
87+
def create_index(self) -> "PineconeIndex":
88+
logger.warning(
89+
"create_index (a misleading name as of now) will be deprecated soon. "
90+
+ "Use get_index instead. This is due to unstructured supporting actual "
91+
+ "index creation/provisioning now. "
92+
+ "(Support for v2 connectors only. you are currently using a v1 connector.)"
93+
)
94+
return self.get_index()
95+
8696
@DestinationConnectionError.wrap
8797
def check_connection(self):
8898
_ = self.pinecone_index

unstructured_ingest/v2/interfaces/uploader.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ def is_batch(self) -> bool:
3838
def run_batch(self, contents: list[UploadContent], **kwargs: Any) -> None:
3939
raise NotImplementedError()
4040

41-
def create_destination(self, destination_name: str = "elements", **kwargs: Any) -> bool:
41+
def create_destination(
42+
self, destination_name: str = "unstructuredautocreated", **kwargs: Any
43+
) -> bool:
4244
# Update the uploader config if needed with a new destination that gets created.
4345
# Return a flag on if anything was created or not.
4446
return False
@@ -61,6 +63,6 @@ async def run_data_async(self, data: list[dict], file_data: FileData, **kwargs:
6163
@dataclass
6264
class VectorDBUploader(Uploader, ABC):
6365
def create_destination(
64-
self, vector_length: int, destination_name: str = "elements", **kwargs: Any
66+
self, vector_length: int, destination_name: str = "unstructuredautocreated", **kwargs: Any
6567
) -> bool:
6668
return False

unstructured_ingest/v2/pipeline/steps/index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
IndexerT = TypeVar("IndexerT", bound=Indexer)
1313

14-
STEP_ID = "index"
14+
STEP_ID = "indexer"
1515

1616

1717
@dataclass

unstructured_ingest/v2/processes/connectors/pinecone.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def format_destination_name(self, destination_name: str) -> str:
208208
def create_destination(
209209
self,
210210
vector_length: int,
211-
destination_name: str = "elements",
211+
destination_name: str = "unstructuredautocreated",
212212
destination_type: Literal["pod", "serverless"] = "serverless",
213213
serverless_cloud: str = "aws",
214214
serverless_region: str = "us-west-2",
@@ -219,7 +219,7 @@ def create_destination(
219219
) -> bool:
220220
from pinecone import PodSpec, ServerlessSpec
221221

222-
index_name = destination_name or self.connection_config.index_name
222+
index_name = self.connection_config.index_name or destination_name
223223
index_name = self.format_destination_name(index_name)
224224
self.connection_config.index_name = index_name
225225

@@ -228,13 +228,11 @@ def create_destination(
228228
logger.info(f"creating pinecone index {index_name}")
229229

230230
pc = self.connection_config.get_client()
231-
232231
if destination_type == "serverless":
233232
pc.create_index(
234-
name=destination_name,
233+
name=index_name,
235234
dimension=vector_length,
236235
spec=ServerlessSpec(cloud=serverless_cloud, region=serverless_region),
237-
**kwargs,
238236
)
239237

240238
return True
@@ -244,7 +242,6 @@ def create_destination(
244242
name=destination_name,
245243
dimension=vector_length,
246244
spec=PodSpec(environment=pod_environment, pod_type=pod_type, pods=pod_count),
247-
**kwargs,
248245
)
249246

250247
return True

unstructured_ingest/v2/processes/connectors/weaviate/weaviate.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,13 @@ def format_destination_name(self, destination_name: str) -> str:
241241
return formatted.capitalize()
242242

243243
def create_destination(
244-
self, destination_name: str = "elements", vector_length: Optional[int] = None, **kwargs: Any
244+
self,
245+
destination_name: str = "unstructuredautocreated",
246+
vector_length: Optional[int] = None,
247+
**kwargs: Any,
245248
) -> bool:
246-
destination_name = self.format_destination_name(destination_name)
247249
collection_name = self.upload_config.collection or destination_name
250+
collection_name = self.format_destination_name(collection_name)
248251
self.upload_config.collection = collection_name
249252

250253
connectors_dir = Path(__file__).parents[1]
@@ -254,9 +257,7 @@ def create_destination(
254257
collection_config["class"] = collection_name
255258

256259
if not self._collection_exists():
257-
logger.info(
258-
f"creating default weaviate collection '{collection_name}' with default configs"
259-
)
260+
logger.info(f"creating weaviate collection '{collection_name}' with default configs")
260261
with self.connection_config.get_client() as weaviate_client:
261262
weaviate_client.collections.create_from_dict(config=collection_config)
262263
return True

0 commit comments

Comments
 (0)