Skip to content

Commit 052f333

Browse files
committed
Ingestion with an imbalanced producer / consumer.
1 parent 4867fc1 commit 052f333

File tree

5 files changed

+255
-8
lines changed

5 files changed

+255
-8
lines changed

apps/dataset-ingestion/app/app.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ build_coco() {
4141
date
4242
adb utils log --level INFO "${APP}: Loading begins"
4343
echo "loading data..."
44+
python3 ingest_streaming.py /app/input/val/val_images.adb.csv $BATCH_SIZE $NUM_WORKERS
45+
python3 ingest_streaming.py /app/input/val/val_pixelmaps.adb.csv $BATCH_SIZE $NUM_WORKERS
4446
python3 ingestion_demo_trial.py -R /app/input -C $CLEAN -B $BATCH_SIZE -W $NUM_WORKERS -S $SAMPLE_COUNT -T $INCLUDE_TRAIN
4547

4648
# Validation
@@ -64,6 +66,9 @@ build_faces() {
6466

6567
# Ingest the CSV files
6668
adb utils log --level INFO "${APP}: Loading faces dataset"
69+
python3 /app/build_faces/create_indexes.py
70+
python3 /app/build_faces/ingest_streaming.py /app/input/faces/pruned_celebA.csv $BATCH_SIZE $NUM_WORKERS
71+
python3 /app/build_faces/ingest_streaming.py /app/input/faces/hqimages.adb.csv $BATCH_SIZE $NUM_WORKERS
6772
bash /app/build_faces/load.sh
6873
adb utils log --level INFO "${APP}: Successful completion"
6974
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import time
2+
from concurrent.futures import ThreadPoolExecutor
3+
from enum import Enum
4+
from queue import Full, Queue
5+
6+
import pandas as pd
7+
import requests
8+
from aperturedb.CommonLibrary import create_connector
9+
from aperturedb.QueryGenerator import QueryGenerator
10+
from typer import Typer
11+
12+
13+
class HTTPStorageURLS():
    """Producer that streams blobs from plain HTTP(S) URLs into a bounded queue.

    Each row of *df* must carry a ``url`` column.  Downloads are fanned out on
    the shared *executor*; every finished download is pushed onto *q* as a
    ``(row, blob_bytes)`` tuple.  Because *q* is bounded, the blocking
    ``Queue.put`` throttles the producer to the consumer's pace.
    """

    def __init__(self, q: Queue, df: pd.DataFrame, executor: ThreadPoolExecutor):
        self.executor = executor
        self.q = q
        self.df = df
        # Kept for parity with GoogleCloudStorage; not read by this class.
        self.row = self.df.iloc[0]
        # A single Session reuses TCP connections across the many downloads.
        self.session = requests.Session()
        self.sync()

    def sync(self):
        """Submit one download task per row; returns immediately (downloads run async)."""
        def download_blob(i, row):
            url = row["url"]
            # NOTE(review): no status check — a 404/500 response body would be
            # queued as the image blob.  Consider r.raise_for_status() here;
            # exceptions raised in this worker are swallowed by the executor
            # since the returned Future is never inspected.
            r = self.session.get(url)
            # Blocking put provides back-pressure.  (The previous
            # ``try/except Full`` was dead code: a blocking put never raises
            # queue.Full — Full is only raised with block=False or a timeout.)
            self.q.put((row, r.content))

        for i, row in enumerate(self.df.to_dict("records")):
            self.executor.submit(download_blob, i, row)
        print(f"Synced to {self.q}")
35+
36+
class GoogleCloudStorage():
    """Producer that streams blobs from a public GCS bucket into a bounded queue.

    Each row of *df* must carry a ``gs_url`` column of the form
    ``gs://<bucket>/<object>``.  All rows are assumed to live in the same
    bucket (taken from the first row).  Downloads are fanned out on the shared
    *executor*; each blob is pushed onto *q* as a ``(row, blob_bytes)`` tuple,
    with the bounded queue providing back-pressure.
    """

    def __init__(self, q: Queue, df: pd.DataFrame, executor: ThreadPoolExecutor):
        self.executor = executor
        self.q = q
        self.df = df
        self.row = self.df.iloc[0]
        gs_url = self.row["gs_url"]
        # Imported lazily so this module loads without google-cloud-storage
        # when only the HTTP path is used.
        from google.cloud import storage
        # Anonymous client: bucket must be publicly readable.
        self.client = storage.Client.create_anonymous_client()
        # "gs://bucket/object" → split("/") = ["gs:", "", "bucket", ...]
        self.source_bucket_name = gs_url.split("/")[2]
        self.source_bucket = self.client.bucket(self.source_bucket_name)
        self.sync()

    def sync(self):
        """Submit one download task per row; returns immediately (downloads run async)."""
        def download_blob(i, row):
            # Strip the "gs://<bucket>/" prefix to get the object name.
            object_name = row["gs_url"].split("gs://" + self.source_bucket_name + "/")[-1]
            blob = self.source_bucket.blob(object_name).download_as_bytes()
            # Blocking put provides back-pressure.  (The previous
            # ``try/except Full`` was dead code: a blocking put never raises
            # queue.Full — Full is only raised with block=False or a timeout.)
            self.q.put((row, blob))

        for i, row in enumerate(self.df.to_dict("records")):
            self.executor.submit(download_blob, i, row)
        print(f"Synced to {self.q}")
62+
63+
class ObjectStorage(Enum):
    """Kind of object store the CSV's URLs point at.

    Chosen from the CSV's URL column name: ``gs_url`` → GCS, ``url`` → HTTP.
    """

    # Values are arbitrary identifiers; only identity/equality is used.
    GCS = 1
    HTTP = 2
66+
67+
class Sequence(QueryGenerator):
    """QueryGenerator that feeds AddImage queries from a producer queue.

    A pool of downloader threads (the producer) fills ``self.q`` with
    ``(row, blob)`` tuples while the parallel loader (the consumer) drains
    it; the bounded queue absorbs any imbalance between the two rates.
    """

    def __init__(self, input_csv: str):
        """Read *input_csv*, pick the storage backend, and start downloads.

        Raises ValueError if the CSV has neither a ``gs_url`` nor a ``url``
        column.
        """
        super().__init__()
        # Bounded so a fast producer cannot exhaust memory.
        self.q = Queue(maxsize=1000)

        self.df = pd.read_csv(input_csv)
        # Detect the backend from the URL column present in the CSV.
        # (Membership test instead of the old ``columns[0]`` check, so the
        # URL column no longer has to be the first column.)
        if "gs_url" in self.df.columns:
            self.storage = ObjectStorage.GCS
        elif "url" in self.df.columns:
            self.storage = ObjectStorage.HTTP
        else:
            raise ValueError("Invalid URL type")
        self.executor = ThreadPoolExecutor(max_workers=64)
        # NOTE(review): attribute is named ``gcs`` even for the HTTP backend;
        # kept for backward compatibility.
        if self.storage == ObjectStorage.GCS:
            self.gcs = GoogleCloudStorage(self.q, self.df, self.executor)
        elif self.storage == ObjectStorage.HTTP:
            self.gcs = HTTPStorageURLS(self.q, self.df, self.executor)
        # Hack to reuse extra 5 items on top of the queue
        # which are used to check if generator has implemented getitem
        # And what is commands per query, and blobs per query.
        self.inspect = 0

    def __del__(self):
        # Best-effort cleanup of the downloader pool; runs at GC time.
        self.executor.shutdown()

    def getitem(self, subscript):
        """Return ``(query, blobs)`` for the next downloaded item.

        *subscript* is ignored: items are served in download-completion
        order from the queue, not by index.
        """
        data = self.q.get()
        # The framework probes getitem a few times before the real run;
        # push the item back so those probe reads do not drop data.
        if self.inspect < 5:
            self.q.put(data)
            self.inspect += 1
        q = [
            {
                "AddImage": {
                    "properties": data[0]
                }
            }
        ]
        return q, [data[1]]

    def __len__(self):
        # One query per CSV row.
        return len(self.df)
110+
111+
112+
app = Typer()


@app.command()
def ingest(input_csv: str, batch_size: int, num_workers: int):
    """Stream-ingest the images listed in *input_csv* into ApertureDB.

    Builds a Sequence generator (which starts the background downloads)
    and drives it with a ParallelLoader.
    """
    from aperturedb.ParallelLoader import ParallelLoader

    generator = Sequence(input_csv)
    client = create_connector()
    loader = ParallelLoader(client=client)
    loader.ingest(generator, batch_size, num_workers, True)
    print("Done")


if __name__ == "__main__":
    app()

apps/dataset-ingestion/app/build_coco/ingestion_demo_trial.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import os
2-
import sys
32
from aperturedb.Utils import Utils
43
from aperturedb.CommonLibrary import create_connector
54

@@ -17,9 +16,7 @@ def ingest_coco(cli_args):
1716
dbutils.create_entity_index("_Descriptor", "yfcc_id")
1817

1918
args = {
20-
"images": "IMAGE",
2119
"bboxes": "BOUNDING_BOX",
22-
"pixelmaps": "IMAGE",
2320
"img_pixelmap_connections": "CONNECTION",
2421
"polygons": "POLYGON",
2522
"images.adb.csv_clip_pytorch_embeddings_metadata": "DESCRIPTOR",
@@ -29,10 +26,9 @@ def ingest_coco(cli_args):
2926
if cli_args.train == "true":
3027
stages.append("train")
3128

32-
objs = ["images",
29+
objs = [
3330
"bboxes",
3431
"polygons",
35-
"pixelmaps",
3632
"img_pixelmap_connections",
3733
"images.adb.csv_clip_pytorch_embeddings_metadata",
3834
"images.adb.csv_clip_pytorch_embeddings_connection"]
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import time
2+
from concurrent.futures import ThreadPoolExecutor
3+
from enum import Enum
4+
from queue import Full, Queue
5+
6+
import pandas as pd
7+
import requests
8+
from aperturedb.CommonLibrary import create_connector
9+
from aperturedb.QueryGenerator import QueryGenerator
10+
from typer import Typer
11+
12+
13+
class HTTPStorageURLS():
    """Producer that streams blobs from plain HTTP(S) URLs into a bounded queue.

    Each row of *df* must carry a ``url`` column.  Downloads are fanned out on
    the shared *executor*; every finished download is pushed onto *q* as a
    ``(row, blob_bytes)`` tuple.  Because *q* is bounded, the blocking
    ``Queue.put`` throttles the producer to the consumer's pace.
    """

    def __init__(self, q: Queue, df: pd.DataFrame, executor: ThreadPoolExecutor):
        self.executor = executor
        self.q = q
        self.df = df
        # Kept for parity with GoogleCloudStorage; not read by this class.
        self.row = self.df.iloc[0]
        # A single Session reuses TCP connections across the many downloads.
        self.session = requests.Session()
        self.sync()

    def sync(self):
        """Submit one download task per row; returns immediately (downloads run async)."""
        def download_blob(i, row):
            url = row["url"]
            # NOTE(review): no status check — a 404/500 response body would be
            # queued as the image blob.  Consider r.raise_for_status() here;
            # exceptions raised in this worker are swallowed by the executor
            # since the returned Future is never inspected.
            r = self.session.get(url)
            # Blocking put provides back-pressure.  (The previous
            # ``try/except Full`` was dead code: a blocking put never raises
            # queue.Full — Full is only raised with block=False or a timeout.)
            self.q.put((row, r.content))

        for i, row in enumerate(self.df.to_dict("records")):
            self.executor.submit(download_blob, i, row)
        print(f"Synced to {self.q}")
35+
36+
class GoogleCloudStorage():
    """Producer that streams blobs from a public GCS bucket into a bounded queue.

    Each row of *df* must carry a ``gs_url`` column of the form
    ``gs://<bucket>/<object>``.  All rows are assumed to live in the same
    bucket (taken from the first row).  Downloads are fanned out on the shared
    *executor*; each blob is pushed onto *q* as a ``(row, blob_bytes)`` tuple,
    with the bounded queue providing back-pressure.
    """

    def __init__(self, q: Queue, df: pd.DataFrame, executor: ThreadPoolExecutor):
        self.executor = executor
        self.q = q
        self.df = df
        self.row = self.df.iloc[0]
        gs_url = self.row["gs_url"]
        # Imported lazily so this module loads without google-cloud-storage
        # when only the HTTP path is used.
        from google.cloud import storage
        # Anonymous client: bucket must be publicly readable.
        self.client = storage.Client.create_anonymous_client()
        # "gs://bucket/object" → split("/") = ["gs:", "", "bucket", ...]
        self.source_bucket_name = gs_url.split("/")[2]
        self.source_bucket = self.client.bucket(self.source_bucket_name)
        self.sync()

    def sync(self):
        """Submit one download task per row; returns immediately (downloads run async)."""
        def download_blob(i, row):
            # Strip the "gs://<bucket>/" prefix to get the object name.
            object_name = row["gs_url"].split("gs://" + self.source_bucket_name + "/")[-1]
            blob = self.source_bucket.blob(object_name).download_as_bytes()
            # Blocking put provides back-pressure.  (The previous
            # ``try/except Full`` was dead code: a blocking put never raises
            # queue.Full — Full is only raised with block=False or a timeout.)
            self.q.put((row, blob))

        for i, row in enumerate(self.df.to_dict("records")):
            self.executor.submit(download_blob, i, row)
        print(f"Synced to {self.q}")
62+
63+
class ObjectStorage(Enum):
    """Kind of object store the CSV's URLs point at.

    Chosen from the CSV's URL column name: ``gs_url`` → GCS, ``url`` → HTTP.
    """

    # Values are arbitrary identifiers; only identity/equality is used.
    GCS = 1
    HTTP = 2
66+
67+
class Sequence(QueryGenerator):
    """QueryGenerator that feeds AddImage queries from a producer queue.

    A pool of downloader threads (the producer) fills ``self.q`` with
    ``(row, blob)`` tuples while the parallel loader (the consumer) drains
    it; the bounded queue absorbs any imbalance between the two rates.
    """

    def __init__(self, input_csv: str):
        """Read *input_csv*, pick the storage backend, and start downloads.

        Raises ValueError if the CSV has neither a ``gs_url`` nor a ``url``
        column.
        """
        super().__init__()
        # Bounded so a fast producer cannot exhaust memory.
        self.q = Queue(maxsize=1000)

        self.df = pd.read_csv(input_csv)
        # Detect the backend from the URL column present in the CSV.
        # (Membership test instead of the old ``columns[0]`` check, so the
        # URL column no longer has to be the first column.)
        if "gs_url" in self.df.columns:
            self.storage = ObjectStorage.GCS
        elif "url" in self.df.columns:
            self.storage = ObjectStorage.HTTP
        else:
            raise ValueError("Invalid URL type")
        self.executor = ThreadPoolExecutor(max_workers=64)
        # NOTE(review): attribute is named ``gcs`` even for the HTTP backend;
        # kept for backward compatibility.
        if self.storage == ObjectStorage.GCS:
            self.gcs = GoogleCloudStorage(self.q, self.df, self.executor)
        elif self.storage == ObjectStorage.HTTP:
            self.gcs = HTTPStorageURLS(self.q, self.df, self.executor)
        # Hack to reuse extra 5 items on top of the queue
        # which are used to check if generator has implemented getitem
        # And what is commands per query, and blobs per query.
        self.inspect = 0

    def __del__(self):
        # Best-effort cleanup of the downloader pool; runs at GC time.
        self.executor.shutdown()

    def getitem(self, subscript):
        """Return ``(query, blobs)`` for the next downloaded item.

        *subscript* is ignored: items are served in download-completion
        order from the queue, not by index.
        """
        data = self.q.get()
        # The framework probes getitem a few times before the real run;
        # push the item back so those probe reads do not drop data.
        if self.inspect < 5:
            self.q.put(data)
            self.inspect += 1
        q = [
            {
                "AddImage": {
                    "properties": data[0]
                }
            }
        ]
        return q, [data[1]]

    def __len__(self):
        # One query per CSV row.
        return len(self.df)
110+
111+
112+
app = Typer()


@app.command()
def ingest(input_csv: str, batch_size: int, num_workers: int):
    """Stream-ingest the images listed in *input_csv* into ApertureDB.

    Builds a Sequence generator (which starts the background downloads)
    and drives it with a ParallelLoader.
    """
    from aperturedb.ParallelLoader import ParallelLoader

    generator = Sequence(input_csv)
    client = create_connector()
    loader = ParallelLoader(client=client)
    loader.ingest(generator, batch_size, num_workers, True)
    print("Done")


if __name__ == "__main__":
    app()

apps/dataset-ingestion/app/build_faces/load.sh

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,17 @@ if [[ ${CLEAN} == "true" ]]; then
1111
adb utils execute remove_all --force
1212
fi
1313
cd /app/build_faces
14-
python3 create_indexes.py
14+
1515
python3 create_descriptorsets.py
1616

1717
echo "Ingesting"
1818
cd /app/input/faces
19-
adb ingest from-csv pruned_celebA.csv --transformer image_properties --transformer common_properties --ingest-type IMAGE --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2019
adb ingest from-csv celebA.csv_clip_pytorch_embeddings_metadata.adb.csv --ingest-type DESCRIPTOR --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2120
adb ingest from-csv celebA.csv_clip_pytorch_embeddings_connection.adb.csv --ingest-type CONNECTION --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2221

2322
adb ingest from-csv celebA.csv_facenet_pytorch_embeddings_metadata.adb.csv --ingest-type DESCRIPTOR --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2423
adb ingest from-csv celebA.csv_facenet_pytorch_embeddings_connection.adb.csv --ingest-type CONNECTION --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2524

26-
adb ingest from-csv hqimages.adb.csv --ingest-type IMAGE --transformer common_properties --transformer image_properties --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2725
adb ingest from-csv hqpolygons.adb.csv --ingest-type POLYGON --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2826
adb ingest from-csv hqbboxes.adb.csv --ingest-type BOUNDING_BOX --batchsize ${BATCH_SIZE} --num-workers ${NUM_WORKERS} --sample-count ${SAMPLE_COUNT}
2927

0 commit comments

Comments
 (0)