Compare commits
2 Commits
7a2a07e109
...
fc4c7a77e3
| Author | SHA1 | Date | |
|---|---|---|---|
|
fc4c7a77e3
|
|||
|
94911c4239
|
10
src/pythonAPI/whspah/__init__.py
Normal file
10
src/pythonAPI/whspah/__init__.py
Normal file
@ -0,0 +1,10 @@
|
||||
"""
|
||||
W.H.S.P.A.H is an entirely self-hosted,
|
||||
local and free way to interact with CaiXianlin shock collars.
|
||||
"""
|
||||
|
||||
from . import exceptions
|
||||
from . import utils
|
||||
|
||||
__all__ = ['exceptions', 'utils']
|
||||
__version__ = '1.0.0'
|
||||
12
src/pythonAPI/whspah/exceptions.py
Normal file
12
src/pythonAPI/whspah/exceptions.py
Normal file
@ -0,0 +1,12 @@
|
||||
"""
|
||||
Holds all WHSPAH exceptions.
|
||||
"""
|
||||
|
||||
class NoWHSPAHDeviceFound(OSError):
|
||||
"""
|
||||
Gets raised when whspah.Device() is called with no port,
|
||||
and no port could be automatically found.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('No WHSPAH device was found.')
|
||||
60
src/pythonAPI/whspah/utils.py
Normal file
60
src/pythonAPI/whspah/utils.py
Normal file
@ -0,0 +1,60 @@
|
||||
"""
|
||||
Contains helpful utilities relating to WHSPAH.
|
||||
"""
|
||||
|
||||
from threading import Thread
|
||||
import select
|
||||
import time
|
||||
import os
|
||||
|
||||
import serial
|
||||
|
||||
def _testPortForWHSPAH(resultList: list, port: os.PathLike, baud: int, timeout: float) -> None:
|
||||
"""
|
||||
Gets called as a thread by _sanForValidPorts().
|
||||
Attempts to connect to the port specified, then waits for "WHSPAH\n".
|
||||
If a successful port was found, it will be closed, then appended to
|
||||
resultList.
|
||||
"""
|
||||
|
||||
ser = None
|
||||
|
||||
try:
|
||||
# Start a timer
|
||||
st = time.perf_counter()
|
||||
# Connect to the serial port
|
||||
ser = serial.Serial(port, baud, timeout=0)
|
||||
# Select it to enforce timeouts
|
||||
select.select([ser], [], [], timeout)
|
||||
# Attempt to read "WHSPAH\n" from the port
|
||||
while (ser.readline() != b'WHSPAH\n') and time.perf_counter() - st < timeout:
|
||||
time.sleep(0.5)
|
||||
|
||||
# If all of the above completed before the timeout, add the port to the result list
|
||||
if time.perf_counter() - st < timeout:
|
||||
resultList.append(port)
|
||||
except serial.serialutil.SerialException:
|
||||
pass
|
||||
finally:
|
||||
if ser is not None:
|
||||
ser.close()
|
||||
|
||||
def scanForValidPorts(baudrate: int=9600, timeout: float=2.5) -> list[os.PathLike]:
|
||||
"""
|
||||
Scans the system for serial devices calling out "WHSAPH\n".
|
||||
"""
|
||||
|
||||
res = []
|
||||
threads = []
|
||||
|
||||
for port in serial.tools.list_ports.comports():
|
||||
thread = Thread(target=_testPortForWHSPAH, args=(res, port.device, baudrate, timeout))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
return res
|
||||
|
||||
__all__ = ['scanForValidPorts']
|
||||
Reference in New Issue
Block a user