Skip to content

Commit 058e3c5

Browse files
committed
Refactor nbexecute to do file-ops in sync context
1 parent e302030 commit 058e3c5

File tree

1 file changed

+71
-67
lines changed

1 file changed

+71
-67
lines changed

appyter/render/nbexecute.py

Lines changed: 71 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from appyter.cli import cli
1414
from appyter.ext.emitter import json_emitter_factory
1515
from appyter.ext.fsspec.core import url_to_chroot_fs
16-
from appyter.ext.asyncio.helpers import ensure_async_contextmanager, ensure_sync
16+
from appyter.ext.asyncio.helpers import ensure_sync, ensure_async
1717
from appyter.ext.nbclient import NotebookClientIOPubHook
1818
from appyter.parse.nb import nb_from_ipynb_io, nb_to_ipynb_io, nb_to_json
1919
from appyter.ext.click import click_option_setenv, click_argument_setenv
@@ -33,9 +33,10 @@ async def iopub_hook(cell, cell_index):
3333
await emit({ 'type': 'cell', 'data': [cell, cell_index] })
3434
return iopub_hook
3535

36-
async def nbexecute_async(ipynb='', emit=json_emitter_factory(sys.stdout), cwd='', subscribe=None, fuse=False):
36+
def nbexecute_sync(ipynb='', emit=json_emitter_factory(sys.stdout), cwd='', subscribe=None, fuse=False):
3737
logger.info('starting')
3838
assert callable(emit), 'Emit must be callable'
39+
emit_sync = ensure_sync(emit)
3940
with fsspec.open(str(URI(cwd).join(ipynb)), 'r') as fr:
4041
nb = nb_from_ipynb_io(fr)
4142
#
@@ -62,78 +63,30 @@ async def nbexecute_async(ipynb='', emit=json_emitter_factory(sys.stdout), cwd='
6263
del nb.metadata['execution_info']
6364
#
6465
if 'completed' in nb.metadata['appyter']['nbexecute']:
65-
await emit({ 'type': 'error', 'data': f"Execution already completed at {nb.metadata['appyter']['nbexecute']['completed']}" })
66+
emit_sync({ 'type': 'error', 'data': f"Execution already completed at {nb.metadata['appyter']['nbexecute']['completed']}" })
6667
return
6768
elif 'started' in nb.metadata['appyter']['nbexecute']:
68-
await emit({ 'type': 'error', 'data': f"Execution already started at {nb.metadata['appyter']['nbexecute']['started']}" })
69+
emit_sync({ 'type': 'error', 'data': f"Execution already started at {nb.metadata['appyter']['nbexecute']['started']}" })
6970
return
7071
#
71-
await emit({ 'type': 'status', 'data': 'Starting' })
72+
emit_sync({ 'type': 'status', 'data': 'Starting' })
7273
state = dict(progress=0, status='Starting')
7374
#
7475
try:
7576
files = nb.metadata['appyter']['nbconstruct'].get('files')
7677
logger.debug(f"{cwd=} {files=}")
77-
async with ensure_async_contextmanager(url_to_chroot_fs(cwd, pathmap=files)) as fs:
78-
async with ensure_async_contextmanager(fs.mount(fuse=fuse)) as mnt:
78+
with url_to_chroot_fs(cwd, pathmap=files) as fs:
79+
with fs.mount(fuse=fuse) as mnt:
7980
# setup execution_info with start time
8081
nb.metadata['appyter']['nbexecute']['started'] = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc).isoformat()
8182
with (mnt/ipynb).open('w') as fw:
8283
nb_to_ipynb_io(nb, fw)
83-
#
84-
logger.info('initializing')
85-
if callable(subscribe):
86-
await subscribe(lambda: dict(nb=nb_to_json(nb), **state))
87-
#
88-
try:
89-
iopub_hook = iopub_hook_factory(nb, emit)
90-
client = NotebookClientIOPubHook(
91-
nb,
92-
allow_errors=True,
93-
timeout=None,
94-
kernel_name='python3',
95-
resources={ 'metadata': {'path': str(mnt) } },
96-
iopub_hook=iopub_hook,
97-
)
98-
await emit({ 'type': 'nb', 'data': nb_to_json(nb) })
99-
try:
100-
async with client.async_setup_kernel(
101-
env=dict(
102-
{ k: v for k, v in os.environ.items() if not k.startswith('APPYTER_') },
103-
PYTHONPATH=':'.join(sys.path),
104-
),
105-
):
106-
logger.info('executing')
107-
state.update(status='Executing...', progress=0)
108-
await emit({ 'type': 'status', 'data': state['status'] })
109-
await emit({ 'type': 'progress', 'data': state['progress'] })
110-
client.set_widgets_metadata()
111-
n_cells = len(nb.cells)
112-
exec_count = 1
113-
for index, cell in enumerate(nb.cells):
114-
logger.debug(f"executing cell {index}")
115-
cell = await client.async_execute_cell(
116-
cell, index,
117-
execution_count=exec_count,
118-
)
119-
if cell_is_code(cell):
120-
if cell_has_error(cell):
121-
raise Exception('Cell execution error on cell %d' % (exec_count))
122-
exec_count += 1
123-
if index < n_cells-1:
124-
state['progress'] = index + 1
125-
await emit({ 'type': 'progress', 'data': state['progress'] })
126-
else:
127-
state['status'] = 'Success'
128-
await emit({ 'type': 'status', 'data': state['status'] })
129-
finally:
130-
client.set_widgets_metadata()
131-
except asyncio.CancelledError:
132-
logger.info('cancelled')
133-
raise
134-
except Exception as e:
84+
exc, ret = ensure_sync(nbexecute_async_inner)(nb, mnt, emit=emit, subscribe=subscribe)
85+
nb = ret['nb']
86+
state = ret['state']
87+
if exc:
13588
logger.info(f"execution error: {traceback.format_exc()}")
136-
await emit({ 'type': 'error', 'data': str(e) })
89+
emit_sync({ 'type': 'error', 'data': str(exc) })
13790
# Save execution completion time
13891
logger.info('saving')
13992
nb.metadata['appyter']['nbexecute']['completed'] = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc).isoformat()
@@ -153,19 +106,70 @@ async def nbexecute_async(ipynb='', emit=json_emitter_factory(sys.stdout), cwd='
153106
nb_to_ipynb_io(nb, fw)
154107
#
155108
logger.info('finalized')
156-
except asyncio.CancelledError:
157-
logger.info(f"outer cancelled")
158-
raise
159-
except Exception as e:
109+
except Exception:
160110
logger.error(traceback.format_exc())
161111
if state['status'] != 'Success':
162-
await emit({ 'type': 'error', 'data': f"Error occured while executing" })
112+
emit_sync({ 'type': 'error', 'data': f"Error occured while executing" })
163113
else:
164-
await emit({ 'type': 'error', 'data': f"Error occured while finalizing" })
114+
emit_sync({ 'type': 'error', 'data': f"Error occured while finalizing" })
165115
raise
166116
finally:
167117
logger.info('complete')
168-
#
118+
119+
async def nbexecute_async_inner(nb, mnt, emit=None, subscribe=None):
    """Execute the cells of ``nb`` in a kernel and report progress via ``emit``.

    Instead of raising, any ``Exception`` raised during setup or execution
    is captured and handed back to the caller together with the notebook and
    the final progress state, so the caller can persist the notebook.

    Args:
        nb: Notebook object; its ``cells`` are executed in order.
        mnt: Working directory for the kernel; ``str(mnt)`` is passed to the
            client as ``resources['metadata']['path']``.
        emit: Awaitable callable receiving ``{'type': ..., 'data': ...}``
            message dicts (types emitted here: ``nb``, ``status``,
            ``progress``). Although it defaults to ``None`` it is awaited
            unconditionally, so callers are expected to supply it.
        subscribe: Optional awaitable callable; when callable, it is awaited
            once with a zero-argument function returning a snapshot dict
            (``nb`` as JSON plus the current ``state`` entries).

    Returns:
        A ``(exc, result)`` tuple. ``exc`` is the captured exception or
        ``None`` on success; ``result`` is ``dict(nb=nb, state=state)``.
    """
    # Local import keeps this block self-contained.
    import traceback

    # NOTE(review): nesting below was reconstructed from a flattened diff
    # view -- confirm against the repository copy before applying.
    state = dict(progress=0, status='Starting')
    try:
        logger.info('initializing')
        if callable(subscribe):
            # The lambda closes over `state`, so each call sees live values.
            await subscribe(lambda: dict(nb=nb_to_json(nb), **state))
        iopub_hook = iopub_hook_factory(nb, emit)
        client = NotebookClientIOPubHook(
            nb,
            # Cell failures are detected below via cell_has_error().
            allow_errors=True,
            timeout=None,
            kernel_name='python3',
            resources={ 'metadata': {'path': str(mnt) } },
            iopub_hook=iopub_hook,
        )
        await emit({ 'type': 'nb', 'data': nb_to_json(nb) })
        try:
            async with client.async_setup_kernel(
                # Kernel inherits the environment minus APPYTER_* variables,
                # with PYTHONPATH set from the current sys.path.
                env=dict(
                    { k: v for k, v in os.environ.items() if not k.startswith('APPYTER_') },
                    PYTHONPATH=':'.join(sys.path),
                ),
            ):
                logger.info('executing')
                state.update(status='Executing...', progress=0)
                await emit({ 'type': 'status', 'data': state['status'] })
                await emit({ 'type': 'progress', 'data': state['progress'] })
                client.set_widgets_metadata()
                n_cells = len(nb.cells)
                exec_count = 1
                for index, cell in enumerate(nb.cells):
                    logger.debug(f"executing cell {index}")
                    cell = await client.async_execute_cell(
                        cell, index,
                        execution_count=exec_count,
                    )
                    if cell_is_code(cell):
                        # Stop at the first failing code cell.
                        if cell_has_error(cell):
                            raise Exception('Cell execution error on cell %d' % (exec_count))
                        exec_count += 1
                    if index < n_cells-1:
                        state['progress'] = index + 1
                        await emit({ 'type': 'progress', 'data': state['progress'] })
                    else:
                        # Last cell finished without error.
                        state['status'] = 'Success'
                        await emit({ 'type': 'status', 'data': state['status'] })
        finally:
            # Runs whether or not execution succeeded.
            client.set_widgets_metadata()
    except Exception as exc:
        # Log the traceback while the exception is still being handled;
        # once this function returns, traceback.format_exc() in the caller
        # has no active exception to format.
        logger.info(f"execution error: {traceback.format_exc()}")
        return exc, dict(nb=nb, state=state)
    else:
        return None, dict(nb=nb, state=state)
171+
172+
# Keeps the `nbexecute_async` name available by wrapping the synchronous
# implementation with `ensure_async`.
# NOTE(review): presumably this makes nbexecute_sync awaitable without
# blocking the event loop -- confirm against appyter.ext.asyncio.helpers.
nbexecute_async = ensure_async(nbexecute_sync)
169173

170174
@cli.command(help='Execute a jupyter notebook on the command line asynchronously')
171175
@click.option('-s', type=str, metavar='URI', default='file:///dev/stdout', help='Status stream')

0 commit comments

Comments
 (0)