1- from ..outputs import OutputProcessor
1+ import json
2+ from pathlib import Path
3+ from tempfile import TemporaryDirectory
4+ from uuid import uuid4
5+
6+ from ..outputs import OutputProcessor , OutputsManager
7+
8+ class TestOutputProcessor (OutputProcessor ):
9+
10+ settings = {}
11+
212
313def test_instantiation ():
4- op = OutputProcessor ()
14+ op = OutputProcessor ()
15+
16+ def create_incoming_message (cell_id ):
17+ msg_id = str (uuid4 ())
18+ header = {"msg_id" : msg_id , "msg_type" : "shell" }
19+ parent_header = {}
20+ metadata = {"cellId" : cell_id }
21+ msg = [json .dumps (item ) for item in [header , parent_header , metadata ]]
22+ return msg_id , msg
23+
24+ def test_incoming_message ():
25+
26+ with TemporaryDirectory () as td :
27+ op = TestOutputProcessor ()
28+ om = OutputsManager ()
29+ op .settings ["outputs_manager" ] = om
30+ op .outputs_path = Path (td ) / "outputs"
31+ # Simulate the running of a cell
32+ cell_id = str (uuid4 ())
33+ msg_id , msg = create_incoming_message (cell_id )
34+ op .process_incoming_message ('shell' , msg )
35+ assert op .get_cell_id (msg_id ) == cell_id
36+ assert op .get_msg_id (cell_id ) == msg_id
37+ # # Clear the cell_id
38+ # op.clear(cell_id)
39+ # assert op.get_cell_id(msg_id) is None
40+ # assert op.get_msg_id(cell_id) is None
41+ # # Simulate the running of a cell
42+ # cell_id = str(uuid4())
43+ # msg_id, msg = create_incoming_message(cell_id)
44+ # op.process_incoming_message('shell', msg)
45+ # assert op.get_cell_id(msg_id) == cell_id
46+ # assert op.get_msg_id(cell_id) == msg_id
47+ # # Run it again without clearing to ensure it self clears
48+ # cell_id = str(uuid4())
49+ # msg_id, msg = create_incoming_message(cell_id)
50+ # assert op.get_cell_id(msg_id) == cell_id
51+ # assert op.get_msg_id(cell_id) == msg_id
0 commit comments