Compare commits

...

3 Commits

Author SHA1 Message Date
725d334e2f [API-PY] Implemented first version of driver 2026-05-17 23:15:28 +01:00
e30f5f7042 [API-PY] Added driver exceptions 2026-05-17 23:15:03 +01:00
fab54ea67e [API-PY] Added variable hardware magic 2026-05-17 23:14:42 +01:00
4 changed files with 414 additions and 7 deletions

View File

@ -3,8 +3,10 @@ W.H.S.P.A.H is an entirely self-hosted,
local and free way to interact with CaiXianlin shock collars.
"""
from .driver import Transmitter
from .driver import MODES
from . import exceptions
from . import utils
__all__ = ['exceptions', 'utils']
__all__ = ['Transmitter', 'MODES', 'exceptions', 'utils']
__version__ = '1.0.0'

View File

@ -0,0 +1,336 @@
"""
All device related parts of the WHSPAH API.
"""
from threading import Thread
from threading import Lock
from enum import Enum
import logging
import select
import struct
import time
import os
import serial
import whspah.exceptions as exceptions
import whspah.utils as utils
CONNECTION_READ_TIMEOUT = 2.5
TX_PING_INTERVAL = 1
RX_PING_TIMEOUT = 5
class MODES(Enum):
"""
The different commands or "modes" of the shockers.
"""
SHOCK = 1
VIBRATE = 2
BEEP = 3
LIGHT = 4
class STATES(Enum):
"""
The different states of the transmitter object.
"""
DISCONNECTED = 1
CONNECTING = 2
READY = 3
def calculateChecksum(packet: bytes) -> int:
"""
Returns the checksum for the provided packet.
"""
checksum = 0
for b in packet:
checksum ^= b
checksum ^= 0x55
return checksum
class Transmitter:
"""
Initialises a connection to a physical WHSPAH device.
Args:
port (os.PathLike): The serial port to connect to. If this is default,
the WHSPAH API will attempt to search the system for a WHSPAH device.
If this is None, no connection attempt will be made until changePort()
is manually called.
exceptionOnFiledTX (bool): If True, TransmitError will get raised if transmitting to the
device for any reason, otherwise commands will just return
False on error.
baudrate (int): What baudrate to use when connecting to the hardware.
This is only needed if you're messing with custom firmware, in all other
situations, this should be left as default.
Raises:
NoWHSPAHDeviceFound: Raised if the specified port is not a valid WHSPAH device,
or, if port is None, when no WHSPAH devies were found.
"""
def __init__(self, port: os.PathLike=..., *,
exceptionOnFiledTX: bool=True, baudrate: int=9600):
# A default logger, this gets overwritten by changePort() later,
# this is just to keep typehints working mostly.
self._log = logging.getLogger('WHSPAH-API.???')
self._baud = baudrate
self._ser: serial.Serial = None
self._serWriteLock = Lock()
self._port = port
self.exceptionOnFiledTX = exceptionOnFiledTX
self._runConnThread = False
self._connThread = Thread(target=self._connThreadFunc)
self._stateCallbacks = []
self._state = STATES.DISCONNECTED
if self._port is ...:
ports = utils.scanForValidPorts(baudrate=self._baud)
if len(ports) == 0:
raise exceptions.NoWHSPAHDeviceFound()
self._port = ports[0]
if self._port is not None:
self.changePort(self._port, baudrate)
def changePort(self, port: os.PathLike, timeout: float=5.0, *, baudrate: int=9600):
"""
Changes the port this device object is connected to.
Automatically closes any existing connection, then re-connects to the new port.
Args:
port (os.PathLike): The new port to connect to.
timeout (float): The maximum amount of time to wait for a successful connection.
baudrate (int): What baudrate to use when connecting to the hardware.
This is only needed if you're messing with custom firmware, in all other
situations, this should be left as default.
"""
self.close()
self._port = port
self._baud = baudrate
self._log = logging.getLogger('WHSPAH-API.'+self._port.split('/')[-1])
self._runConnThread = True
self._connThread.start()
st = time.perf_counter()
while (not self.ready) and time.perf_counter() - st < timeout:
time.sleep(0.5)
if time.perf_counter() - st > timeout:
self.close()
raise TimeoutError(f'Could not connect to device in {timeout} seconds.')
def close(self):
"""
Closes the connection with the WHSPAH device.
"""
if not self._runConnThread:
return
self._log.info('Disconnecting...')
self._runConnThread = False
try:
self._connThread.join()
except RuntimeError:
pass
def addStateCallback(self, callback: callable):
"""
Adds a callback function for state changes.
All callbacks will be ran with the following positional arguments:
- Transmitter: The Transmitter object that called the callback.
- STATES: The new state of the transmitter.
"""
self._stateCallbacks.append(callback)
def _setState(self, newState: STATES):
"""
Updates the current state of the transmitter and runs all callbacks.
"""
self._state = newState
for callback in self._stateCallbacks:
Thread(target=callback, args=(self, newState)).start()
def _connThreadFunc(self):
"""
This gets ran as a thread to keep a persistant connection with the WHSPAH device.
"""
lastTXPing = 0
lastRXPing = 0
buffer = b''
while self._runConnThread:
if self._ser is None:
try:
# Connect to serial port
self._log.debug('Connecting...')
self._setState(STATES.CONNECTING)
self._ser = serial.Serial(self._port, self._baud,
timeout=CONNECTION_READ_TIMEOUT)
self._ser.flushInput()
# Initialise timeouts
select.select([self._ser], [], [], CONNECTION_READ_TIMEOUT)
st = time.perf_counter()
# Wait for WHSPAH message
while ((rx:=self._ser.read_until(b'\x00')) != utils.HARDWARE_MAGIC) and\
time.perf_counter() - st < CONNECTION_READ_TIMEOUT:
self._log.debug(rx)
time.sleep(0.1)
# If time timeout was exceeded
if time.perf_counter() - st >= CONNECTION_READ_TIMEOUT:
# Raise an error
raise exceptions.NotValidWHSPAHDevice(self._port)
# Send the connect command
self._sendPacket(b'\x01')
# Wait for connect acknowledgement
lastTXPing = time.perf_counter()
lastRXPing = time.perf_counter()
except (serial.serialutil.SerialException, exceptions.NotValidWHSPAHDevice,
TimeoutError, exceptions.TransmitError, AttributeError) as e:
self._log.warning('Could not connect to device: %s', e)
self._ser = None
self._setState(STATES.DISCONNECTED)
time.sleep(1)
else:
try:
if self._ser.in_waiting:
buffer += self._ser.read()
if len(buffer) >= 3 and buffer[-1] == 0:
cmdID = buffer[0]
receivedChecksum = buffer[-2]
expectedChecksum = calculateChecksum(buffer[:-2])
if cmdID == ord('/') and buffer != utils.HARDWARE_MAGIC:
message = buffer[1:-1].decode()
self._log.info('<- %s', message)
else:
if receivedChecksum == expectedChecksum:
# Connection acknowledgment
if cmdID == 1 and not self.ready:
self._setState(STATES.READY)
self._log.info('Connected!')
# Ping
elif cmdID == 2 and self.ready:
lastRXPing = time.perf_counter()
else:
self._log.warning('Received malformed packet from device: '+
'Checksum mismatch: %s expected, %s received.',
expectedChecksum, receivedChecksum)
buffer = b''
if time.perf_counter() - lastTXPing > CONNECTION_READ_TIMEOUT and\
not self.ready:
# Raise an error
self._log.debug('No acknowledgment received.')
raise TimeoutError()
# Send occasional ping
if time.perf_counter() - lastTXPing > TX_PING_INTERVAL and self.ready:
self._sendPacket(b'\x02')
lastTXPing = time.perf_counter()
# Check for timeout
if time.perf_counter() - lastRXPing > RX_PING_TIMEOUT:
raise TimeoutError('Device hasn\'t sent a ping in 5s.')
except (AttributeError, exceptions.TransmitError,
serial.serialutil.SerialException, serial.serialutil.PortNotOpenError,
TimeoutError, OSError) as e:
self._log.warning('An error occured while communicating with the '+
'WHSPAH device: %s', e)
self._log.warning('Disconnecting...')
self._setState(STATES.DISCONNECTED)
self._ser = None
time.sleep(1)
def transmit(self, transmitterID: int, channel: int, mode: MODES,
intensity: int=0, lucalEncoded: bool=False) -> bool:
"""
Transmits the given arguments to the WHSPAH device.
Returns:
bool: True if transmitted successfully, False otherwise.
Raises:
TransmitError: Raised if `exceptionOnFiledTX` is True, and the transmit fails.
InvalidTransmitterID: If the provided transmitter ID is invalid.
InvalidChannel: If the provided channel is invalid.
InvalidMode: If the provided mode is invalid.
InvalidIntensity: If the provided intensity is invalid.
"""
# TODO: Test with a non-lucal encoded shocker
if transmitterID < 126 or transmitterID > 32767 or not isinstance(transmitterID, int):
raise exceptions.InvalidTransmitterID(transmitterID)
if channel < 1 or channel > 3 or not isinstance(channel, int):
raise exceptions.InvalidChannel(channel)
if mode not in MODES:
raise exceptions.InvalidMode(mode)
if mode == MODES.BEEP and intensity != 0:
raise exceptions.InvalidIntensity(intensity, True)
if intensity < 0 or intensity > 99 or not isinstance(intensity, int):
raise exceptions.InvalidIntensity(intensity)
args = struct.pack('<HBBB?', transmitterID, channel-1, mode.value, intensity, lucalEncoded)
try:
self._sendPacket(b'\x10' + args)
return True
except exceptions.TransmitError as e:
if self.exceptionOnFiledTX:
raise e
return False
def _sendPacket(self, packet: bytearray | bytes):
"""
Sends a packet over the serial port, automatically appending the checksum.
Raises:
TransmitError: If the transmititon failed.
"""
packet = bytearray(packet)
checksum = calculateChecksum(packet)
packet.append(checksum)
packet.append(0x00)
try:
with self._serWriteLock:
self._ser.write(packet)
except (AttributeError, serial.serialutil.PortNotOpenError) as e:
self._ser = None
raise exceptions.TransmitError(self._port) from e
@property
def state(self):
"""
Returns the current state of the transmitter.
"""
return self._state
@property
def ready(self):
"""
Returns True if the transmitter is ready.
"""
return self._state == STATES.READY

View File

@ -2,11 +2,73 @@
Holds all WHSPAH exceptions.
"""
import os
class NoWHSPAHDeviceFound(OSError):
"""
Gets raised when whspah.Device() is called with no port,
Gets raised when whspah.Transmitter() is called with no port,
and no port could be automatically found.
"""
def __init__(self):
super().__init__('No WHSPAH device was found.')
class NotValidWHSPAHDevice(OSError):
"""
Gets raised if the specified transmitter port is not a WHSPAH device.
"""
def __init__(self, port: os.PathLike):
super().__init__(f'{port} is not a WHSPAH device.')
class TransmitError(OSError):
"""
Gets raised when transmitting to the device fails.
This typically happens when trying to communicate with an unplugged device.
"""
def __init__(self, port: os.PathLike):
super().__init__(f'Failed to transmit to {port}.')
# ----- Transmit argument errors -----
class InvalidTransmitterID(ValueError):
"""
Gets raised if the chosen transmitter ID is not within range.
"""
def __init__(self, transmitterID: int):
super().__init__(f'Transmitter ID {transmitterID} is not valid. '+
'Transmitter ID must be an int within the range 126 to 32767.')
class InvalidChannel(ValueError):
"""
Gets raised if the chosen channel is not within range.
"""
def __init__(self, channel: int):
super().__init__(f'Channel {channel} is not valid.'+
' Channel must be an int within the range 1 to 3.')
class InvalidMode(ValueError):
"""
Gets raised if the chosen mode is not valid.
"""
def __init__(self, mode: 'whspah.MODES'):
super().__init__(f'Mode {mode} is not valid.'+
' Mode must be either an int within the range 1 to 4,'+
' or a selection from `whspah.MODES`.')
class InvalidIntensity(ValueError):
"""
Gets raised if the chosen intensity is not within range.
"""
def __init__(self, intensity: int, shouldBeZero=False):
message = f'Intensity {intensity} is not valid.'
if shouldBeZero:
message += ' The mode specified requires the intensity to be 0.'
else:
message += ' Intensity must be an int within the range 0 to 99.'
super().__init__(message)

