Skip to content

Commit c9f5428

Browse files
committed
fixes #210
1 parent d285be8 commit c9f5428

File tree

3 files changed

+152
-38
lines changed

3 files changed

+152
-38
lines changed

fastcore/_nbdev.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@
160160
"repo_details": "03_xtras.ipynb",
161161
"run": "03_xtras.ipynb",
162162
"do_request": "03_xtras.ipynb",
163+
"threaded": "03_xtras.ipynb",
164+
"start_server": "03_xtras.ipynb",
165+
"start_client": "03_xtras.ipynb",
163166
"sort_by_run": "03_xtras.ipynb",
164167
"trace": "03_xtras.ipynb",
165168
"round_multiple": "03_xtras.ipynb",
@@ -172,7 +175,6 @@
172175
"parallel": "03_xtras.ipynb",
173176
"run_procs": "03_xtras.ipynb",
174177
"parallel_gen": "03_xtras.ipynb",
175-
"threaded": "03_xtras.ipynb",
176178
"lenient_issubclass": "04_dispatch.ipynb",
177179
"sorted_topologically": "04_dispatch.ipynb",
178180
"TypeDispatch": "04_dispatch.ipynb",

fastcore/xtras.py

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
__all__ = ['dict2obj', 'repr_dict', 'is_listy', 'shufflish', 'mapped', 'IterLen', 'ReindexCollection', 'open_file',
44
'save_pickle', 'load_pickle', 'maybe_open', 'image_size', 'bunzip', 'join_path_file', 'urlwrap', 'urlopen',
55
'urlread', 'urljson', 'urlcheck', 'urlclean', 'urlsave', 'urlvalid', 'untar_dir', 'repo_details', 'run',
6-
'do_request', 'sort_by_run', 'trace', 'round_multiple', 'modified_env', 'ContextManagers', 'str2bool',
7-
'set_num_threads', 'ProcessPoolExecutor', 'ThreadPoolExecutor', 'parallel', 'run_procs', 'parallel_gen',
8-
'threaded']
6+
'do_request', 'threaded', 'start_server', 'start_client', 'sort_by_run', 'trace', 'round_multiple',
7+
'modified_env', 'ContextManagers', 'str2bool', 'set_num_threads', 'ProcessPoolExecutor',
8+
'ThreadPoolExecutor', 'parallel', 'run_procs', 'parallel_gen']
99

1010
# Cell
1111
from .imports import *
@@ -14,7 +14,7 @@
1414
from functools import wraps
1515

1616
import mimetypes,pickle,random,json,urllib,subprocess,shlex,bz2,gzip,zipfile,tarfile
17-
import imghdr,struct,socket,distutils.util,urllib.request,tempfile
17+
import imghdr,struct,socket,distutils.util,urllib.request,tempfile,time
1818
from contextlib import contextmanager,ExitStack
1919
from pdb import set_trace
2020
from urllib.request import Request
@@ -271,6 +271,43 @@ def do_request(url, post=False, headers=None, **data):
271271
data = None
272272
return urljson(Request(url, headers=headers, data=data or None))
273273

274+
# Cell
275+
def threaded(f):
276+
"Run `f` in a thread, and returns the thread"
277+
@wraps(f)
278+
def _f(*args, **kwargs):
279+
res = Thread(target=f, args=args, kwargs=kwargs)
280+
res.start()
281+
return res
282+
return _f
283+
284+
# Cell
285+
def _socket_det(port,host,dgram):
286+
if isinstance(port,int): family,addr = socket.AF_INET,(host or socket.gethostname(),port)
287+
else: family,addr = socket.AF_UNIX,port
288+
return family,addr,(socket.SOCK_STREAM,socket.SOCK_DGRAM)[dgram]
289+
290+
# Cell
291+
def start_server(port, host=None, dgram=False, n_queue=None):
292+
"Create a `socket` server on `port`, with optional `host`, of type `dgram`"
293+
listen_args = [n_queue] if n_queue else []
294+
family,addr,typ = _socket_det(port,host,dgram)
295+
if family==socket.AF_UNIX:
296+
if os.path.exists(addr): os.unlink(addr)
297+
assert not os.path.exists(addr), f"{addr} in use"
298+
s = socket.socket(family, typ)
299+
s.bind(addr)
300+
s.listen(*listen_args)
301+
return s
302+
303+
# Cell
304+
def start_client(port, host=None, dgram=False):
305+
"Create a `socket` client on `port`, with optional `host`, of type `dgram`"
306+
family,addr,typ = _socket_det(port,host,dgram)
307+
s = socket.socket(family, typ)
308+
s.connect(addr)
309+
return s
310+
274311
# Cell
275312
def _is_instance(f, gs):
276313
tst = [g if type(g) in [type, 'function'] else g.__class__ for g in gs]
@@ -342,7 +379,6 @@ def str2bool(s):
342379
# Cell
343380
from multiprocessing import Process, Queue
344381
import concurrent.futures
345-
import time
346382
from multiprocessing import Manager
347383

