4
4
import random
5
5
6
6
from bus2csr import (
7
- FrontBusTestInterface ,
8
- compare_values ,
9
7
dword2int ,
10
8
get_frontend_bus_if ,
11
9
int2bytes ,
12
10
int2dword ,
13
11
)
14
- from utils import mask_bits , rand_bits , rand_bits32
15
12
16
13
import cocotb
17
- from cocotb .handle import SimHandleBase
18
- from cocotb .triggers import ClockCycles , Combine , Event , RisingEdge , Timer , with_timeout
14
+ from cocotb .triggers import ClockCycles , Combine , RisingEdge , Timer , with_timeout
19
15
from cocotb_helpers import reset_n
20
16
21
17
from cocotbext .axi .constants import AxiBurstType
@@ -29,7 +25,7 @@ async def timeout_task(timeout):
29
25
async def initialize (dut , timeout = 50 ):
30
26
"""
31
27
Common test initialization routine which sets up environment and starts a timeout coroutine
32
- to observe whether the test did not fall in infinite loop.
28
+ to observe whether the test did not fall into infinite loop.
33
29
"""
34
30
35
31
cocotb .log .setLevel (logging .DEBUG )
@@ -69,6 +65,7 @@ async def initialize(dut, timeout=50):
69
65
await ClockCycles (tb .clk , 20 )
70
66
await reset_n (tb .clk , tb .rst_n , cycles = 5 )
71
67
68
+ # Generate test data
72
69
data_len = random .randint (10 , 64 )
73
70
test_data = [random .randint (0 , 2 ** 32 - 1 ) for _ in range (data_len )]
74
71
@@ -87,7 +84,7 @@ async def writer():
87
84
# Write sequence should just write data
88
85
for d in test_data :
89
86
await tb .write_csr (fifo_addr , int2bytes (d ))
90
- # Wait for read to finish to avoid multiple writes per read
87
+ # Wait for read to finish in order to avoid multiple writes per read
91
88
await tb .axi_m .wait_read ()
92
89
93
90
async def reader (return_data ):
@@ -120,11 +117,14 @@ async def test_collision_with_read(dut):
120
117
121
118
fifo_addr = tb .reg_map .I3C_EC .SECFWRECOVERYIF .INDIRECT_FIFO_DATA .base_addr
122
119
120
+ read_offset = 2
121
+
123
122
async def writer ():
124
123
# Write sequence should write data on each read data
125
124
for i , d in enumerate (test_data ):
126
- # Awaiting read request causes writing simultaneously with read data channel activity
127
- if i > 2 :
125
+ # Load first two dwords independently
126
+ if i > read_offset :
127
+ # Awaiting read request causes writing simultaneously with read data channel activity
128
128
await RisingEdge (dut .s_cpuif_req )
129
129
assert not dut .s_cpuif_req_is_wr .value
130
130
# Wait additional cycle to line up write with FIFO read delay
@@ -133,7 +133,6 @@ async def writer():
133
133
134
134
async def reader (return_data ):
135
135
# Wait until there is data in FIFO
136
- read_offset = 2
137
136
while int (dut .fifo_depth_o .value ) < read_offset :
138
137
await RisingEdge (tb .clk )
139
138
@@ -161,7 +160,10 @@ async def test_write_read_burst(dut):
161
160
162
161
fifo_addr = tb .reg_map .I3C_EC .SECFWRECOVERYIF .INDIRECT_FIFO_DATA .base_addr
163
162
163
+ # Run write burst to fill the FIFO
164
164
await with_timeout (tb .axi_m .write_dwords (fifo_addr , test_data , burst = AxiBurstType .FIXED ), 1 , "us" )
165
+
166
+ # Run read burst to empty the FIFO
165
167
received_data = await with_timeout (tb .axi_m .read_dwords (fifo_addr , count = data_len , burst = AxiBurstType .FIXED ), 1 , "us" )
166
168
167
169
assert received_data == test_data , "Received data does not match sent data!"
@@ -187,9 +189,13 @@ async def reader(return_data):
187
189
received_data = []
188
190
half_write_timer = ClockCycles (tb .clk , data_len * single_write_cycles // 2 )
189
191
192
+ # Request write burst
190
193
w = cocotb .start_soon (writer ())
191
194
await half_write_timer
195
+
196
+ # Request read burst during write burst, should wait for write to finish
192
197
r = cocotb .start_soon (reader (received_data ))
198
+
193
199
await Combine (w , r )
194
200
195
201
assert received_data == test_data , "Received data does not match sent data!"
@@ -219,14 +225,18 @@ async def reader(return_data):
219
225
# Request 1st write burst
220
226
w1 = cocotb .start_soon (writer ())
221
227
await half_write_timer
228
+
222
229
# Request 1st read burst during 1st write burst, should wait for write to finish
223
230
r1 = cocotb .start_soon (reader (received_data1 ))
224
231
await half_write_timer
232
+
225
233
# Request 2nd write burst that will collision with 1st read burst, should wait for read to finish
226
234
w2 = cocotb .start_soon (writer ())
227
235
await half_write_timer
236
+
228
237
# Request 2nd read burst during 2nd write burst, should wait for write to finish
229
238
r2 = cocotb .start_soon (reader (received_data2 ))
239
+
230
240
await Combine (w1 , r1 , w2 , r2 )
231
241
232
242
assert received_data1 == test_data , "Received data from 1st burst does not match sent data!"
0 commit comments