Compare commits

...

2 Commits

Author SHA1 Message Date
fc4c7a77e3 [API-PY] Added scanForValidPorts utility 2026-05-12 12:34:44 +01:00
94911c4239 [API-PY] Added NoWHSPAHDeviceFound exception 2026-05-12 12:33:56 +01:00
3 changed files with 82 additions and 0 deletions

View File

@ -0,0 +1,10 @@
"""
W.H.S.P.A.H is an entirely self-hosted,
local and free way to interact with CaiXianlin shock collars.
"""
from . import exceptions
from . import utils
__all__ = ['exceptions', 'utils']
__version__ = '1.0.0'

View File

@ -0,0 +1,12 @@
"""
Holds all WHSPAH exceptions.
"""
class NoWHSPAHDeviceFound(OSError):
"""
Gets raised when whspah.Device() is called with no port,
and no port could be automatically found.
"""
def __init__(self):
super().__init__('No WHSPAH device was found.')

View File

@ -0,0 +1,60 @@
"""
Contains helpful utilities relating to WHSPAH.
"""
from threading import Thread
import select
import time
import os
import serial
def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout: float) -> None:
"""
Gets called as a thread by _sanForValidPorts().
Attempts to connect to the port specified, then waits for "WHSPAH\n".
If a successful port was found, it will be closed, then appended to
resultList.
"""
ser = None
try:
# Start a timer
st = time.perf_counter()
# Connect to the serial port
ser = serial.Serial(port, baud, timeout=0)
# Select it to enforce timeouts
select.select([ser], [], [], timeout)
# Attempt to read "WHSPAH\n" from the port
while (ser.readline() != b'WHSPAH\n') and time.perf_counter() - st < timeout:
time.sleep(0.5)
# If all of the above completed before the timeout, add the port to the result list
if time.perf_counter() - st < timeout:
resultList.append(port)
except serial.serialutil.SerialException:
pass
finally:
if ser is not None:
ser.close()
def scanForValidPorts(baudrate: int=9600, timeout: float=2.5) -> list[os.PathLike]:
"""
Scans the system for serial devices calling out "WHSAPH\n".
"""
res = []
threads = []
for port in serial.tools.list_ports.comports():
thread = Thread(target=_testPortForWHSPAH, args=(res, port.device, baudrate, timeout))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
return res
__all__ = ['scanForValidPorts']