Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/manual/src/applets/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ Applet index
program/index
debug/index
control/index
sensor/index
internal/index
1 change: 1 addition & 0 deletions docs/manual/src/applets/interface/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ I/O interfaces
jtag_xvc
swd_probe
probe_rs
ps2_host
7 changes: 7 additions & 0 deletions docs/manual/src/applets/interface/ps2_host.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
``ps2-host``
============

.. _applet.interface.ps2_host:

.. autoprogram:: glasgow.applet.interface.ps2_host:PS2HostApplet._get_argparser_for_sphinx("ps2-host")
:prog: glasgow run ps2-host
11 changes: 11 additions & 0 deletions docs/manual/src/applets/sensor/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.. _applet.sensor:

Sensor data collection
======================

.. automodule:: glasgow.applet.sensor

.. toctree::
:maxdepth: 2

mouse_ps2
7 changes: 7 additions & 0 deletions docs/manual/src/applets/sensor/mouse_ps2.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
``sensor-mouse-ps2``
====================

.. _applet.sensor.mouse_ps2:

.. autoprogram:: glasgow.applet.sensor.mouse_ps2:SensorMousePS2Applet._get_argparser_for_sphinx("sensor-mouse-ps2")
:prog: glasgow run sensor-mouse-ps2
164 changes: 86 additions & 78 deletions software/glasgow/applet/interface/ps2_host/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# The Intel 8042 controller fixes, or rather works around, this problem by completely encapsulating
# the handling of PS/2. In fact many (if not most) PS/2 commands that are sent to i8042 result in
# no PS/2 traffic at all, with their response being either synthesized on the fly on the i8042, or,
# even worse, them being an in-band commands to i8042 itself. This lead mice developers to a quite
# even worse, them being in-band commands to i8042 itself. This lead mice developers to a quite
# horrifying response: any advanced settings are conveyed to the mouse and back by abusing
# the sensitivity adjustment commands as 2-bit at a time a communication channel; keyboards use
# similar hacks.
Expand All @@ -61,11 +61,15 @@

import logging
import asyncio

from amaranth import *
from amaranth.lib import data, io
from amaranth.lib.cdc import FFSynchronizer
from amaranth.lib import data, io, wiring, stream, cdc
from amaranth.lib.wiring import In, Out

from glasgow.applet import GlasgowAppletV2, GlasgowAppletError

from ... import *

__all__ = ["PS2HostComponent", "PS2HostInterface", "PS2HostError"]


_frame_layout = data.StructLayout({
Expand All @@ -91,16 +95,18 @@ def _prepare_frame(frame, data):
]


class PS2Bus(Elaboratable):
class PS2Bus(wiring.Component):
falling : Out(1)
rising : Out(1)
clock_i : Out(1, init=1)
clock_o : In(1, init=1)
data_i : Out(1, init=1)
data_o : In(1, init=1)

def __init__(self, ports):
self.ports = ports

self.falling = Signal()
self.rising = Signal()
self.clock_i = Signal(init=1)
self.clock_o = Signal(init=1)
self.data_i = Signal(init=1)
self.data_o = Signal(init=1)
super().__init__()

def elaborate(self, platform):
m = Module()
Expand All @@ -115,8 +121,8 @@ def elaborate(self, platform):
]

m.submodules += [
FFSynchronizer(clock_buffer.i, self.clock_i, init=1),
FFSynchronizer(data_buffer.i, self.data_i, init=1),
cdc.FFSynchronizer(clock_buffer.i, self.clock_i, init=1),
cdc.FFSynchronizer(data_buffer.i, self.data_i, init=1),
]

clock_s = Signal(init=1)
Expand All @@ -135,19 +141,21 @@ def elaborate(self, platform):
return m


class PS2HostController(Elaboratable):
def __init__(self, bus):
self.bus = bus
class PS2HostController(wiring.Component):
en : In(1) # whether communication should be allowed or inhibited
stb : Out(1) # strobed for 1 cycle after each stop bit to indicate an update

self.en = Signal() # whether communication should be allowed or inhibited
self.stb = Signal() # strobed for 1 cycle after each stop bit to indicate an update
i_valid : In(1) # whether i_data is to be transmitted
i_data : In(8) # data byte written to device
i_ack : Out(1) # whether the device acked the data ("line control" bit)

self.i_valid = Signal() # whether i_data is to be transmitted
self.i_data = Signal(8) # data byte written to device
self.i_ack = Signal() # whether the device acked the data ("line control" bit)
o_valid : Out(1) # whether o_data has been received correctly
o_data : Out(8) # data byte read from device

def __init__(self, bus):
self.bus = bus

