Compare commits

...

13 Commits

12 changed files with 571 additions and 11 deletions

View File

@ -15,8 +15,10 @@ Each packet is constructed as follows:
- `uint8_t`: A basic XOR checksum of all the previous bytes, XORed with `0x55`.
- `null`: A null terminator (0x00).
The firmware can also send logs by sending the initial character `/`, followed by ASCII text, then a null terminator (`0x00`)
### Commands:
- Connect (`0x01`): Used to get the firmware into its ready state, and out of it's initial mode of repeating `WHSPAH\n`. Responds with `0x01`.
- Connect (`0x01`): Used to get the firmware into its ready state, and out of it's initial mode of repeating `/WHSPAH\x00`. Responds with `0x01`.
- Ping (`0x02`): Needs to be send at least once every 500ms, otherwise the firmware will reset into its initial mode. Responds with `0x02`.
- Transmit (`0x10`): Transmits a CaiXianlin packet. The command's arguments are as follows:
- `uint16_t`: Transmitter ID

View File

@ -1,5 +1,6 @@
#include "./CommandProcessor.h"
#include "./Commands.h"
#include "./Globals.h"
// Gets set to true once the command processor has connected to a host
bool CMDProcessor::connected = false;
@ -47,22 +48,25 @@ namespace {
}
checksum ^= 0x55;
Serial.write(checksum);
Serial.write(0x00);
Serial.write((char) 0x00);
}
}
void CMDProcessor::process() {
// Command read timeout
if (phase != 0 && millis() - lastRX > CMD_PROCESSOR_RX_TIMEOUT_MS) {
sendLog("Error: Command read timed out.");
reset();
return;
}
if (CMDProcessor::connected && millis() - lastPing > CMD_PROCESSOR_PING_TIMEOUT_MS) {
sendLog("Error: Ping timed out.");
CMDProcessor::connected = false;
reset();
} else if (!CMDProcessor::connected && millis() - lastPing > 250) {
Serial.print("WHSPAH\n");
Serial.print("/WHSPAH");
Serial.print((char) 0x00);
lastPing = millis();
}
@ -106,6 +110,7 @@ void CMDProcessor::process() {
if (rxChecksum == calcChecksum) {
phase++;
} else {
sendLog("Malformed packet: Checksum mismatch.");
resetAfterNull();
}
break;
@ -113,6 +118,7 @@ void CMDProcessor::process() {
if (Serial.read() == 0x00) {
phase++;
} else {
sendLog("Malformed packet: No null terminator found.");
// If a null terminator is not present in the expected place,
// set the command ID to 0xFF (no op) and remain in phase 3.
// This keeps the firmware waiting for a valid null terminator

View File

@ -2,7 +2,7 @@
#define CommandProcessor_h
#define CMD_PROCESSOR_RX_TIMEOUT_MS 200
#define CMD_PROCESSOR_PING_TIMEOUT_MS 500
#define CMD_PROCESSOR_PING_TIMEOUT_MS 2000
#include <Arduino.h>

View File

@ -2,3 +2,9 @@
#include "./CaiXianlin.h"
CaiXianlin::Transmitter* RFTransmitter = nullptr;
void sendLog(char* message) {
Serial.write("/");
Serial.write(message);
Serial.write((char) 0x00);
}

View File

@ -4,5 +4,6 @@
#include "./CaiXianlin.h"
extern CaiXianlin::Transmitter* RFTransmitter;
void sendLog(char* message);
#endif // Header define

View File

@ -3,8 +3,11 @@ W.H.S.P.A.H is an entirely self-hosted,
local and free way to interact with CaiXianlin shock collars.
"""
from .driver import Transmitter
from .constants import MODES
from .shocker import Shocker
from . import exceptions
from . import utils
__all__ = ['exceptions', 'utils']
__all__ = ['Transmitter', 'MODES', 'Shocker', 'exceptions', 'utils']
__version__ = '1.0.0'

View File

@ -0,0 +1,27 @@
"""
Holds constants used throughout WHSPAH.
"""
from enum import Enum
# The hardware magic string.
HARDWARE_MAGIC = b'/WHSPAH\x00'
class MODES(Enum):
"""
The different commands or "modes" of the shockers.
"""
SHOCK = 1
VIBRATE = 2
BEEP = 3
LIGHT = 4
class STATES(Enum):
"""
The different states of the transmitter object.
"""
DISCONNECTED = 1
CONNECTING = 2
READY = 3

View File

@ -0,0 +1,309 @@
"""
All device related parts of the WHSPAH API.
"""
from threading import Thread
from threading import Lock
from enum import Enum
import logging
import select
import struct
import time
import os
import serial
from whspah.constants import MODES, STATES, HARDWARE_MAGIC
import whspah.exceptions as exceptions
from whspah.metaClasses import MetaTransmitter
from whspah.shocker import Shocker
import whspah.utils as utils
CONNECTION_READ_TIMEOUT = 2.5
TX_PING_INTERVAL = 1
RX_PING_TIMEOUT = 5
def calculateChecksum(packet: bytes) -> int:
"""
Returns the checksum for the provided packet.
"""
checksum = 0
for b in packet:
checksum ^= b
checksum ^= 0x55
return checksum
class Transmitter(MetaTransmitter):
"""
Initialises a connection to a physical WHSPAH device.
Args:
port (os.PathLike): The serial port to connect to. If this is default,
the WHSPAH API will attempt to search the system for a WHSPAH device.
If this is None, no connection attempt will be made until changePort()
is manually called.
exceptionOnFiledTX (bool): If True, TransmitError will get raised if transmitting to the
device for any reason, otherwise commands will just return
False on error.
baudrate (int): What baudrate to use when connecting to the hardware.
This is only needed if you're messing with custom firmware, in all other
situations, this should be left as default.
Raises:
NoWHSPAHDeviceFound: Raised if the specified port is not a valid WHSPAH device,
or, if port is None, when no WHSPAH devies were found.
"""
def __init__(self, port: os.PathLike=..., *,
exceptionOnFiledTX: bool=True, baudrate: int=9600):
super().__init__()
# A default logger, this gets overwritten by changePort() later,
# this is just to keep typehints working mostly.
self._log = logging.getLogger('WHSPAH-API.???')
self._baud = baudrate
self._ser: serial.Serial = None
self._serWriteLock = Lock()
self._port = port
self.exceptionOnFiledTX = exceptionOnFiledTX
self._runConnThread = False
self._connThread = Thread(target=self._connThreadFunc)
if self._port is ...:
ports = utils.scanForValidPorts(baudrate=self._baud)
if len(ports) == 0:
raise exceptions.NoWHSPAHDeviceFound()
self._port = ports[0]
if self._port is not None:
self.changePort(self._port, baudrate)
def changePort(self, port: os.PathLike, timeout: float=5.0, *, baudrate: int=9600):
"""
Changes the port this device object is connected to.
Automatically closes any existing connection, then re-connects to the new port.
Args:
port (os.PathLike): The new port to connect to.
timeout (float): The maximum amount of time to wait for a successful connection.
baudrate (int): What baudrate to use when connecting to the hardware.
This is only needed if you're messing with custom firmware, in all other
situations, this should be left as default.
"""
self.close()
self._port = port
self._baud = baudrate
self._log = logging.getLogger('WHSPAH-API.'+self._port.split('/')[-1])
self._runConnThread = True
self._connThread.start()
st = time.perf_counter()
while (not self.ready) and time.perf_counter() - st < timeout:
time.sleep(0.5)
if time.perf_counter() - st > timeout:
self.close()
raise TimeoutError(f'Could not connect to device in {timeout} seconds.')
def close(self):
"""
Closes the connection with the WHSPAH device.
"""
if not self._runConnThread:
return
self._log.info('Disconnecting...')
self._runConnThread = False
try:
self._connThread.join()
except RuntimeError:
pass
def _connThreadFunc(self):
"""
This gets ran as a thread to keep a persistant connection with the WHSPAH device.
"""
lastTXPing = 0
lastRXPing = 0
buffer = b''
while self._runConnThread:
if self._ser is None:
try:
# Connect to serial port
self._log.debug('Connecting...')
self._setState(STATES.CONNECTING)
self._ser = serial.Serial(self._port, self._baud,
timeout=CONNECTION_READ_TIMEOUT)
self._ser.flushInput()
# Initialise timeouts
select.select([self._ser], [], [], CONNECTION_READ_TIMEOUT)
st = time.perf_counter()
# Wait for WHSPAH message
while (self._ser.read_until(b'\x00') != HARDWARE_MAGIC) and\
time.perf_counter() - st < CONNECTION_READ_TIMEOUT:
time.sleep(0.1)
# If time timeout was exceeded
if time.perf_counter() - st >= CONNECTION_READ_TIMEOUT:
# Raise an error
raise exceptions.NotValidWHSPAHDevice(self._port)
# Send the connect command
self._sendPacket(b'\x01')
# Wait for connect acknowledgement
lastTXPing = time.perf_counter()
lastRXPing = time.perf_counter()
except (serial.serialutil.SerialException, exceptions.NotValidWHSPAHDevice,
TimeoutError, exceptions.TransmitError, AttributeError) as e:
self._log.warning('Could not connect to device: %s', e)
self._ser = None
self._setState(STATES.DISCONNECTED)
time.sleep(1)
else:
try:
if self._ser.in_waiting:
buffer += self._ser.read()
if len(buffer) >= 3 and buffer[-1] == 0:
cmdID = buffer[0]
receivedChecksum = buffer[-2]
expectedChecksum = calculateChecksum(buffer[:-2])
if cmdID == ord('/') and buffer != HARDWARE_MAGIC:
message = buffer[1:-1].decode()
self._log.info('<- %s', message)
else:
if receivedChecksum == expectedChecksum:
# Connection acknowledgment
if cmdID == 1 and not self.ready:
self._setState(STATES.READY)
self._log.info('Connected!')
# Ping
elif cmdID == 2 and self.ready:
lastRXPing = time.perf_counter()
else:
self._log.warning('Received malformed packet from device: '+
'Checksum mismatch: %s expected, %s received.',
expectedChecksum, receivedChecksum)
buffer = b''
if time.perf_counter() - lastTXPing > CONNECTION_READ_TIMEOUT and\
not self.ready:
# Raise an error
self._log.debug('No acknowledgment received.')
raise TimeoutError()
# Send occasional ping
if time.perf_counter() - lastTXPing > TX_PING_INTERVAL and self.ready:
self._sendPacket(b'\x02')
lastTXPing = time.perf_counter()
# Check for timeout
if time.perf_counter() - lastRXPing > RX_PING_TIMEOUT:
raise TimeoutError('Device hasn\'t sent a ping in 5s.')
except (AttributeError, exceptions.TransmitError,
serial.serialutil.SerialException, serial.serialutil.PortNotOpenError,
TimeoutError, OSError) as e:
self._log.warning('An error occured while communicating with the '+
'WHSPAH device: %s', e)
self._log.warning('Disconnecting...')
self._setState(STATES.DISCONNECTED)
self._ser = None
time.sleep(1)
def transmit(self, transmitterID: int, channel: int, mode: MODES,
intensity: int=0, lucalEncoded: bool=False) -> bool:
"""
Transmits the given arguments to the WHSPAH device.
Returns:
bool: True if transmitted successfully, False otherwise.
Raises:
TransmitError: Raised if `exceptionOnFiledTX` is True, and the transmit fails.
InvalidTransmitterID: If the provided transmitter ID is invalid.
InvalidChannel: If the provided channel is invalid.
InvalidMode: If the provided mode is invalid.
InvalidIntensity: If the provided intensity is invalid.
"""
# TODO: Test with a non-lucal encoded shocker
if transmitterID < 126 or transmitterID > 32767 or not isinstance(transmitterID, int):
raise exceptions.InvalidTransmitterID(transmitterID)
if channel < 1 or channel > 3 or not isinstance(channel, int):
raise exceptions.InvalidChannel(channel)
if mode not in MODES:
raise exceptions.InvalidMode(mode)
if mode == MODES.BEEP and intensity != 0:
raise exceptions.InvalidIntensity(intensity, True)
if intensity < 0 or intensity > 99 or not isinstance(intensity, int):
raise exceptions.InvalidIntensity(intensity)
args = struct.pack('<HBBB?', transmitterID, channel-1, mode.value, intensity, lucalEncoded)
try:
self._sendPacket(b'\x10' + args)
return True
except exceptions.TransmitError as e:
if self.exceptionOnFiledTX:
raise e
return False
def _sendPacket(self, packet: bytearray | bytes):
"""
Sends a packet over the serial port, automatically appending the checksum.
Raises:
TransmitError: If the transmititon failed.
"""
packet = bytearray(packet)
checksum = calculateChecksum(packet)
packet.append(checksum)
packet.append(0x00)
try:
with self._serWriteLock:
self._ser.write(packet)
except (AttributeError, serial.serialutil.PortNotOpenError) as e:
self._ser = None
raise exceptions.TransmitError(self._port) from e
def addShocker(self, *args, **kwargs) -> Shocker:
"""
Creates a new Shocker object with this transmitter as the shockers assigned transmitter.
For args, see whspah.Shocker.
"""
kwargs.update({'transmitter': self})
shocker = Shocker(*args, **kwargs)
return shocker
@property
def state(self):
"""
Returns the current state of the transmitter.
"""
return self._state
@property
def ready(self):
"""
Returns True if the transmitter is ready.
"""
return self._state == STATES.READY

View File

@ -2,11 +2,73 @@
Holds all WHSPAH exceptions.
"""
import os
class NoWHSPAHDeviceFound(OSError):
"""
Gets raised when whspah.Device() is called with no port,
Gets raised when whspah.Transmitter() is called with no port,
and no port could be automatically found.
"""
def __init__(self):
super().__init__('No WHSPAH device was found.')
class NotValidWHSPAHDevice(OSError):
"""
Gets raised if the specified transmitter port is not a WHSPAH device.
"""
def __init__(self, port: os.PathLike):
super().__init__(f'{port} is not a WHSPAH device.')
class TransmitError(OSError):
"""
Gets raised when transmitting to the device fails.
This typically happens when trying to communicate with an unplugged device.
"""
def __init__(self, port: os.PathLike):
super().__init__(f'Failed to transmit to {port}.')
# ----- Transmit argument errors -----
class InvalidTransmitterID(ValueError):
"""
Gets raised if the chosen transmitter ID is not within range.
"""
def __init__(self, transmitterID: int):
super().__init__(f'Transmitter ID {transmitterID} is not valid. '+
'Transmitter ID must be an int within the range 126 to 32767.')
class InvalidChannel(ValueError):
"""
Gets raised if the chosen channel is not within range.
"""
def __init__(self, channel: int):
super().__init__(f'Channel {channel} is not valid.'+
' Channel must be an int within the range 1 to 3.')
class InvalidMode(ValueError):
"""
Gets raised if the chosen mode is not valid.
"""
def __init__(self, mode: 'whspah.MODES'):
super().__init__(f'Mode {mode} is not valid.'+
' Mode must be either an int within the range 1 to 4,'+
' or a selection from `whspah.MODES`.')
class InvalidIntensity(ValueError):
"""
Gets raised if the chosen intensity is not within range.
"""
def __init__(self, intensity: int, shouldBeZero=False):
message = f'Intensity {intensity} is not valid.'
if shouldBeZero:
message += ' The mode specified requires the intensity to be 0.'
else:
message += ' Intensity must be an int within the range 0 to 99.'
super().__init__(message)

View File

@ -0,0 +1,83 @@
"""
Holds meta classes.
This is used if you want to create a custom transmitters / shockers that still nicely integrate
with W.H.S.P.A.H.
"""
from threading import Thread
from .constants import STATES
class MetaShocker:
def __init__(self):
pass
def shock(self, intensity: int):
"""
Sends a shock command to this shocker.
"""
raise NotImplementedError()
def vibrate(self, intensity: int):
"""
Sends a vibrate command to this shocker.
"""
raise NotImplementedError()
def beep(self):
"""
Sends a beep command to this shocker.
"""
raise NotImplementedError()
def toggleLight(self):
"""
Toggles the light of this shocker.
"""
raise NotImplementedError()
class MetaTransmitter:
def __init__(self):
self._stateCallbacks = []
self._state = STATES.DISCONNECTED
def addStateCallback(self, callback: callable):
"""
Adds a callback function for state changes.
All callbacks will be ran with the following positional arguments:
- Transmitter: The Transmitter object that called the callback.
- STATES: The new state of the transmitter.
"""
self._stateCallbacks.append(callback)
@property
def state(self):
"""
Returns the current state of the transmitter.
"""
return self._state
def _setState(self, newState: STATES):
"""
Updates the current state of the transmitter and runs all callbacks.
"""
self._state = newState
for callback in self._stateCallbacks:
Thread(target=callback, args=(self, newState)).start()
@property
def ready(self):
"""
Returns True if the transmitter is ready.
"""
return self._state == STATES.READY

View File

@ -0,0 +1,58 @@
"""
Holds the Shocker object.
"""
from .constants import MODES
from .metaClasses import MetaShocker
class Shocker(MetaShocker):
"""
"""
def __init__(self, transmitterID: int, channel: int, *,
lucalEncoded: bool=False, transmitter: 'whspah.Transmitter'=None):
self.transmitterID = transmitterID
self.channel = channel
self.transmitter = transmitter
self.lucalEncoded = lucalEncoded
self._lastLightIntensity = 0
def _transmit(self, mode: MODES, intensity: int=0):
"""
Transmits using this shockers transmitter ID, channel and lucal setting
with the assigned transmitter.
"""
self.transmitter.transmit(self.transmitterID, self.channel, mode,
intensity, self.lucalEncoded)
def shock(self, intensity: int):
"""
Sends a shock command to this shocker.
"""
self._transmit(MODES.SHOCK, intensity)
def vibrate(self, intensity: int):
"""
Sends a vibrate command to this shocker.
"""
self._transmit(MODES.VIBRATE, intensity)
def beep(self):
"""
Sends a beep command to this shocker.
"""
self._transmit(MODES.BEEP, self._lastLightIntensity)
def toggleLight(self):
"""
Toggles the light of this shocker.
"""
self._lastLightIntensity += 1
self._lastLightIntensity %= 100
self._transmit(MODES.LIGHT, self._lastLightIntensity)

View File

@ -11,10 +11,12 @@ import io
import serial.tools.list_ports
import serial
from whspah.constants import HARDWARE_MAGIC
def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout: float) -> None:
"""
Gets called as a thread by _sanForValidPorts().
Attempts to connect to the port specified, then waits for "WHSPAH\n".
Attempts to connect to the port specified, then waits for magic string.
If a successful port was found, it will be closed, then appended to
resultList.
"""
@ -25,12 +27,13 @@ def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout:
# Start a timer
st = time.perf_counter()
# Connect to the serial port
ser = serial.Serial(port, baud, timeout=0)
ser = serial.Serial(port, baud, timeout=timeout)
ser.flushInput()
# Select it to enforce timeouts
select.select([ser], [], [], timeout)
# Attempt to read "WHSPAH\n" from the port
while (ser.readline() != b'WHSPAH\n') and time.perf_counter() - st < timeout:
time.sleep(0.5)
# Attempt to read magic string from the port
while (ser.read_until(b'\x00') != HARDWARE_MAGIC) and time.perf_counter() - st < timeout:
time.sleep(0.1)
# If all of the above completed before the timeout, add the port to the result list
if time.perf_counter() - st < timeout: