""" An API for interacting with the VRChat camera through OSC. """ from threading import Thread import platform import time import os from pythonosc.dispatcher import Dispatcher from watchdog.observers import Observer from pythonosc import udp_client import watchdog.events _ADDR_TO_MASK = { '/usercamera/ShowUIInCamera': 'UI', '/usercamera/LocalPlayer': 'LOCAL_PLAYER', '/usercamera/RemotePlayer': 'REMOTE_PLAYER', '/usercamera/Environment': 'ENVIRONMENT', } _MASK_TO_ADDR = {value: key for key, value in _ADDR_TO_MASK.items()} class NewPictureHandler(watchdog.events.FileSystemEventHandler): """ Watches for new pictures in the VRChat camera folder. When a new picture is detected, callback(path, timeGap) is called. path is the exact path of the new image, timeGap is a float that says how many seconds its been since the last picture. """ def __init__(self, callback: callable): self._lastPicture = 0 self._callback = callback self.lastClosedType = None self.lastClosedTime = 0 super().__init__() def on_created(self, event: watchdog.events.FileSystemEvent) -> None: # Only listen to FileCreatedEvent if not isinstance(event, watchdog.events.FileCreatedEvent): return # Update last closed (can't use on_closed(), see #4). exExt = '.'.join(event.src_path.split('.')[:-1]) if exExt.endswith('_Environment'): self.lastClosedType = 'ENVIRONMENT' elif exExt.endswith('_Player'): self.lastClosedType = 'PLAYER' elif exExt.endswith('_UI'): self.lastClosedType = 'UI' else: self.lastClosedType = 'COMBINED' self.lastClosedTime = time.perf_counter() # Ignore multi layer images for layer in ['_Environment', '_Player', '_UI']: if layer in os.path.basename(event.src_path): return timeGap = time.perf_counter() - self._lastPicture Thread(target=self._callback, args=(event.src_path, timeGap)).start() self._lastPicture = time.perf_counter() def createNewPictureObserver(callback: callable) -> Observer: """ Creates an Observer that watches the VRChat pictures folder and calls callback if a new picture was found. """ if os.environ.get('VRCCO_PICTURES_DIR', None) is not None: path = os.environ.get('VRCCO_PICTURES_DIR') elif platform.system() == 'Linux': path = '~/.local/share/Steam/steamapps/compatdata/438100/pfx' path += '/drive_c/users/steamuser/Pictures/VRChat/' elif platform.system() == 'Windows': path = '~/Pictures/VRChat/' else: path = None if path is not None: path = os.path.expanduser(path) if not os.path.isdir(path): path = None if path is None: print('WARNING: Could not find VRChat pictures folder.'+ 'Please specify one with the VRC_PICTURES_DIR environment variable.\n'+ 'Multi-shot will not work until this is resolved.') return None observer = Observer() handler = NewPictureHandler(callback) observer.schedule(handler, path, recursive=True) observer.start() return handler, observer class Camera: """ Handles OSC messages to and from VRChat. """ def __init__(self, dispatcher: Dispatcher, oscClient: udp_client.SimpleUDPClient, onCameraEnabled: callable=None): self.oscClient = oscClient self.additionalPictures = 0 # How many additional pictures to take self.lastPicture = 0 self._lastMode = 0 self._newPicHandler, self._newPicObserver = createNewPictureObserver(self._newPicture) self._multiPictureOngoing = False self._onEnabledCallback = onCameraEnabled self._masks = {'UI': False} dispatcher.map('/usercamera/Mode', self._onModeChange) # Map all mask addresses to self._onMaskChange # and populate any missing mask IDs in self._masks with True. for address, maskID in _ADDR_TO_MASK.items(): dispatcher.map(address, self._onMaskChange) if maskID not in self._masks: self._masks.update({maskID: True}) def close(self): """ Safely closes the picture watcher, and the connection to VRChat. """ if self._newPicObserver is not None: self._newPicObserver.stop() self._newPicObserver.join() def _waitForCameraReady(self, timeout: float=7.5): """ Waits until the camera is considered to be "ready" again. This is done by checking self._newPicHandler's last closed flags. This function will also raise a TimeoutError if it has taken more than `timeout` seconds to get the expected close event. """ # If the camera is not in multilayer mode, use COMBINED as the last type. if self._lastMode != 4: lastType = 'COMBINED' # Otherwise, check for the UI flag. elif self._masks['UI']: lastType = 'UI' # If none of the above are true, the only other outcome is PLAYER. See #2 for more details. else: lastType = 'PLAYER' # Start a timer for the timeout. st = time.perf_counter() # Spin until we're looking at relevant close events with a 0.05s margin for error. while self._newPicHandler.lastClosedTime < self.lastPicture - 0.05 and\ time.perf_counter() - st < timeout: time.sleep(0.01) # Spin until the desired picture has been reached. while self._newPicHandler.lastClosedType != lastType and\ time.perf_counter() - st < timeout: time.sleep(0.01) # If the timeout was exceeded, raise TimeoutError. if time.perf_counter() - st > timeout: raise TimeoutError() def _newPicture(self, _, timeGap: float): self.lastPicture = time.perf_counter() if self._multiPictureOngoing: return # Don't perform multishot in print mode if self._lastMode == 5: return if self.additionalPictures == 0: return self._multiPictureOngoing = True try: for _ in range(self.additionalPictures): # Wait until the camera is ready self._waitForCameraReady() time.sleep(0.75) # Take a new picture self.oscClient.send_message('/usercamera/Capture', True) # Reset the lastPicture time, the set that happens at the start of this # function is not fast enough and causes race conditions. self.lastPicture = time.perf_counter() # Wait for the camera to be ready one last time to prevent a cycle self._waitForCameraReady() except TimeoutError: print('WARNING: Timeout occured during multishot. Bailing out.') time.sleep(0.5) self._multiPictureOngoing = False def _onModeChange(self, _: str, newMode: int): """ Gets called every time the camera mode changes in VRChat. """ enabledFlag = self._lastMode == 0 and newMode != 0 self._lastMode = newMode if enabledFlag: self._onCameraEnabled() def _onMaskChange(self, address: str, value: bool): """ Gets called when any mask is changed. """ mask = _ADDR_TO_MASK[address] self._masks[mask] = value def _onCameraEnabled(self): # Ensure the VRChat camera's masks match our own for maskID, value in self._masks.items(): address = _MASK_TO_ADDR[maskID] self.oscClient.send_message(address, value) if self._onEnabledCallback is not None: self._onEnabledCallback(self) @property def cameraEnabled(self) -> bool: """ Returns True if the camera is enabled """ return self._lastMode != 0