From a7d92072c0bdc3a3e1c99de64f353e932846bc2a Mon Sep 17 00:00:00 2001 From: Alejandro Soto Date: Sat, 4 May 2024 14:19:48 -0600 Subject: rtl/pkt_switch: replace axi_timer example with pkt_switch --- rtl/pkt_switch/testbench.py | 283 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 rtl/pkt_switch/testbench.py (limited to 'rtl/pkt_switch/testbench.py') diff --git a/rtl/pkt_switch/testbench.py b/rtl/pkt_switch/testbench.py new file mode 100644 index 0000000..8a8b22a --- /dev/null +++ b/rtl/pkt_switch/testbench.py @@ -0,0 +1,283 @@ + +'''Copyright (c) 2020-2023, MC ASIC Design Consulting +All rights reserved. + +Author: Marek Cieplucha, https://github.com/mciepluc + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met (The BSD 2-Clause +License): + +1. Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL POTENTIAL VENTURES LTD BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ''' + +""" +Example packet switch testbench with functional coverage and constrained +randomization. Simple packet switch is a module that routes packets from the +input interface to output interfaces (1 or 2) depending on configured address +or length based filter. Test generates random packets and checks if it has been +transmitted correctly. +""" + +import cocotb +from cocotb.triggers import Timer, RisingEdge, ReadOnly + +from cocotb_bus.drivers import BusDriver +from cocotb_bus.monitors import BusMonitor + +from cocotb_coverage.coverage import * +from cocotb_coverage.crv import * + +import numpy as np + +class Packet(Randomized): + def __init__(self, data = [0, 3, 0]): + Randomized.__init__(self) + self.addr = data[0] + self.len = len(data) + self.payload = data[2:] + + self.add_rand("addr", list(range(256))) + self.add_rand("len", list(range(3,32))) + + def post_randomize(self): + self.payload = [np.random.randint(256) for _ in range(self.len-2)] + +class PacketIFDriver(BusDriver): + ''' + Packet Interface Driver + ''' + _signals = ["data", "valid"] + + def __init__(self, entity, name, clock): + BusDriver.__init__(self, entity, name, clock) + self.clock = clock + self.bus.data.setimmediatevalue(0) + self.bus.valid.setimmediatevalue(0) + + @cocotb.coroutine + def send(self, packet): + self.bus.valid.value = 1 + # transmit header + self.bus.data.value = packet.addr + yield RisingEdge(self.clock) + self.bus.data.value = packet.len + yield RisingEdge(self.clock) + for byte in packet.payload: + self.bus.data.value = byte + yield RisingEdge(self.clock) + self.bus.valid.value = 0 + yield RisingEdge(self.clock) + +class PacketIFMonitor(BusMonitor): + ''' + Packet Interface Monitor + ''' + _signals = ["data", "valid"] + + def __init__(self, entity, name, clock): + BusMonitor.__init__(self, entity, name, clock) + self.clock = clock + + @cocotb.coroutine + def _monitor_recv(self): + pkt_receiving = False + received_data = [] + while True: + yield RisingEdge(self.clock) + yield ReadOnly() + if (self.bus.valid == 1): + pkt_receiving = True + received_data.append(int(self.bus.data)) + elif pkt_receiving and (self.bus.valid == 0): # packet ended + pkt = Packet(received_data) + self._recv(pkt) + pkt_receiving = False + received_data = [] + +# simple clock generator +@cocotb.coroutine +def clock_gen(signal, period=10000): + while True: + signal.value = 0 + yield Timer(period/2) + signal.value = 1 + yield Timer(period/2) + +@cocotb.test() +async def pkt_switch_test(dut): + """ PKT_SWITCH Test """ + + log = cocotb.logging.getLogger("cocotb.test") # logger instance + await cocotb.start(clock_gen(dut.clk, period=100)) # start clock running + + # reset & init + dut.rst_n.value = 1 + dut.datain_data.value = 0 + dut.datain_valid.value = 0 + dut.ctrl_addr.value = 0 + dut.ctrl_data.value = 0 + dut.ctrl_wr.value = 0 + + await Timer(1000) + dut.rst_n.value = 0 + await Timer(1000) + dut.rst_n.value = 1 + + # procedure of writing configuration registers + @cocotb.coroutine + def write_config(addr, data): + for [a, d] in zip(addr, data): + dut.ctrl_addr.value = a + dut.ctrl_data.value = d + dut.ctrl_wr.value = 1 + yield RisingEdge(dut.clk) + dut.ctrl_wr.value = 0 + + enable_transmit_both = lambda: write_config([0], [4]) + disable_filtering = lambda: write_config([0], [0]) + + @cocotb.coroutine + def enable_addr_filtering(addr, mask): + yield write_config([0, 2, 3], [1, addr, mask]) + + @cocotb.coroutine + def enable_len_filtering(low_limit, up_limit): + yield write_config([0, 4, 5], [2, low_limit, up_limit]) + + driver = PacketIFDriver(dut, name="datain", clock=dut.clk) + monitor0 = PacketIFMonitor(dut, name="dataout0", clock=dut.clk) + monitor1 = PacketIFMonitor(dut, name="dataout1", clock=dut.clk) + + expected_data0 = [] # queue of expeced packet at interface 0 + expected_data1 = [] # queue of expeced packet at interface 1 + + + def scoreboarding(pkt, queue_expected): + assert pkt.addr == queue_expected[0].addr + assert pkt.len == queue_expected[0].len + assert pkt.payload == queue_expected[0].payload + queue_expected.pop() + + monitor0.add_callback(lambda _: scoreboarding(_, expected_data0)) + monitor1.add_callback(lambda _: scoreboarding(_, expected_data1)) + monitor0.add_callback(lambda _: log.info("Receiving packet on interface 0 (packet not filtered)")) + monitor1.add_callback(lambda _: log.info("Receiving packet on interface 1 (packet filtered)")) + + # functional coverage - check received packet + + @CoverPoint( + "top.packet_length", + xf = lambda pkt, event, addr, mask, ll, ul: pkt.len, # packet length + bins = list(range(3,32)) # may be 3 ... 31 bytes + ) + @CoverPoint("top.event", vname="event", bins = ["DIS", "TB", "AF", "LF"]) + @CoverPoint( + "top.filt_addr", + xf = lambda pkt, event, addr, mask, ll, ul: # filtering based on particular bits in header + (addr & mask & 0x0F) if event == "AF" else None, # check only if event is "address filtering" + bins = list(range(16)), # check only 4 LSBs if all options tested + ) + @CoverPoint( + "top.filt_len_eq", + xf = lambda pkt, event, addr, mask, ll, ul: ll == ul, # filtering of a single packet length + bins = [True, False] + ) + @CoverPoint( + "top.filt_len_ll", + vname = "ll", # lower limit of packet length + bins = list(range(3,32)) # 3 ... 31 + ) + @CoverPoint( + "top.filt_len_ul", + vname = "ul", # upper limit of packet length + bins = list(range(3,32)) # 3 ... 31 + ) + @CoverCross( + "top.filt_len_ll_x_packet_length", + items = ["top.packet_length", "top.filt_len_ll"] + ) + @CoverCross( + "top.filt_len_ul_x_packet_length", + items = ["top.packet_length", "top.filt_len_ul"] + ) + def log_sequence(pkt, event, addr, mask, ll, ul): + log.info("Processing packet:") + log.info(" ADDRESS: %X", pkt.addr) + log.info(" LENGTH: %d", pkt.len) + log.info(" PAYLOAD: " + str(pkt.payload)) + if event == "DIS": + log.info("Filtering disabled") + elif event == "TB": + log.info("Transmit on both interfaces") + elif event == "AF": + log.info("Address filtering, address: %02X, mask: %02X", addr, mask) + elif event == "LF": + log.info("Length filtering, lower limit: %d, upper limit: %d", ll, ul) + + # main loop + for _ in range(1000): # is that enough repetitions to ensure coverage goal? Check out! + + event = np.random.choice(["DIS", "TB", "AF", "LF"]) + # DIS - disable filtering : expect all packets on interface 0 + # TB - transmit bot : expect all packets on interface 0 and 1 + # AF - address filtering : expect filtered packets on interface 1, others on 0 + # LF - length filtering : expect filtered packets on interface 1, others on 0 + + # randomize test data + pkt = Packet(); + pkt.randomize() + addr = np.random.randint(256) # 0x00 .. 0xFF + mask = np.random.randint(256) # 0x00 .. 0xFF + low_limit = np.random.randint(3,32) # 3 ... 31 + up_limit = np.random.randint(low_limit,32) # low_limit ... 31 + + # expect the packet on the particular interface + if event == "DIS": + await disable_filtering() + expected_data0.append(pkt) + elif event == "TB": + await enable_transmit_both() + expected_data0.append(pkt) + expected_data1.append(pkt) + elif event == "AF": + await enable_addr_filtering(addr, mask) + if ((pkt.addr & mask) == (addr & mask)): + expected_data1.append(pkt) + else: + expected_data0.append(pkt) + elif event == "LF": + await enable_len_filtering(low_limit, up_limit) + if (low_limit <= pkt.len <= up_limit): + expected_data1.append(pkt) + else: + expected_data0.append(pkt) + + # wait DUT + await driver.send(pkt) + await RisingEdge(dut.clk) + await RisingEdge(dut.clk) + + # LOG the action + log_sequence(pkt, event, addr, mask, low_limit, up_limit) + + # print coverage report + coverage_db.report_coverage(log.info, bins=False) + # export + coverage_db.export_to_xml(filename="coverage_pkt_switch.xml") + coverage_db.export_to_yaml(filename="coverage_pkt_switch.yml") -- cgit v1.2.3