""" An API for interacting with the VRChat camera through OSC. """ from threading import Thread import platform import time import os from pythonosc.dispatcher import Dispatcher from watchdog.observers import Observer from pythonosc import udp_client import watchdog.events class NewPictureHandler(watchdog.events.FileSystemEventHandler): """ Watches for new pictures in the VRChat camera folder. When a new picture is detected, callback(path, timeGap) is called. path is the exact path of the new image, timeGap is a float that says how many seconds its been since the last picture. """ def __init__(self, callback: callable): self._lastPicture = 0 self._callback = callback super().__init__() def on_created(self, event: watchdog.events.FileSystemEvent) -> None: # Only listen to FileCreatedEvent if not isinstance(event, watchdog.events.FileCreatedEvent): return # Ignore multi layer images for layer in ['_Environment', '_Player', '_UI']: if layer in os.path.basename(event.src_path): return timeGap = time.perf_counter() - self._lastPicture Thread(target=self._callback, args=(event.src_path, timeGap)).start() self._lastPicture = time.perf_counter() def createNewPictureObserver(callback: callable) -> Observer: """ Creates an Observer that watches the VRChat pictures folder and calls callback if a new picture was found. """ if os.environ.get('VRCCO_PICTURES_DIR', None) is not None: path = os.environ.get('VRCCO_PICTURES_DIR') elif platform.system() == 'Linux': path = '~/.local/share/Steam/steamapps/compatdata/438100/pfx' path += '/drive_c/users/steamuser/Pictures/VRChat/' elif platform.system() == 'Windows': path = '~/Pictures/VRChat/' else: path = None if path is not None: path = os.path.expanduser(path) if not os.path.isdir(path): path = None if path is None: print('WARNING: Could not find VRChat pictures folder.'+ 'Please specify one with the VRC_PICTURES_DIR environment variable.\n'+ 'Multi-shot will not work until this is resolved.') return None observer = Observer() observer.schedule(NewPictureHandler(callback), path, recursive=True) observer.start() return observer class Camera: """ Handles OSC messages to and from VRChat. """ def __init__(self, dispatcher: Dispatcher, oscClient: udp_client.SimpleUDPClient, onCameraEnabled: callable=None): self.oscClient = oscClient self.additionalPictures = 0 # How many additional pictures to take self.addPicsMinGap = 0 # How long to wait between multi-shot batches self._cameraEnabled = False self._newPictureObserver = createNewPictureObserver(self._newPicture) self._multiPictureOngoing = False self._onEnabledCallback = onCameraEnabled dispatcher.map('/usercamera/Mode', self._onModeChange) def close(self): """ Safely closes the picture watcher, and the connection to VRChat. """ if self._newPictureObserver is not None: self._newPictureObserver.stop() self._newPictureObserver.join() def _newPicture(self, _, timeGap: float): if self._multiPictureOngoing: return if timeGap < self.addPicsMinGap: return if self.additionalPictures == 0: return self._multiPictureOngoing = True for _ in range(self.additionalPictures): time.sleep(1.1) self.oscClient.send_message('/usercamera/Capture', True) time.sleep(0.5) self._multiPictureOngoing = False def _onModeChange(self, _: str, mode: int): """ Gets called every time the camera mode changes in VRChat. """ self.cameraEnabled = mode != 0 def _onCameraEnabled(self): if self._onEnabledCallback is not None: self._onEnabledCallback(self) @property def cameraEnabled(self) -> bool: """ Returns True if the camera is enabled """ return self._cameraEnabled @cameraEnabled.setter def cameraEnabled(self, value: bool): if value == self._cameraEnabled: return self._cameraEnabled = value if value: self._onCameraEnabled()