Compare commits

...

3 Commits

Author SHA1 Message Date
933d056ca3 [API-PY] Added shocker class 2026-05-28 19:12:07 +01:00
629069957e [API-PY] Added metaclasses 2026-05-28 19:11:40 +01:00
d941282af9 [API-PY] Moved STATES to constants.py 2026-05-28 19:10:41 +01:00
5 changed files with 168 additions and 33 deletions

View File

@ -5,8 +5,9 @@ local and free way to interact with CaiXianlin shock collars.
from .driver import Transmitter
from .constants import MODES
from .shocker import Shocker
from . import exceptions
from . import utils
__all__ = ['Transmitter', 'MODES', 'exceptions', 'utils']
__all__ = ['Transmitter', 'MODES', 'Shocker', 'exceptions', 'utils']
__version__ = '1.0.0'

View File

@ -16,3 +16,12 @@ class MODES(Enum):
VIBRATE = 2
BEEP = 3
LIGHT = 4
class STATES(Enum):
"""
The different states of the transmitter object.
"""
DISCONNECTED = 1
CONNECTING = 2
READY = 3

View File

@ -13,23 +13,16 @@ import os
import serial
from whspah.constants import MODES, HARDWARE_MAGIC
from whspah.constants import MODES, STATES, HARDWARE_MAGIC
import whspah.exceptions as exceptions
from whspah.metaClasses import MetaTransmitter
from whspah.shocker import Shocker
import whspah.utils as utils
CONNECTION_READ_TIMEOUT = 2.5
TX_PING_INTERVAL = 1
RX_PING_TIMEOUT = 5
class STATES(Enum):
"""
The different states of the transmitter object.
"""
DISCONNECTED = 1
CONNECTING = 2
READY = 3
def calculateChecksum(packet: bytes) -> int:
"""
Returns the checksum for the provided packet.
@ -42,7 +35,7 @@ def calculateChecksum(packet: bytes) -> int:
return checksum
class Transmitter:
class Transmitter(MetaTransmitter):
"""
Initialises a connection to a physical WHSPAH device.
Args:
@ -64,6 +57,8 @@ class Transmitter:
def __init__(self, port: os.PathLike=..., *,
exceptionOnFiledTX: bool=True, baudrate: int=9600):
super().__init__()
# A default logger, this gets overwritten by changePort() later,
# this is just to keep typehints working mostly.
self._log = logging.getLogger('WHSPAH-API.???')
@ -76,8 +71,6 @@ class Transmitter:
self._runConnThread = False
self._connThread = Thread(target=self._connThreadFunc)
self._stateCallbacks = []
self._state = STATES.DISCONNECTED
if self._port is ...:
ports = utils.scanForValidPorts(baudrate=self._baud)
@ -133,25 +126,6 @@ class Transmitter:
except RuntimeError:
pass
def addStateCallback(self, callback: callable):
"""
Adds a callback function for state changes.
All callbacks will be ran with the following positional arguments:
- Transmitter: The Transmitter object that called the callback.
- STATES: The new state of the transmitter.
"""
self._stateCallbacks.append(callback)
def _setState(self, newState: STATES):
"""
Updates the current state of the transmitter and runs all callbacks.
"""
self._state = newState
for callback in self._stateCallbacks:
Thread(target=callback, args=(self, newState)).start()
def _connThreadFunc(self):
"""
This gets ran as a thread to keep a persistant connection with the WHSPAH device.
@ -308,6 +282,16 @@ class Transmitter:
self._ser = None
raise exceptions.TransmitError(self._port) from e
def addShocker(self, *args, **kwargs) -> Shocker:
"""
Creates a new Shocker object with this transmitter as the shockers assigned transmitter.
For args, see whspah.Shocker.
"""
kwargs.update({'transmitter': self})
shocker = Shocker(*args, **kwargs)
return shocker
@property
def state(self):
"""

View File

@ -0,0 +1,83 @@
"""
Holds meta classes.
This is used if you want to create a custom transmitters / shockers that still nicely integrate
with W.H.S.P.A.H.
"""
from threading import Thread
from .constants import STATES
class MetaShocker:
def __init__(self):
pass
def shock(self, intensity: int):
"""
Sends a shock command to this shocker.
"""
raise NotImplementedError()
def vibrate(self, intensity: int):
"""
Sends a vibrate command to this shocker.
"""
raise NotImplementedError()
def beep(self):
"""
Sends a beep command to this shocker.
"""
raise NotImplementedError()
def toggleLight(self):
"""
Toggles the light of this shocker.
"""
raise NotImplementedError()
class MetaTransmitter:
def __init__(self):
self._stateCallbacks = []
self._state = STATES.DISCONNECTED
def addStateCallback(self, callback: callable):
"""
Adds a callback function for state changes.
All callbacks will be ran with the following positional arguments:
- Transmitter: The Transmitter object that called the callback.
- STATES: The new state of the transmitter.
"""
self._stateCallbacks.append(callback)
@property
def state(self):
"""
Returns the current state of the transmitter.
"""
return self._state
def _setState(self, newState: STATES):
"""
Updates the current state of the transmitter and runs all callbacks.
"""
self._state = newState
for callback in self._stateCallbacks:
Thread(target=callback, args=(self, newState)).start()
@property
def ready(self):
"""
Returns True if the transmitter is ready.
"""
return self._state == STATES.READY

View File

@ -0,0 +1,58 @@
"""
Holds the Shocker object.
"""
from .constants import MODES
from .metaClasses import MetaShocker
class Shocker(MetaShocker):
"""
"""
def __init__(self, transmitterID: int, channel: int, *,
lucalEncoded: bool=False, transmitter: 'whspah.Transmitter'=None):
self.transmitterID = transmitterID
self.channel = channel
self.transmitter = transmitter
self.lucalEncoded = lucalEncoded
self._lastLightIntensity = 0
def _transmit(self, mode: MODES, intensity: int=0):
"""
Transmits using this shockers transmitter ID, channel and lucal setting
with the assigned transmitter.
"""
self.transmitter.transmit(self.transmitterID, self.channel, mode,
intensity, self.lucalEncoded)
def shock(self, intensity: int):
"""
Sends a shock command to this shocker.
"""
self._transmit(MODES.SHOCK, intensity)
def vibrate(self, intensity: int):
"""
Sends a vibrate command to this shocker.
"""
self._transmit(MODES.VIBRATE, intensity)
def beep(self):
"""
Sends a beep command to this shocker.
"""
self._transmit(MODES.BEEP, self._lastLightIntensity)
def toggleLight(self):
"""
Toggles the light of this shocker.
"""
self._lastLightIntensity += 1
self._lastLightIntensity %= 100
self._transmit(MODES.LIGHT, self._lastLightIntensity)