self.o_valid = Signal() # whether o_data has been received correctly
self.o_data = Signal(8) # data byte read from device
super().__init__()

def elaborate(self, platform):
m = Module()
Expand Down Expand Up @@ -224,32 +232,35 @@ def elaborate(self, platform):
return m


class PS2HostSubtarget(Elaboratable):
def __init__(self, ports, in_fifo, out_fifo, inhibit_cyc):
self.ports = ports
self.in_fifo = in_fifo
self.out_fifo = out_fifo
self.inhibit_cyc = inhibit_cyc
class PS2HostComponent(wiring.Component):
i_stream: In(stream.Signature(8))
o_stream: Out(stream.Signature(8))

def __init__(self, ports, *, inhibit_cyc):
self._ports = ports
self._inhibit_cyc = inhibit_cyc

super().__init__()

def elaborate(self, platform):
m = Module()

m.submodules.bus = bus = PS2Bus(self.ports)
m.submodules.bus = bus = PS2Bus(self._ports)
m.submodules.ctrl = ctrl = PS2HostController(bus)

timer = Signal(range(self.inhibit_cyc))
timer = Signal(range(self._inhibit_cyc))
count = Signal(7)
error = Signal()

with m.FSM():
with m.State("RECV-COMMAND"):
m.d.comb += self.out_fifo.r_en.eq(1)
with m.If(self.out_fifo.r_rdy):
m.d.comb += self.i_stream.ready.eq(1)
with m.If(self.i_stream.valid):
m.d.sync += [
count.eq(self.out_fifo.r_data[:7]),
count.eq(self.i_stream.payload[:7]),
error.eq(0),
]
with m.If(self.out_fifo.r_data[7]):
with m.If(self.i_stream.payload[7]):
m.d.sync += ctrl.i_valid.eq(1)
m.next = "WRITE-BYTE"
with m.Else():
Expand All @@ -260,10 +271,10 @@ def elaborate(self, platform):
m.next = "READ-WAIT"

with m.State("WRITE-BYTE"):
m.d.comb += self.out_fifo.r_en.eq(1)
with m.If(self.out_fifo.r_rdy):
m.d.comb += self.i_stream.ready.eq(1)
with m.If(self.i_stream.valid):
m.d.sync += [
ctrl.i_data.eq(self.out_fifo.r_data),
ctrl.i_data.eq(self.i_stream.payload),
ctrl.en.eq(1),
]
m.next = "WRITE-WAIT"
Expand All @@ -275,8 +286,8 @@ def elaborate(self, platform):
# You better be reading from that FIFO. (But the FIFO is 4× larger than the largest
# possible command response, so it's never a race.)
m.d.comb += [
self.in_fifo.w_en.eq(1),
self.in_fifo.w_data.eq(ctrl.i_ack),
self.o_stream.valid.eq(1),
self.o_stream.payload.eq(ctrl.i_ack),
]
with m.If((count == 0) | ~ctrl.i_ack):
m.next = "INHIBIT"
Expand All @@ -290,8 +301,8 @@ def elaborate(self, platform):
m.next = "READ-BYTE"
with m.State("READ-BYTE"):
m.d.comb += [
self.in_fifo.w_en.eq(1),
self.in_fifo.w_data.eq(ctrl.o_data),
self.o_stream.valid.eq(1),
self.o_stream.payload.eq(ctrl.o_data),
]
with m.If(~ctrl.o_valid & (error == 0)):
m.d.sync += error.eq(count)
Expand All @@ -302,8 +313,8 @@ def elaborate(self, platform):

with m.State("SEND-ERROR"):
m.d.comb += [
self.in_fifo.w_en.eq(1),
self.in_fifo.w_data.eq(error),
self.o_stream.valid.eq(1),
self.o_stream.payload.eq(error),
]
m.next = "INHIBIT"

Expand All @@ -312,7 +323,7 @@ def elaborate(self, platform):
# sending two back-to-back commands (or a command and a read, etc).
m.d.sync += [
ctrl.en.eq(0),
timer.eq(self.inhibit_cyc),
timer.eq(self._inhibit_cyc - 1),
]
m.next = "INHIBIT-WAIT"

Expand All @@ -330,10 +341,17 @@ class PS2HostError(GlasgowAppletError):


class PS2HostInterface:
def __init__(self, interface, logger):
self._lower = interface
def __init__(self, logger, assembly, *, clock, data, reset):
self._logger = logger
self._level = logging.DEBUG if self._logger.name == __name__ else logging.TRACE

inhibit_cyc = round(60e-6 / assembly.sys_clk_period)

