|
1 | | -print('This is a placeholder!') |
| 1 | +from pynq import DefaultHierarchy, DefaultIP, allocate |
| 2 | +from pynq import Overlay |
| 3 | +from datetime import datetime |
| 4 | +import pynq.lib.dma |
| 5 | +import numpy as np |
| 6 | + |
| 7 | + |
class NeuralNetworkOverlay(Overlay):
    """Overlay wrapper that runs inference on the `myproject_axi_0` accelerator IP.

    On construction the overlay is loaded and two buffers are allocated with
    `pynq.allocate`: one of shape `x_shape` for the input and one of shape
    `y_shape` for the output, both with element type `dtype`.

    Parameters:
    - bitfile_name : path of the bitstream, forwarded to `Overlay`.
    - x_shape : shape of the input buffer.
    - y_shape : shape of the output buffer.
    - dtype : element type of both buffers (default `np.float32`).
        Note: it should be set depending on the interface of the accelerator; if it uses 'float'
        types for the data, 'np.float32' is the correct one to use.
        Instead if it uses 'ap_fixed<A,B>', 'np.intA' is the correct one to use (note that A cannot
        be any integer value, but only {..., 8, 16, 32, ...}. Check the `numpy` doc for more info).
        In that case the encoding/decoding has to be computed by the PS, see `predict`.
    - dtbo, download, ignore_version, device : forwarded unchanged to `Overlay.__init__`.
    """

    def __init__(self, bitfile_name, x_shape, y_shape, dtype=np.float32, dtbo=None, download=True,
                 ignore_version=False, device=None):
        # Forward the caller's options instead of hard-coded defaults, so that
        # e.g. `download=False` or an explicit `device` is actually honoured.
        super().__init__(bitfile_name, dtbo=dtbo, download=download, ignore_version=ignore_version,
                         device=device)
        register_map = self.myproject_axi_0.register_map
        # Offsets of the registers that receive the input/output buffer addresses.
        self.regin = register_map.in_r.address
        self.regout = register_map.out_r.address
        # Control register; its AP_DONE field is polled in `predict`.
        self.ctrl = register_map.CTRL
        self.input_buffer = allocate(shape=x_shape, dtype=dtype)
        self.output_buffer = allocate(shape=y_shape, dtype=dtype)

    def _print_dt(self, timea, timeb, N):
        """Print and return the elapsed time and throughput between two timestamps.

        Parameters:
        - timea, timeb : `datetime` objects taken before and after the run.
        - N : number of samples processed in that interval.
        - return: tuple `(dts, rate)` with the elapsed seconds and `N / dts`
            (`inf` when the elapsed time is exactly zero).
        """
        # total_seconds() also accounts for the `days` component of the timedelta.
        dts = (timeb - timea).total_seconds()
        # Guard against a zero interval on coarse clocks instead of raising ZeroDivisionError.
        rate = N / dts if dts != 0 else float("inf")
        print("Classified {} samples in {} seconds ({} inferences / s)".format(N, dts, rate))
        return dts, rate

    def predict(self, X, debug=False, profile=False, encode=None, decode=None):
        """
        Obtain the predictions of the NN implemented in the FPGA.
        Parameters:
        - X : the input vector. Should be numpy ndarray. It is copied into the input buffer
            allocated by the constructor (after `encode`, if given).
        - debug : boolean. Set it to `True` to print progress messages while configuring and polling.
        - profile : boolean. Set it to `True` to print the performance of the algorithm in term of `inference/s`.
        - encode/decode: function pointers applied to the input before it is written and to the
            output after the run. They are needed when the buffers use an integer `dtype` to carry
            fixed-point data. For example for 'ap_fixed<16,6>' the following 2 functions are the
            correct ones to use for 'float' -> 'ap_fixed<16,6>' and back:
            ```
            def encode(xi):
                return np.int16(round(xi * 2**10)) # note 2**10 = 2**(A-B)
            def decode(yi):
                return yi * 2**-10
            encode_v = np.vectorize(encode) # to apply them element-wise
            decode_v = np.vectorize(decode)
            ```
        - return: the output array, or `(output, dts, rate)` when `profile` is `True`.
            Without `decode` the returned object is the overlay's own output buffer, so copy it
            if it must survive the next call; with `decode` it is whatever `decode` returns.
        """
        if profile:
            timea = datetime.now()
        if encode is not None:
            X = encode(X)
        self.input_buffer[:] = X

        accelerator = self.myproject_axi_0
        accelerator.write(self.regin, self.input_buffer.physical_address)
        accelerator.write(self.regout, self.output_buffer.physical_address)
        # Start the run by writing 1 to the control register. Use the register's
        # offset rather than the current value of its AP_START field, which only
        # happened to equal that offset while the field read 0.
        # NOTE(review): assumes CTRL exposes `.address` like in_r/out_r and that
        # AP_START is its least significant bit - confirm.
        accelerator.write(self.ctrl.address, 0x1)
        if debug:
            print("Config OK")
        while not self.ctrl.AP_DONE:
            if debug:
                print("Polling...")
        if debug:
            print("Done OK")

        # Keep `self.output_buffer` bound to the allocated buffer: decode into a
        # local so the next call still finds a valid `physical_address`.
        result = self.output_buffer
        if decode is not None:
            result = decode(result)

        if profile:
            timeb = datetime.now()
            dts, rate = self._print_dt(timea, timeb, len(X))
            return result, dts, rate
        return result
0 commit comments