diff options
Diffstat (limited to 'tb/gfx_shader_bind/testbench/axi.py')
| -rw-r--r-- | tb/gfx_shader_bind/testbench/axi.py | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/tb/gfx_shader_bind/testbench/axi.py b/tb/gfx_shader_bind/testbench/axi.py new file mode 100644 index 0000000..2f58531 --- /dev/null +++ b/tb/gfx_shader_bind/testbench/axi.py @@ -0,0 +1,190 @@ +import enum + +import cocotb +from cocotb.binary import BinaryValue +from cocotb.triggers import Lock, RisingEdge, ReadOnly + +from cocotb_bus.drivers import BusDriver + +class AXIBurst(enum.IntEnum): + FIXED = 0b00 + INCR = 0b01 + WRAP = 0b10 + + +class AXIxRESP(enum.IntEnum): + OKAY = 0b00 + EXOKAY = 0b01 + SLVERR = 0b10 + DECERR = 0b11 + + +class AXIProtocolError(Exception): + def __init__(self, message: str, xresp: AXIxRESP): + super().__init__(message) + self.xresp = xresp + + +class AXIReadBurstLengthMismatch(Exception): + pass + + +class AXI4Agent(BusDriver): + ''' + AXI4 Agent + + Monitors an internal memory and handles read and write requests. + ''' + _signals = [ + "ARREADY", "ARVALID", "ARADDR", # Read address channel + "ARLEN", "ARSIZE", "ARBURST", "ARPROT", + + "RREADY", "RVALID", "RDATA", "RLAST", # Read response channel + + "AWREADY", "AWADDR", "AWVALID", # Write address channel + "AWPROT", "AWSIZE", "AWBURST", "AWLEN", + + "WREADY", "WVALID", "WDATA", + + ] + + # Not currently supported by this driver + _optional_signals = [ + "WLAST", "WSTRB", + "BVALID", "BREADY", "BRESP", "RRESP", + "RCOUNT", "WCOUNT", "RACOUNT", "WACOUNT", + "ARLOCK", "AWLOCK", "ARCACHE", "AWCACHE", + "ARQOS", "AWQOS", "ARID", "AWID", + "BID", "RID", "WID" + ] + + def __init__(self, entity, name, clock, memory, callback=None, event=None, + big_endian=False, **kwargs): + + BusDriver.__init__(self, entity, name, clock, **kwargs) + self.clock = clock + + self.big_endian = big_endian + self.bus.ARREADY.setimmediatevalue(0) + self.bus.RVALID.setimmediatevalue(0) + self.bus.RLAST.setimmediatevalue(0) + self.bus.AWREADY.setimmediatevalue(0) + self._memory = memory + + self.write_address_busy = Lock("%s_wabusy" % name) + self.read_address_busy = Lock("%s_rabusy" % name) + self.write_data_busy = Lock("%s_wbusy" % name) + + cocotb.start_soon(self._read_data()) + cocotb.start_soon(self._write_data()) + + def _size_to_bytes_in_beat(self, AxSIZE): + if AxSIZE < 7: + return 2 ** AxSIZE + return None + + async def _write_data(self): + clock_re = RisingEdge(self.clock) + + while True: + while True: + self.bus.WREADY.value = 0 + await ReadOnly() + if self.bus.AWVALID.value: + self.bus.WREADY.value = 1 + break + await clock_re + + await ReadOnly() + _awaddr = int(self.bus.AWADDR) + _awlen = int(self.bus.AWLEN) + _awsize = int(self.bus.AWSIZE) + _awburst = int(self.bus.AWBURST) + _awprot = int(self.bus.AWPROT) + + burst_length = _awlen + 1 + bytes_in_beat = self._size_to_bytes_in_beat(_awsize) + + if __debug__: + self.log.debug( + "AWADDR %d\n" % _awaddr + + "AWLEN %d\n" % _awlen + + "AWSIZE %d\n" % _awsize + + "AWBURST %d\n" % _awburst + + "AWPROT %d\n" % _awprot + + "BURST_LENGTH %d\n" % burst_length + + "Bytes in beat %d\n" % bytes_in_beat) + + burst_count = burst_length + + await clock_re + + while True: + if self.bus.WVALID.value: + word = self.bus.WDATA.value + word.big_endian = self.big_endian + _burst_diff = burst_length - burst_count + _st = _awaddr + (_burst_diff * bytes_in_beat) # start + _end = _awaddr + ((_burst_diff + 1) * bytes_in_beat) # end + self._memory[_st:_end] = array.array('B', word.buff) + burst_count -= 1 + if burst_count == 0: + break + await clock_re + + async def _read_data(self): + clock_re = RisingEdge(self.clock) + + while True: + self.bus.ARREADY.value = 1 + while True: + await ReadOnly() + if self.bus.ARVALID.value: + break + await clock_re + + await ReadOnly() + _araddr = int(self.bus.ARADDR) + _arlen = int(self.bus.ARLEN) + _arsize = int(self.bus.ARSIZE) + _arburst = int(self.bus.ARBURST) + _arprot = int(self.bus.ARPROT) + + burst_length = _arlen + 1 + bytes_in_beat = self._size_to_bytes_in_beat(_arsize) + + word = BinaryValue(n_bits=bytes_in_beat*8, bigEndian=self.big_endian) + + if __debug__: + self.log.debug( + "ARADDR %d\n" % _araddr + + "ARLEN %d\n" % _arlen + + "ARSIZE %d\n" % _arsize + + "ARBURST %d\n" % _arburst + + "ARPROT %d\n" % _arprot + + "BURST_LENGTH %d\n" % burst_length + + "Bytes in beat %d\n" % bytes_in_beat) + + burst_count = burst_length + + await clock_re + self.bus.ARREADY.value = 0 + + self.bus.RVALID.value = 1 + while burst_count > 0: + _burst_diff = burst_length - burst_count + _st = _araddr + (_burst_diff * bytes_in_beat) + _end = _araddr + ((_burst_diff + 1) * bytes_in_beat) + word.buff = self._memory[_st:_end].tobytes() + self.bus.RDATA.value = word + self.bus.RLAST.value = int(burst_count == 1) + + await ReadOnly() + + if self.bus.RREADY.value: + burst_count -= 1 + + await clock_re + self.bus.RLAST.value = 0 + + self.bus.RVALID.value = 0 |
