Compare commits
13 Commits
c04e5c8d56
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
933d056ca3
|
|||
|
629069957e
|
|||
|
d941282af9
|
|||
|
872b157f18
|
|||
|
ca2faa72f1
|
|||
|
fa026e0a52
|
|||
|
725d334e2f
|
|||
|
e30f5f7042
|
|||
|
fab54ea67e
|
|||
|
d54b112732
|
|||
|
d40aa039f7
|
|||
|
29b30aeb0b
|
|||
|
a2429d4dbc
|
@ -15,8 +15,10 @@ Each packet is constructed as follows:
|
||||
- `uint8_t`: A basic XOR checksum of all the previous bytes, XORed with `0x55`.
|
||||
- `null`: A null terminator (0x00).
|
||||
|
||||
The firmware can also send logs by sending the initial character `/`, followed by ASCII text, then a null terminator (`0x00`)
|
||||
|
||||
### Commands:
|
||||
- Connect (`0x01`): Used to get the firmware into its ready state, and out of it's initial mode of repeating `WHSPAH\n`. Responds with `0x01`.
|
||||
- Connect (`0x01`): Used to get the firmware into its ready state, and out of it's initial mode of repeating `/WHSPAH\x00`. Responds with `0x01`.
|
||||
- Ping (`0x02`): Needs to be send at least once every 500ms, otherwise the firmware will reset into its initial mode. Responds with `0x02`.
|
||||
- Transmit (`0x10`): Transmits a CaiXianlin packet. The command's arguments are as follows:
|
||||
- `uint16_t`: Transmitter ID
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
#include "./CommandProcessor.h"
|
||||
#include "./Commands.h"
|
||||
#include "./Globals.h"
|
||||
|
||||
// Gets set to true once the command processor has connected to a host
|
||||
bool CMDProcessor::connected = false;
|
||||
@ -47,22 +48,25 @@ namespace {
|
||||
}
|
||||
checksum ^= 0x55;
|
||||
Serial.write(checksum);
|
||||
Serial.write(0x00);
|
||||
Serial.write((char) 0x00);
|
||||
}
|
||||
}
|
||||
|
||||
void CMDProcessor::process() {
|
||||
// Command read timeout
|
||||
if (phase != 0 && millis() - lastRX > CMD_PROCESSOR_RX_TIMEOUT_MS) {
|
||||
sendLog("Error: Command read timed out.");
|
||||
reset();
|
||||
return;
|
||||
}
|
||||
|
||||
if (CMDProcessor::connected && millis() - lastPing > CMD_PROCESSOR_PING_TIMEOUT_MS) {
|
||||
sendLog("Error: Ping timed out.");
|
||||
CMDProcessor::connected = false;
|
||||
reset();
|
||||
} else if (!CMDProcessor::connected && millis() - lastPing > 250) {
|
||||
Serial.print("WHSPAH\n");
|
||||
Serial.print("/WHSPAH");
|
||||
Serial.print((char) 0x00);
|
||||
lastPing = millis();
|
||||
}
|
||||
|
||||
@ -106,6 +110,7 @@ void CMDProcessor::process() {
|
||||
if (rxChecksum == calcChecksum) {
|
||||
phase++;
|
||||
} else {
|
||||
sendLog("Malformed packet: Checksum mismatch.");
|
||||
resetAfterNull();
|
||||
}
|
||||
break;
|
||||
@ -113,6 +118,7 @@ void CMDProcessor::process() {
|
||||
if (Serial.read() == 0x00) {
|
||||
phase++;
|
||||
} else {
|
||||
sendLog("Malformed packet: No null terminator found.");
|
||||
// If a null terminator is not present in the expected place,
|
||||
// set the command ID to 0xFF (no op) and remain in phase 3.
|
||||
// This keeps the firmware waiting for a valid null terminator
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
#define CommandProcessor_h
|
||||
|
||||
#define CMD_PROCESSOR_RX_TIMEOUT_MS 200
|
||||
#define CMD_PROCESSOR_PING_TIMEOUT_MS 500
|
||||
#define CMD_PROCESSOR_PING_TIMEOUT_MS 2000
|
||||
|
||||
#include <Arduino.h>
|
||||
|
||||
|
||||
@ -2,3 +2,9 @@
|
||||
#include "./CaiXianlin.h"
|
||||
|
||||
CaiXianlin::Transmitter* RFTransmitter = nullptr;
|
||||
|
||||
void sendLog(char* message) {
|
||||
Serial.write("/");
|
||||
Serial.write(message);
|
||||
Serial.write((char) 0x00);
|
||||
}
|
||||
|
||||
@ -4,5 +4,6 @@
|
||||
#include "./CaiXianlin.h"
|
||||
|
||||
extern CaiXianlin::Transmitter* RFTransmitter;
|
||||
void sendLog(char* message);
|
||||
|
||||
#endif // Header define
|
||||
|
||||
@ -3,8 +3,11 @@ W.H.S.P.A.H is an entirely self-hosted,
|
||||
local and free way to interact with CaiXianlin shock collars.
|
||||
"""
|
||||
|
||||
from .driver import Transmitter
|
||||
from .constants import MODES
|
||||
from .shocker import Shocker
|
||||
from . import exceptions
|
||||
from . import utils
|
||||
|
||||
__all__ = ['exceptions', 'utils']
|
||||
__all__ = ['Transmitter', 'MODES', 'Shocker', 'exceptions', 'utils']
|
||||
__version__ = '1.0.0'
|
||||
|
||||
27
src/pythonAPI/whspah/constants.py
Normal file
27
src/pythonAPI/whspah/constants.py
Normal file
@ -0,0 +1,27 @@
|
||||
"""
|
||||
Holds constants used throughout WHSPAH.
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
|
||||
# The hardware magic string.
|
||||
HARDWARE_MAGIC = b'/WHSPAH\x00'
|
||||
|
||||
class MODES(Enum):
|
||||
"""
|
||||
The different commands or "modes" of the shockers.
|
||||
"""
|
||||
|
||||
SHOCK = 1
|
||||
VIBRATE = 2
|
||||
BEEP = 3
|
||||
LIGHT = 4
|
||||
|
||||
class STATES(Enum):
|
||||
"""
|
||||
The different states of the transmitter object.
|
||||
"""
|
||||
|
||||
DISCONNECTED = 1
|
||||
CONNECTING = 2
|
||||
READY = 3
|
||||
309
src/pythonAPI/whspah/driver.py
Normal file
309
src/pythonAPI/whspah/driver.py
Normal file
@ -0,0 +1,309 @@
|
||||
"""
|
||||
All device related parts of the WHSPAH API.
|
||||
"""
|
||||
|
||||
from threading import Thread
|
||||
from threading import Lock
|
||||
from enum import Enum
|
||||
import logging
|
||||
import select
|
||||
import struct
|
||||
import time
|
||||
import os
|
||||
|
||||
import serial
|
||||
|
||||
from whspah.constants import MODES, STATES, HARDWARE_MAGIC
|
||||
import whspah.exceptions as exceptions
|
||||
from whspah.metaClasses import MetaTransmitter
|
||||
from whspah.shocker import Shocker
|
||||
import whspah.utils as utils
|
||||
|
||||
CONNECTION_READ_TIMEOUT = 2.5
|
||||
TX_PING_INTERVAL = 1
|
||||
RX_PING_TIMEOUT = 5
|
||||
|
||||
def calculateChecksum(packet: bytes) -> int:
|
||||
"""
|
||||
Returns the checksum for the provided packet.
|
||||
"""
|
||||
|
||||
checksum = 0
|
||||
for b in packet:
|
||||
checksum ^= b
|
||||
checksum ^= 0x55
|
||||
|
||||
return checksum
|
||||
|
||||
class Transmitter(MetaTransmitter):
|
||||
"""
|
||||
Initialises a connection to a physical WHSPAH device.
|
||||
Args:
|
||||
port (os.PathLike): The serial port to connect to. If this is default,
|
||||
the WHSPAH API will attempt to search the system for a WHSPAH device.
|
||||
If this is None, no connection attempt will be made until changePort()
|
||||
is manually called.
|
||||
exceptionOnFiledTX (bool): If True, TransmitError will get raised if transmitting to the
|
||||
device for any reason, otherwise commands will just return
|
||||
False on error.
|
||||
baudrate (int): What baudrate to use when connecting to the hardware.
|
||||
This is only needed if you're messing with custom firmware, in all other
|
||||
situations, this should be left as default.
|
||||
|
||||
Raises:
|
||||
NoWHSPAHDeviceFound: Raised if the specified port is not a valid WHSPAH device,
|
||||
or, if port is None, when no WHSPAH devies were found.
|
||||
"""
|
||||
|
||||
def __init__(self, port: os.PathLike=..., *,
|
||||
exceptionOnFiledTX: bool=True, baudrate: int=9600):
|
||||
super().__init__()
|
||||
|
||||
# A default logger, this gets overwritten by changePort() later,
|
||||
# this is just to keep typehints working mostly.
|
||||
self._log = logging.getLogger('WHSPAH-API.???')
|
||||
|
||||
self._baud = baudrate
|
||||
self._ser: serial.Serial = None
|
||||
self._serWriteLock = Lock()
|
||||
self._port = port
|
||||
self.exceptionOnFiledTX = exceptionOnFiledTX
|
||||
|
||||
self._runConnThread = False
|
||||
self._connThread = Thread(target=self._connThreadFunc)
|
||||
|
||||
if self._port is ...:
|
||||
ports = utils.scanForValidPorts(baudrate=self._baud)
|
||||
if len(ports) == 0:
|
||||
raise exceptions.NoWHSPAHDeviceFound()
|
||||
self._port = ports[0]
|
||||
|
||||
if self._port is not None:
|
||||
self.changePort(self._port, baudrate)
|
||||
|
||||
def changePort(self, port: os.PathLike, timeout: float=5.0, *, baudrate: int=9600):
|
||||
"""
|
||||
Changes the port this device object is connected to.
|
||||
Automatically closes any existing connection, then re-connects to the new port.
|
||||
|
||||
Args:
|
||||
port (os.PathLike): The new port to connect to.
|
||||
timeout (float): The maximum amount of time to wait for a successful connection.
|
||||
baudrate (int): What baudrate to use when connecting to the hardware.
|
||||
This is only needed if you're messing with custom firmware, in all other
|
||||
situations, this should be left as default.
|
||||
"""
|
||||
|
||||
self.close()
|
||||
|
||||
self._port = port
|
||||
self._baud = baudrate
|
||||
self._log = logging.getLogger('WHSPAH-API.'+self._port.split('/')[-1])
|
||||
|
||||
self._runConnThread = True
|
||||
self._connThread.start()
|
||||
|
||||
st = time.perf_counter()
|
||||
while (not self.ready) and time.perf_counter() - st < timeout:
|
||||
time.sleep(0.5)
|
||||
|
||||
if time.perf_counter() - st > timeout:
|
||||
self.close()
|
||||
raise TimeoutError(f'Could not connect to device in {timeout} seconds.')
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Closes the connection with the WHSPAH device.
|
||||
"""
|
||||
|
||||
if not self._runConnThread:
|
||||
return
|
||||
|
||||
self._log.info('Disconnecting...')
|
||||
self._runConnThread = False
|
||||
try:
|
||||
self._connThread.join()
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
def _connThreadFunc(self):
|
||||
"""
|
||||
This gets ran as a thread to keep a persistant connection with the WHSPAH device.
|
||||
"""
|
||||
|
||||
lastTXPing = 0
|
||||
lastRXPing = 0
|
||||
buffer = b''
|
||||
|
||||
while self._runConnThread:
|
||||
if self._ser is None:
|
||||
try:
|
||||
# Connect to serial port
|
||||
self._log.debug('Connecting...')
|
||||
self._setState(STATES.CONNECTING)
|
||||
self._ser = serial.Serial(self._port, self._baud,
|
||||
timeout=CONNECTION_READ_TIMEOUT)
|
||||
self._ser.flushInput()
|
||||
# Initialise timeouts
|
||||
select.select([self._ser], [], [], CONNECTION_READ_TIMEOUT)
|
||||
st = time.perf_counter()
|
||||
# Wait for WHSPAH message
|
||||
while (self._ser.read_until(b'\x00') != HARDWARE_MAGIC) and\
|
||||
time.perf_counter() - st < CONNECTION_READ_TIMEOUT:
|
||||
time.sleep(0.1)
|
||||
|
||||
# If time timeout was exceeded
|
||||
if time.perf_counter() - st >= CONNECTION_READ_TIMEOUT:
|
||||
# Raise an error
|
||||
raise exceptions.NotValidWHSPAHDevice(self._port)
|
||||
|
||||
# Send the connect command
|
||||
self._sendPacket(b'\x01')
|
||||
# Wait for connect acknowledgement
|
||||
lastTXPing = time.perf_counter()
|
||||
lastRXPing = time.perf_counter()
|
||||
except (serial.serialutil.SerialException, exceptions.NotValidWHSPAHDevice,
|
||||
TimeoutError, exceptions.TransmitError, AttributeError) as e:
|
||||
self._log.warning('Could not connect to device: %s', e)
|
||||
self._ser = None
|
||||
self._setState(STATES.DISCONNECTED)
|
||||
time.sleep(1)
|
||||
else:
|
||||
try:
|
||||
if self._ser.in_waiting:
|
||||
buffer += self._ser.read()
|
||||
|
||||
if len(buffer) >= 3 and buffer[-1] == 0:
|
||||
cmdID = buffer[0]
|
||||
receivedChecksum = buffer[-2]
|
||||
expectedChecksum = calculateChecksum(buffer[:-2])
|
||||
|
||||
if cmdID == ord('/') and buffer != HARDWARE_MAGIC:
|
||||
message = buffer[1:-1].decode()
|
||||
self._log.info('<- %s', message)
|
||||
else:
|
||||
if receivedChecksum == expectedChecksum:
|
||||
# Connection acknowledgment
|
||||
if cmdID == 1 and not self.ready:
|
||||
self._setState(STATES.READY)
|
||||
self._log.info('Connected!')
|
||||
# Ping
|
||||
elif cmdID == 2 and self.ready:
|
||||
lastRXPing = time.perf_counter()
|
||||
else:
|
||||
self._log.warning('Received malformed packet from device: '+
|
||||
'Checksum mismatch: %s expected, %s received.',
|
||||
expectedChecksum, receivedChecksum)
|
||||
|
||||
buffer = b''
|
||||
|
||||
if time.perf_counter() - lastTXPing > CONNECTION_READ_TIMEOUT and\
|
||||
not self.ready:
|
||||
# Raise an error
|
||||
self._log.debug('No acknowledgment received.')
|
||||
raise TimeoutError()
|
||||
|
||||
# Send occasional ping
|
||||
if time.perf_counter() - lastTXPing > TX_PING_INTERVAL and self.ready:
|
||||
self._sendPacket(b'\x02')
|
||||
lastTXPing = time.perf_counter()
|
||||
|
||||
# Check for timeout
|
||||
if time.perf_counter() - lastRXPing > RX_PING_TIMEOUT:
|
||||
raise TimeoutError('Device hasn\'t sent a ping in 5s.')
|
||||
except (AttributeError, exceptions.TransmitError,
|
||||
serial.serialutil.SerialException, serial.serialutil.PortNotOpenError,
|
||||
TimeoutError, OSError) as e:
|
||||
self._log.warning('An error occured while communicating with the '+
|
||||
'WHSPAH device: %s', e)
|
||||
self._log.warning('Disconnecting...')
|
||||
self._setState(STATES.DISCONNECTED)
|
||||
self._ser = None
|
||||
time.sleep(1)
|
||||
|
||||
def transmit(self, transmitterID: int, channel: int, mode: MODES,
|
||||
intensity: int=0, lucalEncoded: bool=False) -> bool:
|
||||
"""
|
||||
Transmits the given arguments to the WHSPAH device.
|
||||
|
||||
Returns:
|
||||
bool: True if transmitted successfully, False otherwise.
|
||||
|
||||
Raises:
|
||||
TransmitError: Raised if `exceptionOnFiledTX` is True, and the transmit fails.
|
||||
InvalidTransmitterID: If the provided transmitter ID is invalid.
|
||||
InvalidChannel: If the provided channel is invalid.
|
||||
InvalidMode: If the provided mode is invalid.
|
||||
InvalidIntensity: If the provided intensity is invalid.
|
||||
"""
|
||||
|
||||
# TODO: Test with a non-lucal encoded shocker
|
||||
if transmitterID < 126 or transmitterID > 32767 or not isinstance(transmitterID, int):
|
||||
raise exceptions.InvalidTransmitterID(transmitterID)
|
||||
|
||||
if channel < 1 or channel > 3 or not isinstance(channel, int):
|
||||
raise exceptions.InvalidChannel(channel)
|
||||
|
||||
if mode not in MODES:
|
||||
raise exceptions.InvalidMode(mode)
|
||||
|
||||
if mode == MODES.BEEP and intensity != 0:
|
||||
raise exceptions.InvalidIntensity(intensity, True)
|
||||
|
||||
if intensity < 0 or intensity > 99 or not isinstance(intensity, int):
|
||||
raise exceptions.InvalidIntensity(intensity)
|
||||
|
||||
args = struct.pack('<HBBB?', transmitterID, channel-1, mode.value, intensity, lucalEncoded)
|
||||
try:
|
||||
self._sendPacket(b'\x10' + args)
|
||||
return True
|
||||
except exceptions.TransmitError as e:
|
||||
if self.exceptionOnFiledTX:
|
||||
raise e
|
||||
return False
|
||||
|
||||
def _sendPacket(self, packet: bytearray | bytes):
|
||||
"""
|
||||
Sends a packet over the serial port, automatically appending the checksum.
|
||||
|
||||
Raises:
|
||||
TransmitError: If the transmititon failed.
|
||||
"""
|
||||
|
||||
packet = bytearray(packet)
|
||||
checksum = calculateChecksum(packet)
|
||||
packet.append(checksum)
|
||||
packet.append(0x00)
|
||||
|
||||
try:
|
||||
with self._serWriteLock:
|
||||
self._ser.write(packet)
|
||||
except (AttributeError, serial.serialutil.PortNotOpenError) as e:
|
||||
self._ser = None
|
||||
raise exceptions.TransmitError(self._port) from e
|
||||
|
||||
def addShocker(self, *args, **kwargs) -> Shocker:
|
||||
"""
|
||||
Creates a new Shocker object with this transmitter as the shockers assigned transmitter.
|
||||
For args, see whspah.Shocker.
|
||||
"""
|
||||
|
||||
kwargs.update({'transmitter': self})
|
||||
shocker = Shocker(*args, **kwargs)
|
||||
return shocker
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""
|
||||
Returns the current state of the transmitter.
|
||||
"""
|
||||
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def ready(self):
|
||||
"""
|
||||
Returns True if the transmitter is ready.
|
||||
"""
|
||||
|
||||
return self._state == STATES.READY
|
||||
@ -2,11 +2,73 @@
|
||||
Holds all WHSPAH exceptions.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
class NoWHSPAHDeviceFound(OSError):
|
||||
"""
|
||||
Gets raised when whspah.Device() is called with no port,
|
||||
Gets raised when whspah.Transmitter() is called with no port,
|
||||
and no port could be automatically found.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('No WHSPAH device was found.')
|
||||
|
||||
class NotValidWHSPAHDevice(OSError):
|
||||
"""
|
||||
Gets raised if the specified transmitter port is not a WHSPAH device.
|
||||
"""
|
||||
|
||||
def __init__(self, port: os.PathLike):
|
||||
super().__init__(f'{port} is not a WHSPAH device.')
|
||||
|
||||
class TransmitError(OSError):
|
||||
"""
|
||||
Gets raised when transmitting to the device fails.
|
||||
This typically happens when trying to communicate with an unplugged device.
|
||||
"""
|
||||
|
||||
def __init__(self, port: os.PathLike):
|
||||
super().__init__(f'Failed to transmit to {port}.')
|
||||
|
||||
# ----- Transmit argument errors -----
|
||||
|
||||
class InvalidTransmitterID(ValueError):
|
||||
"""
|
||||
Gets raised if the chosen transmitter ID is not within range.
|
||||
"""
|
||||
|
||||
def __init__(self, transmitterID: int):
|
||||
super().__init__(f'Transmitter ID {transmitterID} is not valid. '+
|
||||
'Transmitter ID must be an int within the range 126 to 32767.')
|
||||
|
||||
class InvalidChannel(ValueError):
|
||||
"""
|
||||
Gets raised if the chosen channel is not within range.
|
||||
"""
|
||||
|
||||
def __init__(self, channel: int):
|
||||
super().__init__(f'Channel {channel} is not valid.'+
|
||||
' Channel must be an int within the range 1 to 3.')
|
||||
|
||||
class InvalidMode(ValueError):
|
||||
"""
|
||||
Gets raised if the chosen mode is not valid.
|
||||
"""
|
||||
|
||||
def __init__(self, mode: 'whspah.MODES'):
|
||||
super().__init__(f'Mode {mode} is not valid.'+
|
||||
' Mode must be either an int within the range 1 to 4,'+
|
||||
' or a selection from `whspah.MODES`.')
|
||||
|
||||
class InvalidIntensity(ValueError):
|
||||
"""
|
||||
Gets raised if the chosen intensity is not within range.
|
||||
"""
|
||||
|
||||
def __init__(self, intensity: int, shouldBeZero=False):
|
||||
message = f'Intensity {intensity} is not valid.'
|
||||
if shouldBeZero:
|
||||
message += ' The mode specified requires the intensity to be 0.'
|
||||
else:
|
||||
message += ' Intensity must be an int within the range 0 to 99.'
|
||||
super().__init__(message)
|
||||
|
||||
83
src/pythonAPI/whspah/metaClasses.py
Normal file
83
src/pythonAPI/whspah/metaClasses.py
Normal file
@ -0,0 +1,83 @@
|
||||
"""
|
||||
Holds meta classes.
|
||||
This is used if you want to create a custom transmitters / shockers that still nicely integrate
|
||||
with W.H.S.P.A.H.
|
||||
"""
|
||||
|
||||
from threading import Thread
|
||||
|
||||
from .constants import STATES
|
||||
|
||||
class MetaShocker:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def shock(self, intensity: int):
|
||||
"""
|
||||
Sends a shock command to this shocker.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
def vibrate(self, intensity: int):
|
||||
"""
|
||||
Sends a vibrate command to this shocker.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
def beep(self):
|
||||
"""
|
||||
Sends a beep command to this shocker.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
def toggleLight(self):
|
||||
"""
|
||||
Toggles the light of this shocker.
|
||||
"""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
|
||||
class MetaTransmitter:
|
||||
def __init__(self):
|
||||
self._stateCallbacks = []
|
||||
self._state = STATES.DISCONNECTED
|
||||
|
||||
def addStateCallback(self, callback: callable):
|
||||
"""
|
||||
Adds a callback function for state changes.
|
||||
All callbacks will be ran with the following positional arguments:
|
||||
- Transmitter: The Transmitter object that called the callback.
|
||||
- STATES: The new state of the transmitter.
|
||||
"""
|
||||
|
||||
self._stateCallbacks.append(callback)
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""
|
||||
Returns the current state of the transmitter.
|
||||
"""
|
||||
|
||||
return self._state
|
||||
|
||||
def _setState(self, newState: STATES):
|
||||
"""
|
||||
Updates the current state of the transmitter and runs all callbacks.
|
||||
"""
|
||||
|
||||
self._state = newState
|
||||
for callback in self._stateCallbacks:
|
||||
Thread(target=callback, args=(self, newState)).start()
|
||||
|
||||
@property
|
||||
def ready(self):
|
||||
"""
|
||||
Returns True if the transmitter is ready.
|
||||
"""
|
||||
|
||||
return self._state == STATES.READY
|
||||
58
src/pythonAPI/whspah/shocker.py
Normal file
58
src/pythonAPI/whspah/shocker.py
Normal file
@ -0,0 +1,58 @@
|
||||
"""
|
||||
Holds the Shocker object.
|
||||
"""
|
||||
|
||||
from .constants import MODES
|
||||
from .metaClasses import MetaShocker
|
||||
|
||||
class Shocker(MetaShocker):
|
||||
"""
|
||||
"""
|
||||
|
||||
def __init__(self, transmitterID: int, channel: int, *,
|
||||
lucalEncoded: bool=False, transmitter: 'whspah.Transmitter'=None):
|
||||
self.transmitterID = transmitterID
|
||||
self.channel = channel
|
||||
self.transmitter = transmitter
|
||||
self.lucalEncoded = lucalEncoded
|
||||
|
||||
self._lastLightIntensity = 0
|
||||
|
||||
def _transmit(self, mode: MODES, intensity: int=0):
|
||||
"""
|
||||
Transmits using this shockers transmitter ID, channel and lucal setting
|
||||
with the assigned transmitter.
|
||||
"""
|
||||
|
||||
self.transmitter.transmit(self.transmitterID, self.channel, mode,
|
||||
intensity, self.lucalEncoded)
|
||||
|
||||
def shock(self, intensity: int):
|
||||
"""
|
||||
Sends a shock command to this shocker.
|
||||
"""
|
||||
|
||||
self._transmit(MODES.SHOCK, intensity)
|
||||
|
||||
def vibrate(self, intensity: int):
|
||||
"""
|
||||
Sends a vibrate command to this shocker.
|
||||
"""
|
||||
|
||||
self._transmit(MODES.VIBRATE, intensity)
|
||||
|
||||
def beep(self):
|
||||
"""
|
||||
Sends a beep command to this shocker.
|
||||
"""
|
||||
|
||||
self._transmit(MODES.BEEP, self._lastLightIntensity)
|
||||
|
||||
def toggleLight(self):
|
||||
"""
|
||||
Toggles the light of this shocker.
|
||||
"""
|
||||
|
||||
self._lastLightIntensity += 1
|
||||
self._lastLightIntensity %= 100
|
||||
self._transmit(MODES.LIGHT, self._lastLightIntensity)
|
||||
@ -11,10 +11,12 @@ import io
|
||||
import serial.tools.list_ports
|
||||
import serial
|
||||
|
||||
from whspah.constants import HARDWARE_MAGIC
|
||||
|
||||
def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout: float) -> None:
|
||||
"""
|
||||
Gets called as a thread by _sanForValidPorts().
|
||||
Attempts to connect to the port specified, then waits for "WHSPAH\n".
|
||||
Attempts to connect to the port specified, then waits for magic string.
|
||||
If a successful port was found, it will be closed, then appended to
|
||||
resultList.
|
||||
"""
|
||||
@ -25,12 +27,13 @@ def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout:
|
||||
# Start a timer
|
||||
st = time.perf_counter()
|
||||
# Connect to the serial port
|
||||
ser = serial.Serial(port, baud, timeout=0)
|
||||
ser = serial.Serial(port, baud, timeout=timeout)
|
||||
ser.flushInput()
|
||||
# Select it to enforce timeouts
|
||||
select.select([ser], [], [], timeout)
|
||||
# Attempt to read "WHSPAH\n" from the port
|
||||
while (ser.readline() != b'WHSPAH\n') and time.perf_counter() - st < timeout:
|
||||
time.sleep(0.5)
|
||||
# Attempt to read magic string from the port
|
||||
while (ser.read_until(b'\x00') != HARDWARE_MAGIC) and time.perf_counter() - st < timeout:
|
||||
time.sleep(0.1)
|
||||
|
||||
# If all of the above completed before the timeout, add the port to the result list
|
||||
if time.perf_counter() - st < timeout:
|
||||
|
||||
Reference in New Issue
Block a user