Skip to content

Commit 9bc72c7

Browse files
committed
wip scheduler
1 parent 7e70bdb commit 9bc72c7

File tree

2 files changed

+53
-11
lines changed

2 files changed

+53
-11
lines changed

cloudquery/sdk/scheduler/scheduler.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
from typing import List, Generator, Any
33
import queue
44
import time
5-
from cloudquery.sdk.message import SyncMessage, SyncInsertMessage
5+
from cloudquery.sdk.schema import Table
6+
from cloudquery.sdk.message import SyncMessage, SyncInsertMessage, SyncMigrateMessage
67
import concurrent.futures
78
from typing import Generator
89

10+
# This is all WIP
911
class Task:
1012
def __init__(self, fetcher, parent_item):
1113
self._fetcher = fetcher
@@ -66,15 +68,47 @@ def run_task(task):
6668

6769
print(f"Running task {task}")
6870

71+
72+
QUEUE_PER_WORKER = 100
73+
6974
class Scheduler:
70-
def __init__(self):
75+
def __init__(self, concurrency: int, queue_size: int = 0, max_depth : int = 3):
7176
self._queue = queue.Queue()
72-
# self._pool = ThreadPool(processes=10)
77+
self._max_depth = max_depth
78+
if concurrency <= 0:
79+
raise ValueError("concurrency must be greater than 0")
80+
if queue_size <= 0:
81+
raise ValueError("queue_size must be greater than 0")
82+
if max_depth <= 0:
83+
raise ValueError("max_depth must be greater than 0")
84+
self._queue_size = queue_size if queue_size > 0 else concurrency * QUEUE_PER_WORKER
85+
self._pools = []
86+
self._queues = []
87+
current_depth_concurrency = concurrency
88+
current_depth_queue_size = queue_size
89+
for i in range(max_depth + 1):
90+
self._queues.append(queue.Queue(maxsize=current_depth_queue_size))
91+
self._pools.append(concurrent.futures.ThreadPoolExecutor(max_workers=current_depth_concurrency))
92+
current_depth_concurrency = current_depth_concurrency // 2 if current_depth_concurrency > 1 else 1
93+
current_depth_queue_size = current_depth_queue_size // 2 if current_depth_queue_size > 1 else 1
94+
95+
def worker(self, max_depth: int):
96+
while True:
97+
task = self._queues[max_depth].get()
98+
if task is None:
99+
break
100+
self._pools[max_depth].submit(*task)
101+
102+
def table_resolver(self, table: Table, client, res: queue.Queue):
103+
for resource in table.resolve(client):
104+
pass
105+
# task.resolve
73106

74-
def sync(self, fetchers: List[Fetcher]) -> Generator[SyncMessage]:
75-
task_queue = queue.Queue()
76-
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
77-
for worker_id in range(3):
78-
executor.submit(worker_task, task_queue, worker_id)
79-
main_task(task_queue)
80-
107+
def sync(self, client, tables: List[Table], res: queue.Queue, deterministic_cq_id=False):
108+
for table in tables:
109+
res.put(SyncMigrateMessage(record=table.to_arrow_schemas()))
110+
for table in tables:
111+
clients = table.multiplex(client)
112+
for client in clients:
113+
self._queues[0].put((table.resolver, client, res))
114+
self._queues[0].put(None)

cloudquery/sdk/schema/table.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from __future__ import annotations
22

3-
from typing import List
3+
from typing import List, Generator, Any
44

55
import pyarrow as pa
66

77
from cloudquery.sdk.schema import arrow
88
from .column import Column
99

10+
class Client:
11+
pass
1012

1113
class Table:
1214
def __init__(self, name: str, columns: List[Column], title: str = "", description: str = "",
@@ -21,6 +23,12 @@ def __init__(self, name: str, columns: List[Column], title: str = "", descriptio
2123
self.relations = relations
2224
self.is_incremental = is_incremental
2325

26+
def multiplex(self, client) -> List[Table]:
27+
raise [client]
28+
29+
def resolver(self, client: Client, parent=None) -> Generator[Any]:
30+
raise NotImplementedError()
31+
2432
@property
2533
def primary_keys(self):
2634
return [column.name for column in self.columns if column.primary_key]

0 commit comments

Comments
 (0)