155 lines
5.2 KiB
Python
155 lines
5.2 KiB
Python
import asyncio
|
|
DEBUG=False
|
|
try:
|
|
from winrt.windows.media.control import GlobalSystemMediaTransportControlsSessionManager as MediaManager
|
|
except:
|
|
print("DEBUG; winrt disabled")
|
|
DEBUG = True
|
|
import win32com.client
|
|
import time
|
|
import pythoncom
|
|
|
|
from resources import LineState, CLMgrMessage
|
|
|
|
import serial
|
|
from serial import SerialException
|
|
import session_event_listener as sel
|
|
import ctypes
|
|
|
|
portName = 'COM3'
|
|
|
|
global_inactive = True
|
|
|
|
# Lock Screen detection
|
|
user32 = ctypes.windll.User32
|
|
OpenDesktop = user32.OpenDesktopA
|
|
SwitchDesktop = user32.SwitchDesktop
|
|
DESKTOP_SWITCHDESKTOP = 0x0100
|
|
|
|
global_screen_locked = False
|
|
|
|
def check_screen_locked():
|
|
hDesktop = OpenDesktop ("default", 0, False, DESKTOP_SWITCHDESKTOP)
|
|
result = SwitchDesktop (hDesktop)
|
|
if result:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def sendSerial(byte):
|
|
try:
|
|
ser = serial.Serial(port=portName, baudrate=115200)
|
|
ser.write(byte)
|
|
except SerialException:
|
|
print('port already open')
|
|
|
|
class PhoneLineEventHandler():
|
|
phone_mgr = None
|
|
lines = []
|
|
line_selected = None
|
|
connected = False
|
|
|
|
def OnDispOnLineMgrNotification(self, msg, param, returns=""):
|
|
|
|
global global_inactive, global_screen_locked
|
|
if not self.phone_mgr:
|
|
self.phone_mgr = win32com.client.Dispatch("CLMgr.ClientLineMgr")
|
|
if not self.phone_mgr:
|
|
"Swyx not connected!"
|
|
else:
|
|
print("Swyx connected!")
|
|
if self.phone_mgr:
|
|
if msg == CLMgrMessage.CLMgrLineStateChangedMessage:
|
|
line = self.phone_mgr.DispGetLine(param)
|
|
print("Line: {} {}".format(
|
|
param,
|
|
LineState.s[line.DispState])
|
|
)
|
|
if line.DispState != LineState.Inactive:
|
|
if line.DispState == LineState.Ringing:
|
|
sendSerial(b'shc000255000')
|
|
else:
|
|
sendSerial(b'shc255000000')
|
|
|
|
if not global_screen_locked:
|
|
asyncio.run(try_pause())
|
|
global_inactive = False
|
|
for linenum in range(self.phone_mgr.DispNumberOfLines):
|
|
line = self.phone_mgr.DispGetLine(linenum)
|
|
if line.DispState != LineState.Inactive:
|
|
return True
|
|
|
|
if not global_screen_locked:
|
|
asyncio.run(try_play())
|
|
global_inactive = True
|
|
sendSerial(b'shc000000000')
|
|
return True
|
|
|
|
# async def get_media_info():
|
|
# sessions = await MediaManager.request_async()
|
|
|
|
# # This source_app_user_model_id check and if statement is optional
|
|
# # Use it if you want to only get a certain player/program's media
|
|
# # (e.g. only chrome.exe's media not any other program's).
|
|
|
|
# # To get the ID, use a breakpoint() to run sessions.get_current_session()
|
|
# # while the media you want to get is playing.
|
|
# # Then set TARGET_ID to the string this call returns.
|
|
|
|
# current_session = sessions.get_current_session()
|
|
# if current_session: # there needs to be a media session running
|
|
# #if current_session.source_app_user_model_id == TARGET_ID:
|
|
# info = await current_session.try_get_media_properties_async()
|
|
# # song_attr[0] != '_' ignores system attributes
|
|
# info_dict = {song_attr: info.__getattribute__(song_attr) for song_attr in dir(info) if song_attr[0] != '_'}
|
|
|
|
# # converts winrt vector to list
|
|
# info_dict['genres'] = list(info_dict['genres'])
|
|
|
|
# return info_dict
|
|
|
|
# # It could be possible to select a program from a list of current
|
|
# # available ones. I just haven't implemented this here for my use case.
|
|
# # See references for more information.
|
|
# raise Exception('TARGET_PROGRAM is not the current media session')
|
|
|
|
async def try_play():
|
|
print("try_play")
|
|
if not DEBUG:
|
|
sessions = await MediaManager.request_async()
|
|
current_session = sessions.get_current_session()
|
|
if current_session:
|
|
await current_session.try_play_async()
|
|
|
|
async def try_pause():
|
|
print("try_pause")
|
|
if not DEBUG:
|
|
sessions = await MediaManager.request_async()
|
|
current_session = sessions.get_current_session()
|
|
if current_session:
|
|
await current_session.try_pause_async()
|
|
|
|
def OnSessionEvent(event: sel.SessionEvent):
|
|
global global_inactive, global_screen_locked
|
|
if event == sel.SessionEvent.SESSION_LOCK:
|
|
global_screen_locked = True
|
|
print("locked, try_pause")
|
|
asyncio.run(try_pause())
|
|
|
|
if event == sel.SessionEvent.SESSION_UNLOCK:
|
|
global_screen_locked = False
|
|
if global_inactive:
|
|
print("unlocked, global inactive, tryplay")
|
|
asyncio.run(try_play())
|
|
|
|
if __name__ == '__main__':
|
|
win32com.client.WithEvents("CLMgr.ClientLineMgr", PhoneLineEventHandler)
|
|
m = sel.WorkstationMonitor()
|
|
m.register_handler(sel.SessionEvent.ANY, handler=OnSessionEvent)
|
|
try:
|
|
while True:
|
|
pythoncom.PumpWaitingMessages()
|
|
m.listen()
|
|
time.sleep(0.1) # Don't use up all our CPU checking constantly
|
|
except KeyboardInterrupt:
|
|
print("exit") |