"""
An API for interacting with the VRChat camera through OSC.
"""
|
|
|
|
from threading import Thread
|
|
import platform
|
|
import time
|
|
import os
|
|
|
|
from pythonosc.dispatcher import Dispatcher
|
|
from watchdog.observers import Observer
|
|
from pythonosc import udp_client
|
|
import watchdog.events
|
|
|
|
# OSC address of each camera layer toggle, paired with the mask ID this module
# uses for it. Order matters only in that it fixes the dict's iteration order.
_ADDR_TO_MASK = dict([
    ('/usercamera/ShowUIInCamera', 'UI'),
    ('/usercamera/LocalPlayer', 'LOCAL_PLAYER'),
    ('/usercamera/RemotePlayer', 'REMOTE_PLAYER'),
    ('/usercamera/Environment', 'ENVIRONMENT'),
])

# Reverse lookup: mask ID -> OSC address.
_MASK_TO_ADDR = {maskID: address for address, maskID in _ADDR_TO_MASK.items()}
|
|
|
|
class NewPictureHandler(watchdog.events.FileSystemEventHandler):
    """
    File system event handler for the VRChat camera folder.

    Every time a new (non-layer) picture file is created, callback(path, timeGap)
    is started on its own thread:
    path is the exact path of the new image,
    timeGap is a float with the number of seconds since the previous picture.

    The handler also records, for the most recently closed file, which kind of
    image it was (lastClosedType) and when it was closed (lastClosedTime).
    """

    def __init__(self, callback: callable):
        # perf_counter() timestamp of the last picture that triggered the callback.
        self._lastPicture = 0
        self._callback = callback
        # 'ENVIRONMENT', 'PLAYER', 'UI' or 'COMBINED'; None until a file is closed.
        self.lastClosedType = None
        # perf_counter() timestamp of the most recent close event.
        self.lastClosedTime = 0
        super().__init__()

    def on_created(self, event: watchdog.events.FileSystemEvent) -> None:
        """
        Starts the callback on a new thread for every newly created picture file.
        """

        # Anything that is not a FileCreatedEvent is of no interest here.
        if not isinstance(event, watchdog.events.FileCreatedEvent):
            return

        # The individual layers of a multi layer picture never trigger the callback.
        fileName = os.path.basename(event.src_path)
        if any(tag in fileName for tag in ('_Environment', '_Player', '_UI')):
            return

        sinceLast = time.perf_counter() - self._lastPicture
        worker = Thread(target=self._callback, args=(event.src_path, sinceLast))
        worker.start()
        self._lastPicture = time.perf_counter()

    def on_closed(self, event: watchdog.events.FileSystemEvent) -> None:
        """
        Records the type and time of the file that was just closed.
        """

        # Everything before the last '.' (an empty string if there is no '.').
        stem, _, _ = event.src_path.rpartition('.')

        suffixTypes = (
            ('_Environment', 'ENVIRONMENT'),
            ('_Player', 'PLAYER'),
            ('_UI', 'UI'),
        )
        for suffix, closedType in suffixTypes:
            if stem.endswith(suffix):
                break
        else:
            # No layer suffix matched.
            closedType = 'COMBINED'

        # The type is stored before the time so that a reader who sees the new
        # time also sees the matching type.
        self.lastClosedType = closedType
        self.lastClosedTime = time.perf_counter()
|
|
|
|
def createNewPictureObserver(callback: callable) -> 'tuple[NewPictureHandler, Observer] | None':
    """
    Creates an Observer that watches the VRChat pictures folder and calls callback
    if a new picture was found.

    The folder is taken from the VRCCO_PICTURES_DIR environment variable when it
    is set; otherwise a default location is used on Linux and Windows. A leading
    '~' in the path is expanded to the user's home directory.

    Returns a (handler, observer) tuple with the observer already started, or
    None (after printing a warning) when no existing folder could be determined.
    """

    path = os.environ.get('VRCCO_PICTURES_DIR')

    if path is None:
        system = platform.system()
        if system == 'Linux':
            # Pictures folder inside the Steam compatdata prefix of app 438100.
            path = '~/.local/share/Steam/steamapps/compatdata/438100/pfx'
            path += '/drive_c/users/steamuser/Pictures/VRChat/'
        elif system == 'Windows':
            path = '~/Pictures/VRChat/'

    if path is not None:
        path = os.path.expanduser(path)

        # A configured or default folder that does not exist is treated as missing.
        if not os.path.isdir(path):
            path = None

    if path is None:
        # The variable named here must be the one that is actually read above.
        print('WARNING: Could not find VRChat pictures folder. '
              'Please specify one with the VRCCO_PICTURES_DIR environment variable.\n'
              'Multi-shot will not work until this is resolved.')
        return None

    observer = Observer()
    handler = NewPictureHandler(callback)
    observer.schedule(handler, path, recursive=True)
    observer.start()
    return handler, observer
