Implemented basic headpat firmware
This commit is contained in:
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
python-osc
|
||||||
|
pyserial
|
||||||
71
src/Python/main.py
Normal file
71
src/Python/main.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
"""
|
||||||
|
Basic arse driver / software that just does the haptics.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from threading import Thread
|
||||||
|
import argparse
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
from pythonosc.dispatcher import Dispatcher
|
||||||
|
from pythonosc import osc_server
|
||||||
|
import serial
|
||||||
|
|
||||||
|
class Device:
|
||||||
|
"""
|
||||||
|
Holds the serial port and opccasionally updates it
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, port: os.PathLike, refreshRate: int=30):
|
||||||
|
self._run = True
|
||||||
|
self._ser = serial.Serial(port, 9600)
|
||||||
|
self.areas = {'Fwd': 0, 'Mid': 0, 'Bck': 0}
|
||||||
|
self._updateThread = Thread(target=self._update)
|
||||||
|
self.refreshRate = refreshRate
|
||||||
|
self.intensity = 1
|
||||||
|
|
||||||
|
self._updateThread.start()
|
||||||
|
|
||||||
|
def updateAreasFromOSC(self, addr: str, value: float):
|
||||||
|
"""
|
||||||
|
Updates the areas variable based on an OSC message.
|
||||||
|
"""
|
||||||
|
|
||||||
|
area = addr.split('/')[-1]
|
||||||
|
self.areas[area] = round(value*255)
|
||||||
|
|
||||||
|
def _update(self):
|
||||||
|
while self._run:
|
||||||
|
values = self.areas.values()
|
||||||
|
values = [round(v * self.intensity) for v in values]
|
||||||
|
self._ser.write(struct.pack('<BBBB', 2, *values))
|
||||||
|
time.sleep(1 / self.refreshRate)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
Closes the serial port.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._run = False
|
||||||
|
self._updateThread.join()
|
||||||
|
self._ser.close()
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='Basic headpat haptics')
|
||||||
|
parser.add_argument('serialPort', type=os.PathLike)
|
||||||
|
parser.add_argument('--oscRXPort', type=int, default=9001)
|
||||||
|
parser.add_argument('--intensity', type=float, default=1,
|
||||||
|
help='Intensity multiplier (0.0-1.0).')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
dev = Device(args.serialPort)
|
||||||
|
dev.intensity = args.intensity
|
||||||
|
|
||||||
|
dispatcher = Dispatcher()
|
||||||
|
dispatcher.map('/avatar/parameters/HeadpatHap/*', dev.updateAreasFromOSC)
|
||||||
|
|
||||||
|
serverIP = os.environ.get('VRCCO_OSCServerBind', '127.0.0.1')
|
||||||
|
serverPort = int(os.environ.get('VRCCO_OSCServerPort', args.oscRXPort))
|
||||||
|
server = osc_server.ThreadingOSCUDPServer((serverIP, serverPort), dispatcher, timeout=2.5)
|
||||||
|
|
||||||
|
server.serve_forever()
|
||||||
Reference in New Issue
Block a user