Files
VRCCO/camera.py

151 lines
4.4 KiB
Python

"""
An API for interacting with the VRChat camera through OSC.
"""
from threading import Thread
import platform
import time
import os
from pythonosc.dispatcher import Dispatcher
from watchdog.observers import Observer
from pythonosc import udp_client
import watchdog.events
class NewPictureHandler(watchdog.events.FileSystemEventHandler):
    """
    File-system event handler that reports newly created pictures.

    For every qualifying file creation, callback(path, timeGap) is started on
    its own thread. path is the created file's path as reported by watchdog,
    timeGap is a float giving the seconds elapsed since the previous
    reported picture.
    """

    def __init__(self, callback: callable):
        super().__init__()
        self._callback = callback
        # perf_counter() reading taken right after the last reported picture.
        # Starts at 0, so the first reported gap is just the current reading.
        self._lastPicture = 0

    def on_created(self, event: watchdog.events.FileSystemEvent) -> None:
        """
        Reacts to a creation event by dispatching the callback on a new thread.

        Events that are not FileCreatedEvent instances, and files whose name
        contains one of the multi layer markers, are ignored.
        """
        isNewFile = isinstance(event, watchdog.events.FileCreatedEvent)
        if not isNewFile:
            return

        # Multi layer images carry one of these markers in their file name
        fileName = os.path.basename(event.src_path)
        if any(marker in fileName for marker in ('_Environment', '_Player', '_UI')):
            return

        secondsSinceLast = time.perf_counter() - self._lastPicture
        worker = Thread(target=self._callback, args=(event.src_path, secondsSinceLast))
        worker.start()
        self._lastPicture = time.perf_counter()
def createNewPictureObserver(callback: callable) -> 'Observer | None':
    """
    Creates and starts an Observer that watches the VRChat pictures folder
    (recursively) and calls callback(path, timeGap) if a new picture was found.

    The folder is taken from the VRC_PICTURES_DIR environment variable when it
    is set. Otherwise a default location is used on Linux and Windows; other
    platforms have no default.

    Returns the started Observer, or None (after printing a warning) when no
    existing pictures folder could be determined.
    """
    path = os.environ.get('VRC_PICTURES_DIR')
    if path is None:
        system = platform.system()
        if system == 'Linux':
            # Pictures folder inside Steam's compatdata prefix 438100
            path = '~/.local/share/Steam/steamapps/compatdata/438100/pfx'
            path += '/drive_c/users/steamuser/Pictures/VRChat/'
        elif system == 'Windows':
            path = '~/Pictures/VRChat/'

    if path is not None:
        path = os.path.expanduser(path)
        # A configured or default folder that does not exist counts as missing
        if not os.path.isdir(path):
            path = None

    if path is None:
        print('WARNING: Could not find VRChat pictures folder. '
              'Please specify one with the VRC_PICTURES_DIR environment variable.\n'
              'Multi-shot will not work until this is resolved.')
        return None

    observer = Observer()
    observer.schedule(NewPictureHandler(callback), path, recursive=True)
    observer.start()
    return observer
class Camera:
    """
    Handles OSC messages to and from VRChat.

    Listens for '/usercamera/Mode' messages to track whether the camera is
    enabled, and sends '/usercamera/Capture' messages to take additional
    pictures after a new picture shows up in the watched pictures folder.
    """

    # Seconds slept before each additional capture is requested, and after it.
    # Instances may override these.
    # NOTE(review): defaults carried over unchanged; presumably tuned to
    # VRChat's capture timing - confirm before changing.
    captureDelay = 1.1
    settleDelay = 0.5

    def __init__(self, dispatcher: Dispatcher, oscClient: udp_client.SimpleUDPClient,
                 onCameraEnabled: callable = None):
        """
        dispatcher is the OSC dispatcher on which the camera mode handler is
        registered, oscClient is used to send capture requests, and
        onCameraEnabled (optional) is called with this Camera whenever the
        camera switches from disabled to enabled.
        """
        self.oscClient = oscClient
        self.additionalPictures = 0  # How many additional pictures to take
        # New pictures arriving less than this many seconds after the
        # previous one do not start a multi-shot batch
        self.addPicsMinGap = 0
        self._cameraEnabled = False
        # None when no pictures folder could be found
        self._newPictureObserver = createNewPictureObserver(self._newPicture)
        self._multiPictureOngoing = False
        self._onEnabledCallback = onCameraEnabled
        dispatcher.map('/usercamera/Mode', self._onModeChange)

    def close(self):
        """
        Stops the picture watcher, if one was started, and waits for it to
        finish. The OSC client and dispatcher are not touched here.
        """
        if self._newPictureObserver is not None:
            self._newPictureObserver.stop()
            self._newPictureObserver.join()

    def _newPicture(self, _, timeGap: float):
        """
        Called for every new picture; takes additionalPictures more pictures.

        Does nothing while a batch is already running, when the picture came
        in sooner than addPicsMinGap seconds after the previous one, or when
        no additional pictures are requested.
        """
        if self._multiPictureOngoing:
            return
        if timeGap < self.addPicsMinGap:
            return
        if self.additionalPictures == 0:
            return
        self._multiPictureOngoing = True
        try:
            for _ in range(self.additionalPictures):
                time.sleep(self.captureDelay)
                self.oscClient.send_message('/usercamera/Capture', True)
                time.sleep(self.settleDelay)
        finally:
            # Always release the flag, otherwise a failed send would block
            # every later batch
            self._multiPictureOngoing = False

    def _onModeChange(self, _: str, mode: int):
        """
        Gets called every time the camera mode changes in VRChat.
        Mode 0 is treated as disabled, any other mode as enabled.
        """
        self.cameraEnabled = mode != 0

    def _onCameraEnabled(self):
        """
        Calls the onCameraEnabled callback, if one was given, with this Camera.
        """
        if self._onEnabledCallback is not None:
            self._onEnabledCallback(self)

    @property
    def cameraEnabled(self) -> bool:
        """
        Returns True if the camera is enabled
        """
        return self._cameraEnabled

    @cameraEnabled.setter
    def cameraEnabled(self, value: bool):
        # Only react to actual changes, so the callback fires once per
        # disabled -> enabled transition
        if value == self._cameraEnabled:
            return
        self._cameraEnabled = value
        if value:
            self._onCameraEnabled()