Skip to content

Commit 4930b5f

Browse files
author
Hugo Osvaldo Barrera
committed
Drop multithreading support
This is mainly in preparation to moving to an async architecture.
1 parent 25435ce commit 4930b5f

File tree

6 files changed

+35
-231
lines changed

6 files changed

+35
-231
lines changed

tests/system/cli/test_sync.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -50,41 +50,6 @@ def test_sync_inexistant_pair(tmpdir, runner):
5050
assert "pair foo does not exist." in result.output.lower()
5151

5252

53-
def test_debug_connections(tmpdir, runner):
54-
runner.write_with_general(
55-
dedent(
56-
"""
57-
[pair my_pair]
58-
a = "my_a"
59-
b = "my_b"
60-
collections = null
61-
62-
[storage my_a]
63-
type = "filesystem"
64-
path = "{0}/path_a/"
65-
fileext = ".txt"
66-
67-
[storage my_b]
68-
type = "filesystem"
69-
path = "{0}/path_b/"
70-
fileext = ".txt"
71-
"""
72-
).format(str(tmpdir))
73-
)
74-
75-
tmpdir.mkdir("path_a")
76-
tmpdir.mkdir("path_b")
77-
78-
result = runner.invoke(["discover"])
79-
assert not result.exception
80-
81-
result = runner.invoke(["-vdebug", "sync", "--max-workers=3"])
82-
assert "using 3 maximal workers" in result.output.lower()
83-
84-
result = runner.invoke(["-vdebug", "sync"])
85-
assert "using 1 maximal workers" in result.output.lower()
86-
87-
8853
def test_empty_storage(tmpdir, runner):
8954
runner.write_with_general(
9055
dedent(

vdirsyncer/cli/__init__.py

Lines changed: 28 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -65,33 +65,6 @@ def app(ctx, config):
6565
main = app
6666

6767

68-
def max_workers_callback(ctx, param, value):
69-
if value == 0 and logging.getLogger("vdirsyncer").level == logging.DEBUG:
70-
value = 1
71-
72-
cli_logger.debug(f"Using {value} maximal workers.")
73-
return value
74-
75-
76-
def max_workers_option(default=0):
77-
help = "Use at most this many connections. "
78-
if default == 0:
79-
help += (
80-
'The default is 0, which means "as many as necessary". '
81-
"With -vdebug enabled, the default is 1."
82-
)
83-
else:
84-
help += f"The default is {default}."
85-
86-
return click.option(
87-
"--max-workers",
88-
default=default,
89-
type=click.IntRange(min=0, max=None),
90-
callback=max_workers_callback,
91-
help=help,
92-
)
93-
94-
9568
def collections_arg_callback(ctx, param, value):
9669
"""
9770
Expand the various CLI shortforms ("pair, pair/collection") to an iterable
@@ -126,10 +99,9 @@ def collections_arg_callback(ctx, param, value):
12699
"to be deleted from both sides."
127100
),
128101
)
129-
@max_workers_option()
130102
@pass_context
131103
@catch_errors
132-
def sync(ctx, collections, force_delete, max_workers):
104+
def sync(ctx, collections, force_delete):
133105
"""
134106
Synchronize the given collections or pairs. If no arguments are given, all
135107
will be synchronized.
@@ -151,53 +123,36 @@ def sync(ctx, collections, force_delete, max_workers):
151123
vdirsyncer sync bob/first_collection
152124
"""
153125
from .tasks import prepare_pair, sync_collection
154-
from .utils import WorkerQueue
155126

156-
wq = WorkerQueue(max_workers)
157-
158-
with wq.join():
159-
for pair_name, collections in collections:
160-
wq.put(
161-
functools.partial(
162-
prepare_pair,
163-
pair_name=pair_name,
164-
collections=collections,
165-
config=ctx.config,
166-
force_delete=force_delete,
167-
callback=sync_collection,
168-
)
169-
)
170-
wq.spawn_worker()
127+
for pair_name, collections in collections:
128+
prepare_pair(
129+
pair_name=pair_name,
130+
collections=collections,
131+
config=ctx.config,
132+
force_delete=force_delete,
133+
callback=sync_collection,
134+
)
171135

172136

173137
@app.command()
174138
@collections_arg
175-
@max_workers_option()
176139
@pass_context
177140
@catch_errors
178-
def metasync(ctx, collections, max_workers):
141+
def metasync(ctx, collections):
179142
"""
180143
Synchronize metadata of the given collections or pairs.
181144
182145
See the `sync` command for usage.
183146
"""
184147
from .tasks import prepare_pair, metasync_collection
185-
from .utils import WorkerQueue
186148

187-
wq = WorkerQueue(max_workers)
188-
189-
with wq.join():
190-
for pair_name, collections in collections:
191-
wq.put(
192-
functools.partial(
193-
prepare_pair,
194-
pair_name=pair_name,
195-
collections=collections,
196-
config=ctx.config,
197-
callback=metasync_collection,
198-
)
199-
)
200-
wq.spawn_worker()
149+
for pair_name, collections in collections:
150+
prepare_pair(
151+
pair_name=pair_name,
152+
collections=collections,
153+
config=ctx.config,
154+
callback=metasync_collection,
155+
)
201156

202157

203158
@app.command()
@@ -210,33 +165,25 @@ def metasync(ctx, collections, max_workers):
210165
"for debugging. This is slow and may crash for broken servers."
211166
),
212167
)
213-
@max_workers_option(default=1)
214168
@pass_context
215169
@catch_errors
216-
def discover(ctx, pairs, max_workers, list):
170+
def discover(ctx, pairs, list):
217171
"""
218172
Refresh collection cache for the given pairs.
219173
"""
220174
from .tasks import discover_collections
221-
from .utils import WorkerQueue
222175

223176
config = ctx.config
224-
wq = WorkerQueue(max_workers)
225-
226-
with wq.join():
227-
for pair_name in pairs or config.pairs:
228-
pair = config.get_pair(pair_name)
229-
230-
wq.put(
231-
functools.partial(
232-
discover_collections,
233-
status_path=config.general["status_path"],
234-
pair=pair,
235-
from_cache=False,
236-
list_collections=list,
237-
)
238-
)
239-
wq.spawn_worker()
177+
178+
for pair_name in pairs or config.pairs:
179+
pair = config.get_pair(pair_name)
180+
181+
discover_collections(
182+
status_path=config.general["status_path"],
183+
pair=pair,
184+
from_cache=False,
185+
list_collections=list,
186+
)
240187

241188

242189
@app.command()

vdirsyncer/cli/config.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
from configparser import RawConfigParser
55
from itertools import chain
66

7-
from click_threading import get_ui_worker
8-
97
from .. import exceptions
108
from .. import PROJECT_HOME
119
from ..utils import cached_property
@@ -257,11 +255,7 @@ def resolve(a, b):
257255
b_name = self.config_b["instance_name"]
258256
command = conflict_resolution[1:]
259257

260-
def inner():
261-
return _resolve_conflict_via_command(a, b, command, a_name, b_name)
262-
263-
ui_worker = get_ui_worker()
264-
return ui_worker.put(inner)
258+
return _resolve_conflict_via_command(a, b, command, a_name, b_name)
265259

266260
return resolve
267261
else:

vdirsyncer/cli/tasks.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import functools
21
import json
32

43
from .. import exceptions
@@ -16,15 +15,13 @@
1615
from .utils import save_status
1716

1817

19-
def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
18+
def prepare_pair(pair_name, collections, config, callback, **kwargs):
2019
pair = config.get_pair(pair_name)
2120

2221
all_collections = dict(
2322
collections_for_pair(status_path=config.general["status_path"], pair=pair)
2423
)
2524

26-
# spawn one worker less because we can reuse the current one
27-
new_workers = -1
2825
for collection_name in collections or all_collections:
2926
try:
3027
config_a, config_b = all_collections[collection_name]
@@ -35,20 +32,12 @@ def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
3532
pair_name, json.dumps(collection_name), list(all_collections)
3633
)
3734
)
38-
new_workers += 1
3935

4036
collection = CollectionConfig(pair, collection_name, config_a, config_b)
41-
wq.put(
42-
functools.partial(
43-
callback, collection=collection, general=config.general, **kwargs
44-
)
45-
)
46-
47-
for _ in range(new_workers):
48-
wq.spawn_worker()
37+
callback(collection=collection, general=config.general, **kwargs)
4938

5039

51-
def sync_collection(wq, collection, general, force_delete):
40+
def sync_collection(collection, general, force_delete):
5241
pair = collection.pair
5342
status_name = get_status_name(pair.name, collection.name)
5443

@@ -87,7 +76,7 @@ def error_callback(e):
8776
raise JobFailed()
8877

8978

90-
def discover_collections(wq, pair, **kwargs):
79+
def discover_collections(pair, **kwargs):
9180
rv = collections_for_pair(pair=pair, **kwargs)
9281
collections = list(c for c, (a, b) in rv)
9382
if collections == [None]:
@@ -128,7 +117,7 @@ def repair_collection(config, collection, repair_unsafe_uid):
128117
repair_storage(storage, repair_unsafe_uid=repair_unsafe_uid)
129118

130119

131-
def metasync_collection(wq, collection, general):
120+
def metasync_collection(collection, general):
132121
from ..metasync import metasync
133122

134123
pair = collection.pair

vdirsyncer/cli/utils.py

Lines changed: 0 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
import contextlib
22
import errno
33
import importlib
4-
import itertools
54
import json
65
import os
7-
import queue
86
import sys
97

108
import click
11-
import click_threading
129
from atomicwrites import atomic_write
1310

1411
from . import cli_logger
@@ -311,92 +308,6 @@ def handle_storage_init_error(cls, config):
311308
)
312309

313310

314-
class WorkerQueue:
315-
"""
316-
A simple worker-queue setup.
317-
318-
Note that workers quit if queue is empty. That means you have to first put
319-
things into the queue before spawning the worker!
320-
"""
321-
322-
def __init__(self, max_workers):
323-
self._queue = queue.Queue()
324-
self._workers = []
325-
self._max_workers = max_workers
326-
self._shutdown_handlers = []
327-
328-
# According to http://stackoverflow.com/a/27062830, those are
329-
# threadsafe compared to increasing a simple integer variable.
330-
self.num_done_tasks = itertools.count()
331-
self.num_failed_tasks = itertools.count()
332-
333-
def shutdown(self):
334-
while self._shutdown_handlers:
335-
try:
336-
self._shutdown_handlers.pop()()
337-
except Exception:
338-
pass
339-
340-
def _worker(self):
341-
while True:
342-
try:
343-
func = self._queue.get(False)
344-
except queue.Empty:
345-
break
346-
347-
try:
348-
func(wq=self)
349-
except Exception:
350-
handle_cli_error()
351-
next(self.num_failed_tasks)
352-
finally:
353-
self._queue.task_done()
354-
next(self.num_done_tasks)
355-
if not self._queue.unfinished_tasks:
356-
self.shutdown()
357-
358-
def spawn_worker(self):
359-
if self._max_workers and len(self._workers) >= self._max_workers:
360-
return
361-
362-
t = click_threading.Thread(target=self._worker)
363-
t.start()
364-
self._workers.append(t)
365-
366-
@contextlib.contextmanager
367-
def join(self):
368-
assert self._workers or not self._queue.unfinished_tasks
369-
ui_worker = click_threading.UiWorker()
370-
self._shutdown_handlers.append(ui_worker.shutdown)
371-
_echo = click.echo
372-
373-
with ui_worker.patch_click():
374-
yield
375-
376-
if not self._workers:
377-
# Ugly hack, needed because ui_worker is not running.
378-
click.echo = _echo
379-
cli_logger.critical("Nothing to do.")
380-
sys.exit(5)
381-
382-
ui_worker.run()
383-
self._queue.join()
384-
for worker in self._workers:
385-
worker.join()
386-
387-
tasks_failed = next(self.num_failed_tasks)
388-
tasks_done = next(self.num_done_tasks)
389-
390-
if tasks_failed > 0:
391-
cli_logger.error(
392-
"{} out of {} tasks failed.".format(tasks_failed, tasks_done)
393-
)
394-
sys.exit(1)
395-
396-
def put(self, f):
397-
return self._queue.put(f)
398-
399-
400311
def assert_permissions(path, wanted):
401312
permissions = os.stat(path).st_mode & 0o777
402313
if permissions > wanted:

0 commit comments

Comments
 (0)