|
|
|
|
|
|
|
|
class Camera:
    """
    Handles OSC messages to and from VRChat.

    Keeps track of the camera mode and layer masks received over OSC and, when
    additionalPictures is non-zero, sends extra capture messages ("multishot")
    after a new picture shows up in the watched pictures folder.
    """

    def __init__(self, dispatcher: 'Dispatcher', oscClient: 'udp_client.SimpleUDPClient',
                 onCameraEnabled: callable=None):
        """
        Maps the camera OSC addresses on dispatcher and starts the picture watcher.

        oscClient is used for every outgoing message.
        onCameraEnabled, if given, is called with this Camera every time the
        camera mode changes from 0 to a non-zero value.
        """

        self.oscClient = oscClient
        self.additionalPictures = 0 # How many additional pictures to take
        self.lastPicture = 0
        self._lastMode = 0
        self._multiPictureOngoing = False
        self._onEnabledCallback = onCameraEnabled
        self._masks = {'UI': False}

        # The watcher is started only after all of the state above exists, since
        # its callback (self._newPicture) runs on another thread and reads it.
        # createNewPictureObserver returns None when no pictures folder is found;
        # unpacking that directly would raise TypeError.
        watcher = createNewPictureObserver(self._newPicture)
        if watcher is None:
            watcher = (None, None)
        self._newPicHandler, self._newPicObserver = watcher

        dispatcher.map('/usercamera/Mode', self._onModeChange)
        # Map all mask addresses to self._onMaskChange
        # and populate any missing mask IDs in self._masks with True.
        for address, maskID in _ADDR_TO_MASK.items():
            dispatcher.map(address, self._onMaskChange)
            if maskID not in self._masks:
                self._masks.update({maskID: True})

    def close(self):
        """
        Safely stops the picture watcher, if one was started.
        """

        if self._newPicObserver is not None:
            self._newPicObserver.stop()
            self._newPicObserver.join()

    def _waitForCameraReady(self, timeout: float=7.5):
        """
        Waits until the camera is considered to be "ready" again.
        This is done by checking self._newPicHandler's last closed flags.
        This function will also raise a TimeoutError if it has taken more than
        `timeout` seconds to get the expected close event.
        """

        # If the camera is not in multilayer mode, use COMBINED as the last type.
        if self._lastMode != 4:
            lastType = 'COMBINED'
        # Otherwise, check for the UI flag.
        elif self._masks['UI']:
            lastType = 'UI'
        # If none of the above are true, the only other outcome is PLAYER. See #2 for more details.
        else:
            lastType = 'PLAYER'

        # Start a timer for the timeout.
        st = time.perf_counter()

        # Spin until we're looking at relevant close events with a 0.05s margin for error.
        while self._newPicHandler.lastClosedTime < self.lastPicture - 0.05 and\
                time.perf_counter() - st < timeout:
            time.sleep(0.01)

        # Spin until the desired picture has been reached.
        while self._newPicHandler.lastClosedType != lastType and\
                time.perf_counter() - st < timeout:
            time.sleep(0.01)

        # If the timeout was exceeded, raise TimeoutError.
        if time.perf_counter() - st > timeout:
            raise TimeoutError()

    def _newPicture(self, _, timeGap: float):
        """
        Callback for the picture watcher; takes the additional pictures.

        Does nothing beyond recording the time if a multishot is already running,
        if the camera is in print mode, or if no additional pictures are wanted.
        """

        self.lastPicture = time.perf_counter()

        if self._multiPictureOngoing:
            return

        # Don't perform multishot in print mode
        if self._lastMode == 5:
            return

        if self.additionalPictures == 0:
            return

        self._multiPictureOngoing = True

        try:
            for _shot in range(self.additionalPictures):
                # Wait until the camera is ready
                self._waitForCameraReady()
                time.sleep(0.75)
                # Take a new picture
                self.oscClient.send_message('/usercamera/Capture', True)
                # Reset the lastPicture time, the set that happens at the start of this
                # function is not fast enough and causes race conditions.
                self.lastPicture = time.perf_counter()
            # Wait for the camera to be ready one last time to prevent a cycle
            self._waitForCameraReady()
        except TimeoutError:
            print('WARNING: Timeout occured during multishot. Bailing out.')
        finally:
            # Always clear the flag, even when something other than a timeout
            # went wrong (e.g. sending the OSC message failed); otherwise every
            # later picture would be ignored by the guard at the top.
            time.sleep(0.5)
            self._multiPictureOngoing = False

    def _onModeChange(self, _: str, newMode: int):
        """
        Gets called every time the camera mode changes in VRChat.
        """

        # The camera counts as newly enabled when the mode leaves 0.
        enabledFlag = self._lastMode == 0 and newMode != 0
        self._lastMode = newMode
        if enabledFlag:
            self._onCameraEnabled()

    def _onMaskChange(self, address: str, value: bool):
        """
        Gets called when any mask is changed.
        """

        mask = _ADDR_TO_MASK[address]
        self._masks[mask] = value

    def _onCameraEnabled(self):
        """
        Sends the stored masks to VRChat and then runs the enabled callback, if any.
        """

        # Ensure the VRChat camera's masks match our own
        for maskID, value in self._masks.items():
            address = _MASK_TO_ADDR[maskID]
            self.oscClient.send_message(address, value)

        if self._onEnabledCallback is not None:
            self._onEnabledCallback(self)

    @property
    def cameraEnabled(self) -> bool:
        """
        Returns True if the camera is enabled
        """

        return self._lastMode != 0
|