"""An API for interacting with the VRChat camera through OSC."""
from threading import Thread
from typing import Callable, Optional
import platform
import time
import os

from pythonosc.dispatcher import Dispatcher
from watchdog.observers import Observer
from pythonosc import udp_client
import watchdog.events

# OSC address of every camera mask toggle -> the mask ID used in Camera._masks.
_ADDR_TO_MASK = {
    '/usercamera/ShowUIInCamera': 'UI',
    '/usercamera/LocalPlayer': 'LOCAL_PLAYER',
    '/usercamera/RemotePlayer': 'REMOTE_PLAYER',
    '/usercamera/Environment': 'ENVIRONMENT',
}
# Reverse lookup: mask ID -> OSC address.
_MASK_TO_ADDR = {value: key for key, value in _ADDR_TO_MASK.items()}

# Environment variable that overrides the platform default pictures folder.
_PICTURES_DIR_ENV = 'VRCCO_PICTURES_DIR'

# Filename fragments that mark the extra per-layer files of a multi layer image.
_LAYER_SUFFIXES = ('_Environment', '_Player', '_UI')

# Value of /usercamera/Mode while the camera is in print mode.
_PRINT_MODE = 5

# Seconds slept before each additional capture is requested.
# NOTE(review): presumably the shortest interval VRChat accepts - confirm.
_CAPTURE_INTERVAL = 1.1

# Seconds the multi-shot flag stays set after the last capture request.
# NOTE(review): presumably lets the file event of the final picture arrive
# while it is still ignored - confirm.
_SETTLE_DELAY = 0.5


class NewPictureHandler(watchdog.events.FileSystemEventHandler):
    """
    Watches for new pictures in the VRChat camera folder.

    When a new picture is detected, callback(path, timeGap) is called on a
    freshly started thread. path is the exact path of the new image, timeGap
    is a float that says how many seconds it has been since the last picture.
    """

    def __init__(self, callback: Callable[[str, float], None]):
        # perf_counter() timestamp of the most recent picture. It starts at 0,
        # so the first timeGap is measured from perf_counter()'s zero point
        # rather than from a real previous picture.
        self._lastPicture = 0
        self._callback = callback
        super().__init__()

    def on_created(self, event: watchdog.events.FileSystemEvent) -> None:
        """ Dispatches the callback for every newly created picture file. """
        # Only listen to FileCreatedEvent
        if not isinstance(event, watchdog.events.FileCreatedEvent):
            return

        # Ignore multi layer images
        filename = os.path.basename(event.src_path)
        if any(layer in filename for layer in _LAYER_SUFFIXES):
            return

        # Use one timestamp for both the gap and the new reference point.
        now = time.perf_counter()
        timeGap = now - self._lastPicture
        self._lastPicture = now

        # Run the callback on its own thread so a slow callback does not
        # block the handler.
        Thread(target=self._callback, args=(event.src_path, timeGap)).start()


def createNewPictureObserver(
        callback: Callable[[str, float], None]) -> Optional[Observer]:
    """
    Creates an Observer that watches the VRChat pictures folder and
    calls callback if a new picture was found.

    The folder is taken from the VRCCO_PICTURES_DIR environment variable if
    it is set, otherwise from a per-platform default (Linux and Windows only).

    Returns the started Observer, or None (after printing a warning) if no
    existing pictures folder could be determined.
    """
    path = os.environ.get(_PICTURES_DIR_ENV)
    if path is None:
        system = platform.system()
        if system == 'Linux':
            path = ('~/.local/share/Steam/steamapps/compatdata/438100/pfx'
                    '/drive_c/users/steamuser/Pictures/VRChat/')
        elif system == 'Windows':
            path = '~/Pictures/VRChat/'

    if path is not None:
        path = os.path.expanduser(path)
        if not os.path.isdir(path):
            path = None

    if path is None:
        print('WARNING: Could not find VRChat pictures folder. '
              f'Please specify one with the {_PICTURES_DIR_ENV} '
              'environment variable.\n'
              'Multi-shot will not work until this is resolved.')
        return None

    observer = Observer()
    observer.schedule(NewPictureHandler(callback), path, recursive=True)
    observer.start()
    return observer


class Camera:
    """
    Handles OSC messages to and from VRChat.

    dispatcher is used to subscribe to the camera mode and mask addresses,
    oscClient is used to send messages to VRChat, and onCameraEnabled (if
    given) is called with this Camera every time the camera mode changes
    from 0 to a non-zero value.
    """

    def __init__(self, dispatcher: Dispatcher,
                 oscClient: udp_client.SimpleUDPClient,
                 onCameraEnabled: Optional[Callable[['Camera'], None]] = None):
        self.oscClient = oscClient
        self.additionalPictures = 0  # How many additional pictures to take
        self.addPicsMinGap = 0  # How long to wait between multi-shot batches
        self._lastMode = 0
        # None if the pictures folder could not be found.
        self._newPictureObserver = createNewPictureObserver(self._newPicture)
        self._multiPictureOngoing = False
        self._onEnabledCallback = onCameraEnabled
        self._masks = {'UI': False}

        dispatcher.map('/usercamera/Mode', self._onModeChange)

        # Map all mask addresses to self._onMaskChange
        # and populate any missing mask IDs in self._masks with True.
        for address, maskID in _ADDR_TO_MASK.items():
            dispatcher.map(address, self._onMaskChange)
            self._masks.setdefault(maskID, True)

    def close(self):
        """
        Stops the picture watcher, if one is running, and waits for its
        thread to finish. The OSC client is left untouched.
        """
        if self._newPictureObserver is not None:
            self._newPictureObserver.stop()
            self._newPictureObserver.join()

    def _newPicture(self, _, timeGap: float):
        """
        Gets called (on its own thread) for every new picture and requests
        self.additionalPictures further captures from VRChat.

        Nothing happens while a multi-shot is already running, in print mode,
        when timeGap is below self.addPicsMinGap, or when no additional
        pictures are configured.
        """
        if self._multiPictureOngoing:
            return
        # Don't perform multishot in print mode
        if self._lastMode == _PRINT_MODE:
            return
        if timeGap < self.addPicsMinGap:
            return
        if self.additionalPictures == 0:
            return

        self._multiPictureOngoing = True
        try:
            for _ in range(self.additionalPictures):
                time.sleep(_CAPTURE_INTERVAL)
                self.oscClient.send_message('/usercamera/Capture', True)
            time.sleep(_SETTLE_DELAY)
        finally:
            # Always clear the flag; otherwise a failed send would disable
            # multi-shot for the rest of the session.
            self._multiPictureOngoing = False

    def _onModeChange(self, _: str, newMode: int):
        """ Gets called every time the camera mode changes in VRChat. """
        enabledFlag = self._lastMode == 0 and newMode != 0
        self._lastMode = newMode
        if enabledFlag:
            self._onCameraEnabled()

    def _onMaskChange(self, address: str, value: bool):
        """ Gets called when any mask is changed. """
        mask = _ADDR_TO_MASK[address]
        self._masks[mask] = value

    def _onCameraEnabled(self):
        """
        Sends the stored mask values to VRChat, then calls the
        onCameraEnabled callback if one was given.
        """
        # Ensure the VRChat camera's masks match our own
        for maskID, value in self._masks.items():
            address = _MASK_TO_ADDR[maskID]
            self.oscClient.send_message(address, value)

        if self._onEnabledCallback is not None:
            self._onEnabledCallback(self)

    @property
    def cameraEnabled(self) -> bool:
        """ Returns True if the camera is enabled """
        return self._lastMode != 0