Skip to content

Commit 85dfae8

Browse files
committed
close channels!
its architecturally a bit ambiguous who should be responsible for this... channels live inside providers inside executors and its arguable that providers shoudl do that, when closed by executors (which they are not). however, the DFK contains a bunch of channel initialization, and so it is legitimate for it to also contain a bunch of channel cleanup TESTING: nothing actually tests this - perhaps a mock channel that we check DFK setup and shutdown works on? I have seent this test in CI on cleanup of: parsl/tests/test_htex/test_connected_blocks.py E AssertionError: The provider model assumes a provider has channel(s) 1986 but that test works for me... ?
1 parent 3e007d8 commit 85dfae8

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

parsl/dataflow/dflow.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,7 +1277,39 @@ def cleanup(self) -> None:
12771277
executor.shutdown()
12781278
logger.info(f"Shut down executor {executor.label}")
12791279

1280+
if hasattr(executor, 'provider'):
1281+
logger.info(f"Closing channel(s) for {executor.label}")
1282+
1283+
# The reasoning about which of .channel or .channels should
1284+
# be present is not well described (or describable) in the
1285+
# type system, so there are a lot of asserts here that
1286+
# attempt to describe the non-checked assumptions.
1287+
1288+
# This logic is based on the logic in add_executors.
1289+
1290+
# These two asserts could be an XOR but the 'and' and 'or'
1291+
# components of the XOR are separated here to give different
1292+
# error text.
1293+
assert hasattr(executor.provider, 'channel') or hasattr(executor.provider, 'channels'), \
1294+
"The provider model assumes a provider has channel(s)"
1295+
assert not (hasattr(executor.provider, 'channel') and hasattr(executor.provider, 'channels')), \
1296+
"The provider model assumes a provider does not have .channel and .channels"
1297+
1298+
if hasattr(executor.provider, 'channels'):
1299+
for channel in executor.provider.channels:
1300+
channel.close()
1301+
else:
1302+
assert hasattr(executor.provider, 'channel'), "If provider has no .channels, it must have .channel"
1303+
executor.provider.channel.close()
1304+
1305+
logger.info(f"Closed executor channel(s) for {executor.label}")
1306+
12801307
logger.info("Terminated executors")
1308+
1309+
logger.info("Closing channels")
1310+
1311+
logger.info("Closed channels")
1312+
12811313
self.time_completed = datetime.datetime.now()
12821314

12831315
if self.monitoring:

0 commit comments

Comments
 (0)