Files
VRCCO/camera.py
2026-06-04 17:32:08 +01:00

245 lines
8.2 KiB
Python

"""
An API for interacting with the VRChat camera through OSC.
"""
from threading import Thread
import platform
import time
import os
from pythonosc.dispatcher import Dispatcher
from watchdog.observers import Observer
from pythonosc import udp_client
import watchdog.events
# Maps each OSC address that Camera listens on for mask changes to the mask
# identifier used as a key in Camera._masks.
_ADDR_TO_MASK = {
    '/usercamera/ShowUIInCamera': 'UI',
    '/usercamera/LocalPlayer': 'LOCAL_PLAYER',
    '/usercamera/RemotePlayer': 'REMOTE_PLAYER',
    '/usercamera/Environment': 'ENVIRONMENT',
}

# Reverse lookup: mask identifier -> OSC address used when sending the mask state.
_MASK_TO_ADDR = {maskID: address for address, maskID in _ADDR_TO_MASK.items()}
class NewPictureHandler(watchdog.events.FileSystemEventHandler):
    """
    Watches for new pictures in the VRChat camera folder.

    When a new picture is detected, callback(path, timeGap) is called on a
    freshly started thread.
    path is the exact path of the new image,
    timeGap is a float that says how many seconds it has been since the last picture.
    For the very first picture there is no previous one, so timeGap is simply
    the current time.perf_counter() reading.

    Every created file also updates two public attributes:
    lastClosedType is 'ENVIRONMENT', 'PLAYER', 'UI' or 'COMBINED' (None until
    the first file is seen), and lastClosedTime is the time.perf_counter()
    reading taken when that file was seen.
    """
    # Filename suffix (before the extension) of each multi layer image,
    # mapped to the layer type it represents.
    _LAYER_SUFFIXES = {
        '_Environment': 'ENVIRONMENT',
        '_Player': 'PLAYER',
        '_UI': 'UI',
    }

    def __init__(self, callback: callable):
        self._lastPicture = 0
        self._callback = callback
        self.lastClosedType = None
        self.lastClosedTime = 0
        super().__init__()

    def on_created(self, event: watchdog.events.FileSystemEvent) -> None:
        """
        Records the type of the created file and, for non-layer pictures,
        dispatches the callback.
        """
        # Only listen to FileCreatedEvent
        if not isinstance(event, watchdog.events.FileCreatedEvent):
            return
        # Update last closed (can't use on_closed(), see #4).
        # splitext only strips a real extension of the final path component,
        # so dots in directory names can not truncate the path.
        stem = os.path.splitext(event.src_path)[0]
        for suffix, layerType in self._LAYER_SUFFIXES.items():
            if stem.endswith(suffix):
                self.lastClosedType = layerType
                break
        else:
            self.lastClosedType = 'COMBINED'
        self.lastClosedTime = time.perf_counter()
        # Ignore multi layer images
        fileName = os.path.basename(event.src_path)
        if any(suffix in fileName for suffix in self._LAYER_SUFFIXES):
            return
        timeGap = time.perf_counter() - self._lastPicture
        # Run the callback on its own thread so the caller of on_created is not blocked.
        Thread(target=self._callback, args=(event.src_path, timeGap)).start()
        self._lastPicture = time.perf_counter()
def createNewPictureObserver(callback: callable) -> "tuple[NewPictureHandler, Observer] | None":
    """
    Creates an Observer that watches the VRChat pictures folder and calls callback
    if a new picture was found.

    The folder is taken from the VRCCO_PICTURES_DIR environment variable when it
    is set; otherwise a default location for Linux or Windows is used.
    '~' is expanded in either case.

    Returns a (handler, observer) tuple with the observer already started.
    If no existing folder could be determined, a warning is printed and None
    is returned instead, so callers must check before unpacking.
    """
    path = os.environ.get('VRCCO_PICTURES_DIR')
    if path is None:
        system = platform.system()
        if system == 'Linux':
            path = ('~/.local/share/Steam/steamapps/compatdata/438100/pfx'
                    '/drive_c/users/steamuser/Pictures/VRChat/')
        elif system == 'Windows':
            path = '~/Pictures/VRChat/'
    if path is not None:
        path = os.path.expanduser(path)
        if not os.path.isdir(path):
            path = None
    if path is None:
        # Name the variable that is actually read above.
        print('WARNING: Could not find VRChat pictures folder. '
              'Please specify one with the VRCCO_PICTURES_DIR environment variable.\n'
              'Multi-shot will not work until this is resolved.')
        return None
    observer = Observer()
    handler = NewPictureHandler(callback)
    # recursive=True so pictures created in subfolders are reported as well.
    observer.schedule(handler, path, recursive=True)
    observer.start()
    return handler, observer
