Files
2026-06-03 18:59:09 +01:00

141 lines
4.5 KiB
Python

"""
A simple Flask application for interacting with WHSPAH shockers.
"""
import argparse
import logging
import random
import json
import os
from flask_limiter.util import get_remote_address
from flask_limiter import Limiter
import waitress.server
import flask
import whspah
# Used in transmit() to convert the `action` string to a WHSPAH MODE
ACTION_MAP = {'shock': whspah.MODES.SHOCK,
'vibrate': whspah.MODES.VIBRATE,
'beep': whspah.MODES.BEEP}
app = flask.Flask(__name__, static_folder='./www/')
limiter = Limiter(
get_remote_address,
app=app,
storage_uri="memory://",
)
@app.route('/', defaults={'path': 'index.html'})
@app.route('/<path:path>')
@limiter.exempt
def staticPage(path):
"""
Returns anything requested in the static folder.
"""
# Check for an index.html file in the specified path
if os.path.isfile(os.path.join(app.static_folder, path, 'index.html')):
# If one's found, append it to the path
path = os.path.join(path, 'index.html')
# Return the file
return flask.send_from_directory(app.static_folder, path)
@app.route('/transmit', methods=['POST'], strict_slashes=False)
@limiter.limit("1/second")
def transmit():
"""
Transmits the data contained within the POST request through WHSPAH.
"""
# Get the POST data and load it, assuming it's in a JSON format.
data = json.loads(flask.request.data)
# Try to get all required fields
try:
txID = int(data['transmitterID'])
channel = int(data['channel'])
action = ACTION_MAP[data['action']]
pin = int(data['shockerPin'])
intensity = int(data.get('intensity', 0))
lucal = bool(data.get('lucalEncoded', False))
# If any of those failed, return an error.
except (ValueError, KeyError):
return {'success': False, 'message': 'Request must contain the following keys:\n'+
'txID: int,\n'+
'channel: int,\n'+
'action: "shock", "vibrate" or "beep",\n'+
'pin: int,\n'+
'intensity (optional): int\n'+
'lucalEncoded (optional): bool'}, 400
if pin != app.config['pin']:
return {'success': False, 'message': 'Unauthorised'}, 401
if intensity > app.config['limit']:
return {'success': False, 'message': 'Exceeded Safe Limit'}, 406
# Send the data to WHSPAH
tx: whspah.Transmitter = app.config['transmitter']
tx.transmit(txID, channel, action, intensity, lucal)
# Return a success message
return {'success': True}, 200
@app.route('/limit', methods=['GET'], strict_slashes=False)
@limiter.limit("1/second")
def getLimit():
"""
Simply returns the safe limit.
"""
return {'success': True, 'limit': app.config['limit']}
if __name__ == '__main__':
# Parse console arguments
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('--ip', type=str, default='0.0.0.0')
parser.add_argument('--port', type=int, default=8000)
parser.add_argument('--pin', type=int, default=random.randint(1000, 9999),
help='The authorisation pin, defaults to a random number '+
'between 1000 and 9999')
parser.add_argument('--limit', type=int, default=99,
help='The same limit for intensity.')
args = parser.parse_args()
# Sets the logging version based on --debug
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# Connect to a WHSPAH device
app.config['transmitter'] = whspah.Transmitter()
# Configure pin
app.config['pin'] = args.pin
print(f'Running with pin {args.pin}.')
# Configure Limit
app.config['limit'] = args.limit
print(f'Running with safe limit {args.limit}.')
if args.debug:
# If in debug mode, run Flask's built in server
app.run(host=args.ip, port=args.port, debug=True)
else:
# Otherwise, use Waitress
server = waitress.server.create_server(app, host=args.ip, port=args.port)
print(f'Serving at http://{args.ip}:{args.port}/')
try:
server.run()
except KeyboardInterrupt:
pass
server.close()
app.config['transmitter'].close()