5
5
from hamcrest import assert_that , contains_inanyorder , has_length
6
6
7
7
from nodestream .pipeline import Flush
8
- from nodestream .pipeline .transformers import (
9
- ConcurrentTransformer ,
10
- SwitchTransformer ,
11
- Transformer ,
12
- )
8
+ from nodestream .pipeline .transformers import (ConcurrentTransformer , SwitchTransformer ,
9
+ Transformer , )
13
10
from nodestream .pipeline .value_providers import JmespathValueProvider
14
11
15
12
@@ -27,11 +24,12 @@ async def transform_record(self, record):
27
24
28
25
29
26
@pytest .mark .asyncio
30
- async def test_concurrent_transformer_alL_items_collect ():
27
+ async def test_concurrent_transformer_all_items_collect ():
31
28
items = list (range (100 ))
32
29
add = AddOneConcurrently ()
30
+ mock_context = MagicMock ()
33
31
result = [r for i in items async for r in add .process_record (i , None )]
34
- result .extend ([r async for r in add .emit_outstanding_records ()])
32
+ result .extend ([r async for r in add .emit_outstanding_records (mock_context )])
35
33
assert_that (result , contains_inanyorder (* [i + 1 for i in items ]))
36
34
assert_that (result , has_length (len (items )))
37
35
@@ -80,23 +78,25 @@ async def downstream_client():
80
78
async def test_concurrent_transformer_worker_cleanup (mocker ):
81
79
add = AddOneConcurrently ()
82
80
add .thread_pool = mocker .Mock ()
83
- await add .finish (None )
81
+ mock_context = MagicMock ()
82
+ await add .finish (mock_context )
84
83
add .thread_pool .shutdown .assert_called_once_with (True )
85
84
86
85
87
86
@pytest .mark .asyncio
88
- async def test_concurrent_transformer_flush (mocker ):
87
+ async def test_concurrent_transformer_flush ():
89
88
add = AddOneConcurrently ()
89
+ mock_context = MagicMock ()
90
90
result = [
91
91
r for record in (1 , Flush , 2 ) async for r in add .process_record (record , None )
92
92
]
93
- result .extend ([r async for r in add .emit_outstanding_records ()])
93
+ result .extend ([r async for r in add .emit_outstanding_records (mock_context )])
94
94
assert_that (result , contains_inanyorder (2 , 3 , Flush ))
95
95
96
96
97
- class addNTransformer (Transformer ):
98
- def __init__ (self , N ):
99
- self .n = N
97
+ class AddNTransformer (Transformer ):
98
+ def __init__ (self , n ):
99
+ self .n = n
100
100
101
101
async def transform_record (self , record ):
102
102
yield dict (type = record ["type" ], value = record ["value" ] + self .n )
@@ -105,17 +105,17 @@ async def transform_record(self, record):
105
105
TEST_PROVIDER = JmespathValueProvider .from_string_expression ("type" )
106
106
TEST_CASES = {
107
107
"first" : {
108
- "implementation" : "tests.unit.pipeline.transformers.test_transformer:addNTransformer " ,
109
- "arguments" : {"N " : 1 },
108
+ "implementation" : "tests.unit.pipeline.transformers.test_transformer:AddNTransformer " ,
109
+ "arguments" : {"n " : 1 },
110
110
},
111
111
"second" : {
112
- "implementation" : "tests.unit.pipeline.transformers.test_transformer:addNTransformer " ,
113
- "arguments" : {"N " : 2 },
112
+ "implementation" : "tests.unit.pipeline.transformers.test_transformer:AddNTransformer " ,
113
+ "arguments" : {"n " : 2 },
114
114
},
115
115
}
116
116
DEFAULT_CASE = {
117
- "implementation" : "tests.unit.pipeline.transformers.test_transformer:addNTransformer " ,
118
- "arguments" : {"N " : 3 },
117
+ "implementation" : "tests.unit.pipeline.transformers.test_transformer:AddNTransformer " ,
118
+ "arguments" : {"n " : 3 },
119
119
}
120
120
121
121
TEST_DATA = [
@@ -164,7 +164,7 @@ async def test_switch_transformer_without_default():
164
164
assert results == TEST_RESULTS_WITH_NO_DEFAULT
165
165
166
166
167
- class TestConcurrentTransformer (ConcurrentTransformer ):
167
+ class MockConcurrentTransformer (ConcurrentTransformer ):
168
168
def __init__ (self ):
169
169
self .done = False
170
170
self .queue_size = 0
@@ -176,6 +176,6 @@ async def transform_record(self, record):
176
176
177
177
@pytest .mark .asyncio
178
178
async def test_emit_outstanding_records ():
179
- transformer = TestConcurrentTransformer ()
179
+ transformer = MockConcurrentTransformer ()
180
180
context = MagicMock ()
181
181
assert transformer .emit_outstanding_records (context )
0 commit comments