class Camera:
    """
    Handles OSC messages to and from VRChat.

    Tracks the camera mode and mask state reported by VRChat and, when
    additionalPictures is non-zero, triggers that many extra captures after
    each new picture ("multishot").
    """
    # Values of /usercamera/Mode that this class treats specially.
    _MODE_DISABLED = 0
    _MODE_MULTILAYER = 4
    _MODE_PRINT = 5

    def __init__(self, dispatcher: "Dispatcher", oscClient: "udp_client.SimpleUDPClient",
                 onCameraEnabled: callable=None):
        """
        dispatcher: OSC dispatcher on which the mode and mask handlers are mapped.
        oscClient: client used to send capture and mask messages.
        onCameraEnabled: optional callback, called with this Camera whenever the
        mode changes from disabled to anything else.
        """
        self.oscClient = oscClient
        self.additionalPictures = 0 # How many additional pictures to take
        self.lastPicture = 0
        self._lastMode = self._MODE_DISABLED
        self._multiPictureOngoing = False
        self._onEnabledCallback = onCameraEnabled
        self._masks = {'UI': False}
        # Start the watcher only after all state read by _newPicture exists,
        # since the observer may deliver an event as soon as it is running.
        # createNewPictureObserver returns None when no pictures folder was found.
        watcher = createNewPictureObserver(self._newPicture)
        if watcher is None:
            self._newPicHandler, self._newPicObserver = None, None
        else:
            self._newPicHandler, self._newPicObserver = watcher
        dispatcher.map('/usercamera/Mode', self._onModeChange)
        # Map all mask addresses to self._onMaskChange
        # and populate any missing mask IDs in self._masks with True.
        for address, maskID in _ADDR_TO_MASK.items():
            dispatcher.map(address, self._onMaskChange)
            self._masks.setdefault(maskID, True)

    def close(self):
        """
        Safely stops the picture watcher, if one was started, and waits for
        its thread to finish. The OSC client is left untouched.
        """
        if self._newPicObserver is not None:
            self._newPicObserver.stop()
            self._newPicObserver.join()

    def _waitForCameraReady(self, timeout: float=7.5):
        """
        Waits until the camera is considered to be "ready" again.
        This is done by checking self._newPicHandler's last closed flags.
        This function will raise a TimeoutError if the expected close event
        has not been seen within `timeout` seconds. It never raises when the
        expected event has already been seen.
        """
        # If the camera is not in multilayer mode, use COMBINED as the last type.
        if self._lastMode != self._MODE_MULTILAYER:
            lastType = 'COMBINED'
        # Otherwise, check for the UI flag.
        elif self._masks['UI']:
            lastType = 'UI'
        # If none of the above are true, the only other outcome is PLAYER. See #2 for more details.
        else:
            lastType = 'PLAYER'
        handler = self._newPicHandler
        # One deadline shared by both waiting phases.
        deadline = time.perf_counter() + timeout
        # Spin until we're looking at relevant close events with a 0.05s margin for error.
        while handler.lastClosedTime < self.lastPicture - 0.05:
            if time.perf_counter() >= deadline:
                raise TimeoutError('Timed out waiting for a recent picture event.')
            time.sleep(0.01)
        # Spin until the desired picture has been reached.
        while handler.lastClosedType != lastType:
            if time.perf_counter() >= deadline:
                raise TimeoutError('Timed out waiting for the final picture layer.')
            time.sleep(0.01)

    def _newPicture(self, *_):
        """
        Callback for the picture watcher. Records the time of the picture and,
        if requested, takes self.additionalPictures extra pictures.
        The arguments passed by the watcher are ignored.
        """
        self.lastPicture = time.perf_counter()
        # Pictures taken by an ongoing multishot must not start another one.
        if self._multiPictureOngoing:
            return
        # Don't perform multishot in print mode
        if self._lastMode == self._MODE_PRINT:
            return
        if self.additionalPictures == 0:
            return
        self._multiPictureOngoing = True
        try:
            for _ in range(self.additionalPictures):
                # Wait until the camera is ready
                self._waitForCameraReady()
                time.sleep(0.75)
                # Check the camera mode, and see if it's disabled
                if self._lastMode == self._MODE_DISABLED:
                    # If so, break out of the loop.
                    print('Camera disabled. Bailing out.')
                    break
                # Take a new picture
                self.oscClient.send_message('/usercamera/Capture', True)
                # Reset the lastPicture time, the set that happens at the start of this
                # function is not fast enough and causes race conditions.
                self.lastPicture = time.perf_counter()
            # Wait for the camera to be ready one last time to prevent a cycle
            # but only if the camera is still enabled.
            if self._lastMode != self._MODE_DISABLED:
                self._waitForCameraReady()
        except TimeoutError:
            print('WARNING: Timeout occured during multishot. Bailing out.')
        finally:
            # Always release the flag, even on an unexpected error, so that
            # multishot is not left permanently disabled.
            time.sleep(0.5)
            self._multiPictureOngoing = False

    def _onModeChange(self, _: str, newMode: int):
        """
        Gets called every time the camera mode changes in VRChat.
        """
        enabledFlag = self._lastMode == self._MODE_DISABLED and newMode != self._MODE_DISABLED
        self._lastMode = newMode
        if enabledFlag:
            self._onCameraEnabled()

    def _onMaskChange(self, address: str, value: bool):
        """
        Gets called when any mask is changed.
        """
        mask = _ADDR_TO_MASK[address]
        self._masks[mask] = value

    def _onCameraEnabled(self):
        """
        Gets called when the camera goes from disabled to enabled.
        """
        # Ensure the VRChat camera's masks match our own
        for maskID, value in self._masks.items():
            address = _MASK_TO_ADDR[maskID]
            self.oscClient.send_message(address, value)
        if self._onEnabledCallback is not None:
            self._onEnabledCallback(self)

    @property
    def cameraEnabled(self) -> bool:
        """
        Returns True if the camera is enabled
        """
        return self._lastMode != self._MODE_DISABLED