348384
# Cell
@@ -446,14 +482,4 @@ def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
446482
if progress_bar: items = progress_bar(items, leave=False)
447483
f=partial(_f_pg, cls(**kwargs), queue)
448484
done=partial(_done_pg, queue, items)
449-
yield from run_procs(f, done, L(batches,idx).zip())
450-
451-
# Cell
452-
def threaded(f):
453-
"Run `f` in a thread, and returns the thread"
454-
@wraps(f)
455-
def _f(*args, **kwargs):
456-
res = Thread(target=f, args=args, kwargs=kwargs)
457-
res.start()
458-
return res
459-
return _f
485+
yield from run_procs(f, done, L(batches,idx).zip())

nbs/03_xtras.ipynb

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"from functools import wraps\n",
2323
"\n",
2424
"import mimetypes,pickle,random,json,urllib,subprocess,shlex,bz2,gzip,zipfile,tarfile\n",
25-
"import imghdr,struct,socket,distutils.util,urllib.request,tempfile\n",
25+
"import imghdr,struct,socket,distutils.util,urllib.request,tempfile,time\n",
2626
"from contextlib import contextmanager,ExitStack\n",
2727
"from pdb import set_trace\n",
2828
"from urllib.request import Request\n",
@@ -588,7 +588,7 @@
588588
{
589589
"data": {
590590
"text/plain": [
591-
"['e', 'c', 'h', 'b', 'g', 'a', 'd', 'f']"
591+
"['a', 'c', 'f', 'd', 'g', 'e', 'b', 'h']"
592592
]
593593
},
594594
"execution_count": null,
@@ -874,7 +874,7 @@
874874
"cell_type": "markdown",
875875
"metadata": {},
876876
"source": [
877-
"Utilities (other than extensions to Pathlib.Path) for dealing with IO."
877+
"Utilities (other than extensions to Pathlib.Path) for dealing with IO and sockets."
878878
]
879879
},
880880
{
@@ -1411,6 +1411,110 @@
14111411
" return urljson(Request(url, headers=headers, data=data or None))"
14121412
]
14131413
},
1414+
{
1415+
"cell_type": "code",
1416+
"execution_count": null,
1417+
"metadata": {},
1418+
"outputs": [],
1419+
"source": [
1420+
"#export\n",
1421+
"def threaded(f):\n",
1422+
" \"Run `f` in a thread, and returns the thread\"\n",
1423+
" @wraps(f)\n",
1424+
" def _f(*args, **kwargs):\n",
1425+
" res = Thread(target=f, args=args, kwargs=kwargs)\n",
1426+
" res.start()\n",
1427+
" return res\n",
1428+
" return _f"
1429+
]
1430+
},
1431+
{
1432+
"cell_type": "markdown",
1433+
"metadata": {},
1434+
"source": [
1435+
"See `start_client` for an example."
1436+
]
1437+
},
1438+
{
1439+
"cell_type": "code",
1440+
"execution_count": null,
1441+
"metadata": {},
1442+
"outputs": [],
1443+
"source": [
1444+
"#export\n",
1445+
"def _socket_det(port,host,dgram):\n",
1446+
" if isinstance(port,int): family,addr = socket.AF_INET,(host or socket.gethostname(),port)\n",
1447+
" else: family,addr = socket.AF_UNIX,port\n",
1448+
" return family,addr,(socket.SOCK_STREAM,socket.SOCK_DGRAM)[dgram]"
1449+
]
1450+
},
1451+
{
1452+
"cell_type": "code",
1453+
"execution_count": null,
1454+
"metadata": {},
1455+
"outputs": [],
1456+
"source": [
1457+
"#export\n",
1458+
"def start_server(port, host=None, dgram=False, n_queue=None):\n",
1459+
" \"Create a `socket` server on `port`, with optional `host`, of type `dgram`\"\n",
1460+
" listen_args = [n_queue] if n_queue else []\n",
1461+
" family,addr,typ = _socket_det(port,host,dgram)\n",
1462+
" if family==socket.AF_UNIX:\n",
1463+
" if os.path.exists(addr): os.unlink(addr)\n",
1464+
" assert not os.path.exists(addr), f\"{addr} in use\"\n",
1465+
" s = socket.socket(family, typ)\n",
1466+
" s.bind(addr)\n",
1467+
" s.listen(*listen_args)\n",
1468+
" return s"
1469+
]
1470+
},
1471+
{
1472+
"cell_type": "markdown",
1473+
"metadata": {},
1474+
"source": [
1475+
"You can create a TCP client and server pass an int as `port` and optional `host`. `host` defaults to your main network interface if not provided. You can create a Unix socket client and server by passing a string to `port`. A `SOCK_STREAM` socket is created by default, unless you pass `dgram=True`, in which case a `SOCK_DGRAM` socket is created. `n_queue` sets the listening queue size."
1476+
]
1477+
},
1478+
{
1479+
"cell_type": "code",
1480+
"execution_count": null,
1481+
"metadata": {},
1482+
"outputs": [],
1483+
"source": [
1484+
"#export\n",
1485+
"def start_client(port, host=None, dgram=False):\n",
1486+
" \"Create a `socket` client on `port`, with optional `host`, of type `dgram`\"\n",
1487+
" family,addr,typ = _socket_det(port,host,dgram)\n",
1488+
" s = socket.socket(family, typ)\n",
1489+
" s.connect(addr)\n",
1490+
" return s"
1491+
]
1492+
},
1493+
{
1494+
"cell_type": "code",
1495+
"execution_count": null,
1496+
"metadata": {},
1497+
"outputs": [
1498+
{
1499+
"name": "stdout",
1500+
"output_type": "stream",
1501+
"text": [
1502+
"b'hello'\n"
1503+
]
1504+
}
1505+
],
1506+
"source": [
1507+
"@threaded\n",
1508+
"def _f():\n",
1509+
" with start_server(47354, 'localhost') as s:\n",
1510+
" conn,addr = s.accept()\n",
1511+
" print(conn.recv(1024))\n",
1512+
"\n",
1513+
"_f()\n",
1514+
"time.sleep(0.2) # Wait for server to start\n",
1515+
"with start_client(47354, 'localhost') as c: c.send(b\"hello\")"
1516+
]
1517+
},
14141518
{
14151519
"cell_type": "markdown",
14161520
"metadata": {},
@@ -1664,7 +1768,6 @@
16641768
"#export\n",
16651769
"from multiprocessing import Process, Queue\n",
16661770
"import concurrent.futures\n",
1667-
"import time\n",
16681771
"from multiprocessing import Manager"
16691772
]
16701773
},
@@ -2045,23 +2148,6 @@
20452148
"test_eq(res.sorted().itemgot(1), x+1)"
20462149
]
20472150
},
2048-
{
2049-
"cell_type": "code",
2050-
"execution_count": null,
2051-
"metadata": {},
2052-
"outputs": [],
2053-
"source": [
2054-
"#export\n",
2055-
"def threaded(f):\n",
2056-
" \"Run `f` in a thread, and returns the thread\"\n",
2057-
" @wraps(f)\n",
2058-
" def _f(*args, **kwargs):\n",
2059-
" res = Thread(target=f, args=args, kwargs=kwargs)\n",
2060-
" res.start()\n",
2061-
" return res\n",
2062-
" return _f"
2063-
]
2064-
},
20652151
{
20662152
"cell_type": "markdown",
20672153
"metadata": {},

0 commit comments

Comments
 (0)