ports = assembly.add_port_group(clock=clock, data=data, reset=reset)
assembly.use_pulls({clock: "high", data: "high"})
component = assembly.add_submodule(PS2HostComponent(ports, inhibit_cyc=inhibit_cyc))
self._pipe = assembly.add_inout_pipe(component.o_stream, component.i_stream)

self._streaming = False

def _log(self, message, *args):
Expand All @@ -342,13 +360,14 @@ def _log(self, message, *args):
async def send_command(self, cmd, ret=0):
assert ret < 0x7f
assert not self._streaming
await self._lower.write([0x80|(ret + 1), cmd])
line_ack, = await self._lower.read(1)
await self._pipe.send([0x80|(ret + 1), cmd])
await self._pipe.flush()
line_ack, = await self._pipe.recv(1)
if not line_ack:
self._log("cmd=%02x nak", cmd)
raise PS2HostError("peripheral did not acknowledge command {:#04x}"
.format(cmd))
cmd_ack, *result, error = await self._lower.read(1 + ret + 1)
cmd_ack, *result, error = await self._pipe.recv(1 + ret + 1)
result = bytes(result)
self._log("cmd=%02x ack=%02x ret=<%s>", cmd, cmd_ack, result.hex())
if error > 0:
Expand All @@ -373,8 +392,9 @@ async def send_command(self, cmd, ret=0):
async def recv_packet(self, ret):
assert ret < 0x7f
assert not self._streaming
await self._lower.write([ret])
*result, error = await self._lower.read(ret + 1)
await self._pipe.send([ret])
await self._pipe.flush()
*result, error = await self._pipe.recv(ret + 1)
result = bytes(result)
self._log("ret=<%s>", result.hex())
if error > 0:
Expand All @@ -385,12 +405,13 @@ async def recv_packet(self, ret):
async def stream(self, callback):
assert not self._streaming
self._streaming = True
await self._lower.write([0x7f])
await self._pipe.send([0x7f])
await self._pipe.flush()
while True:
await callback(*await self._lower.read(1))
await callback(*await self._pipe.recv(1))


class PS2HostApplet(GlasgowApplet):
class PS2HostApplet(GlasgowAppletV2):
logger = logging.getLogger(__name__)
help = "communicate with IBM PS/2 peripherals"
description = """
Expand All @@ -404,44 +425,31 @@ class PS2HostApplet(GlasgowApplet):

@classmethod
def add_build_arguments(cls, parser, access):
super().add_build_arguments(parser, access)

access.add_voltage_argument(parser)
access.add_pins_argument(parser, "clock", default=True)
access.add_pins_argument(parser, "data", default=True)
access.add_pins_argument(parser, "reset")

def build(self, target, args):
self.mux_interface = iface = target.multiplexer.claim_interface(self, args)
subtarget = iface.add_subtarget(PS2HostSubtarget(
ports =iface.get_port_group(
clock = args.clock,
data = args.data,
reset = args.reset
),
in_fifo=iface.get_in_fifo(),
out_fifo=iface.get_out_fifo(),
inhibit_cyc=int(target.sys_clk_freq * 60e-6),
))

async def run(self, device, args):
iface = await device.demultiplexer.claim_interface(self, self.mux_interface, args,
pull_high={args.clock, args.data})
return PS2HostInterface(iface, self.logger)
def build(self, args):
with self.assembly.add_applet(self):
self.assembly.use_voltage(args.voltage)
self.ps2_iface = PS2HostInterface(self.logger, self.assembly,
clock=args.clock, data=args.data, reset=args.reset)

@classmethod
def add_interact_arguments(cls, parser):
def add_run_arguments(cls, parser):
def hex_bytes(arg):
return bytes.fromhex(arg)
parser.add_argument(
"init", metavar="INIT", type=hex_bytes, nargs="?", default=b"",
help="send each byte from INIT as an initialization command")

async def interact(self, device, args, iface):
async def run(self, args):
for init_byte in args.init:
await iface.send_command(init_byte)
await self.ps2_iface.send_command(init_byte)
async def print_byte(byte):
print(f"{byte:02x}", end=" ", flush=True)
await iface.stream(print_byte)
await self.ps2_iface.stream(print_byte)

@classmethod
def tests(cls):
Expand Down
4 changes: 2 additions & 2 deletions software/glasgow/applet/interface/ps2_host/test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from ... import *
from glasgow.applet import GlasgowAppletV2TestCase, synthesis_test
from . import PS2HostApplet


class PS2HostAppletTestCase(GlasgowAppletTestCase, applet=PS2HostApplet):
class PS2HostAppletTestCase(GlasgowAppletV2TestCase, applet=PS2HostApplet):
@synthesis_test
def test_build(self):
self.assertBuilds()
Loading
Loading