View File

@ -11,10 +11,16 @@ import io
import serial.tools.list_ports
import serial
# The hardware magic string.
# I couldn't really think of a better place to put this,
# so I figured utils is the closest we have to a globals.py file.
# I don't want to create a globals.py file for this one variable.
HARDWARE_MAGIC = b'/WHSPAH\x00'
def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout: float) -> None:
"""
Gets called as a thread by _sanForValidPorts().
Attempts to connect to the port specified, then waits for "WHSPAH\n".
Attempts to connect to the port specified, then waits for magic string.
If a successful port was found, it will be closed, then appended to
resultList.
"""
@ -25,12 +31,13 @@ def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout:
# Start a timer
st = time.perf_counter()
# Connect to the serial port
ser = serial.Serial(port, baud, timeout=0)
ser = serial.Serial(port, baud, timeout=timeout)
ser.flushInput()
# Select it to enforce timeouts
select.select([ser], [], [], timeout)
# Attempt to read "WHSPAH\n" from the port
while (ser.readline() != b'WHSPAH\n') and time.perf_counter() - st < timeout:
time.sleep(0.5)
# Attempt to read magic string from the port
while (ser.read_until(b'\x00') != HARDWARE_MAGIC) and time.perf_counter() - st < timeout:
time.sleep(0.1)
# If all of the above completed before the timeout, add the port to the result list
if time.perf_counter() - st < timeout: