Skip to content

Commit 11fbe4a

Browse files
committed
close channels!
its architecturally a bit ambiguous who should be responsible for this... channels live inside providers inside executors and its arguable that providers shoudl do that, when closed by executors (which they are not). however, the DFK contains a bunch of channel initialization, and so it is legitimate for it to also contain a bunch of channel cleanup TESTING: nothing actually tests this - perhaps a mock channel that we check DFK setup and shutdown works on? I have seent this test in CI on cleanup of: parsl/tests/test_htex/test_connected_blocks.py E AssertionError: The provider model assumes a provider has channel(s) 1986 but that test works for me... ?
1 parent 367b0bd commit 11fbe4a

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

parsl/dataflow/dflow.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,39 @@ def cleanup(self) -> None:
12561256
executor.shutdown()
12571257
logger.info(f"Shut down executor {executor.label}")
12581258

1259+
if hasattr(executor, 'provider'):
1260+
logger.info(f"Closing channel(s) for {executor.label}")
1261+
1262+
# The reasoning about which of .channel or .channels should
1263+
# be present is not well described (or describable) in the
1264+
# type system, so there are a lot of asserts here that
1265+
# attempt to describe the non-checked assumptions.
1266+
1267+
# This logic is based on the logic in add_executors.
1268+
1269+
# These two asserts could be an XOR but the 'and' and 'or'
1270+
# components of the XOR are separated here to give different
1271+
# error text.
1272+
assert hasattr(executor.provider, 'channel') or hasattr(executor.provider, 'channels'), \
1273+
"The provider model assumes a provider has channel(s)"
1274+
assert not (hasattr(executor.provider, 'channel') and hasattr(executor.provider, 'channels')), \
1275+
"The provider model assumes a provider does not have .channel and .channels"
1276+
1277+
if hasattr(executor.provider, 'channels'):
1278+
for channel in executor.provider.channels:
1279+
channel.close()
1280+
else:
1281+
assert hasattr(executor.provider, 'channel'), "If provider has no .channels, it must have .channel"
1282+
executor.provider.channel.close()
1283+
1284+
logger.info(f"Closed executor channel(s) for {executor.label}")
1285+
12591286
logger.info("Terminated executors")
1287+
1288+
logger.info("Closing channels")
1289+
1290+
logger.info("Closed channels")
1291+
12601292
self.time_completed = datetime.datetime.now()
12611293

12621294
if self.monitoring:

0 commit comments

Comments
 (0)