1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
import enum
import cocotb
from cocotb.binary import BinaryValue
from cocotb.triggers import Lock, RisingEdge, ReadOnly
from cocotb_bus.drivers import BusDriver
class AXIBurst(enum.IntEnum):
FIXED = 0b00
INCR = 0b01
WRAP = 0b10
class AXIxRESP(enum.IntEnum):
OKAY = 0b00
EXOKAY = 0b01
SLVERR = 0b10
DECERR = 0b11
class AXIProtocolError(Exception):
def __init__(self, message: str, xresp: AXIxRESP):
super().__init__(message)
self.xresp = xresp
class AXIReadBurstLengthMismatch(Exception):
pass
class AXI4Agent(BusDriver):
'''
AXI4 Agent
Monitors an internal memory and handles read and write requests.
'''
_signals = [
"ARREADY", "ARVALID", "ARADDR", # Read address channel
"ARLEN", "ARSIZE", "ARBURST", "ARPROT",
"RREADY", "RVALID", "RDATA", "RLAST", # Read response channel
"AWREADY", "AWADDR", "AWVALID", # Write address channel
"AWPROT", "AWSIZE", "AWBURST", "AWLEN",
"WREADY", "WVALID", "WDATA",
]
# Not currently supported by this driver
_optional_signals = [
"WLAST", "WSTRB",
"BVALID", "BREADY", "BRESP", "RRESP",
"RCOUNT", "WCOUNT", "RACOUNT", "WACOUNT",
"ARLOCK", "AWLOCK", "ARCACHE", "AWCACHE",
"ARQOS", "AWQOS", "ARID", "AWID",
"BID", "RID", "WID"
]
def __init__(self, entity, name, clock, memory, callback=None, event=None,
big_endian=False, **kwargs):
BusDriver.__init__(self, entity, name, clock, **kwargs)
self.clock = clock
self.big_endian = big_endian
self.bus.ARREADY.setimmediatevalue(0)
self.bus.RVALID.setimmediatevalue(0)
self.bus.RLAST.setimmediatevalue(0)
self.bus.AWREADY.setimmediatevalue(0)
self._memory = memory
self.write_address_busy = Lock("%s_wabusy" % name)
self.read_address_busy = Lock("%s_rabusy" % name)
self.write_data_busy = Lock("%s_wbusy" % name)
cocotb.start_soon(self._read_data())
cocotb.start_soon(self._write_data())
def _size_to_bytes_in_beat(self, AxSIZE):
if AxSIZE < 7:
return 2 ** AxSIZE
return None
async def _write_data(self):
clock_re = RisingEdge(self.clock)
while True:
while True:
self.bus.WREADY.value = 0
await ReadOnly()
if self.bus.AWVALID.value:
self.bus.WREADY.value = 1
break
await clock_re
await ReadOnly()
_awaddr = int(self.bus.AWADDR)
_awlen = int(self.bus.AWLEN)
_awsize = int(self.bus.AWSIZE)
_awburst = int(self.bus.AWBURST)
_awprot = int(self.bus.AWPROT)
burst_length = _awlen + 1
bytes_in_beat = self._size_to_bytes_in_beat(_awsize)
if __debug__:
self.log.debug(
"AWADDR %d\n" % _awaddr +
"AWLEN %d\n" % _awlen +
"AWSIZE %d\n" % _awsize +
"AWBURST %d\n" % _awburst +
"AWPROT %d\n" % _awprot +
"BURST_LENGTH %d\n" % burst_length +
"Bytes in beat %d\n" % bytes_in_beat)
burst_count = burst_length
await clock_re
while True:
if self.bus.WVALID.value:
word = self.bus.WDATA.value
word.big_endian = self.big_endian
_burst_diff = burst_length - burst_count
_st = _awaddr + (_burst_diff * bytes_in_beat) # start
_end = _awaddr + ((_burst_diff + 1) * bytes_in_beat) # end
self._memory[_st:_end] = array.array('B', word.buff)
burst_count -= 1
if burst_count == 0:
break
await clock_re
async def _read_data(self):
clock_re = RisingEdge(self.clock)
while True:
self.bus.ARREADY.value = 1
while True:
await ReadOnly()
if self.bus.ARVALID.value:
break
await clock_re
await ReadOnly()
_araddr = int(self.bus.ARADDR)
_arlen = int(self.bus.ARLEN)
_arsize = int(self.bus.ARSIZE)
_arburst = int(self.bus.ARBURST)
_arprot = int(self.bus.ARPROT)
burst_length = _arlen + 1
bytes_in_beat = self._size_to_bytes_in_beat(_arsize)
word = BinaryValue(n_bits=bytes_in_beat*8, bigEndian=self.big_endian)
if __debug__:
self.log.debug(
"ARADDR %d\n" % _araddr +
"ARLEN %d\n" % _arlen +
"ARSIZE %d\n" % _arsize +
"ARBURST %d\n" % _arburst +
"ARPROT %d\n" % _arprot +
"BURST_LENGTH %d\n" % burst_length +
"Bytes in beat %d\n" % bytes_in_beat)
burst_count = burst_length
await clock_re
self.bus.ARREADY.value = 0
self.bus.RVALID.value = 1
while burst_count > 0:
_burst_diff = burst_length - burst_count
_st = _araddr + (_burst_diff * bytes_in_beat)
_end = _araddr + ((_burst_diff + 1) * bytes_in_beat)
word.buff = self._memory[_st:_end].tobytes()
self.bus.RDATA.value = word
self.bus.RLAST.value = int(burst_count == 1)
await ReadOnly()
if self.bus.RREADY.value:
burst_count -= 1
await clock_re
self.bus.RLAST.value = 0
self.bus.RVALID.value = 0
|