Merge pull request #1147 from pypeclub/feature/idle_manager_with_separated_logic

Idle manager with separated logic
This commit is contained in:
Milan Kolar 2021-03-22 09:39:23 +01:00 committed by GitHub
commit b994e1433d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 126 additions and 118 deletions

View file

@ -1,4 +1,4 @@
from .idle_manager import (
from .idle_module import (
IdleManager,
IIdleManager
)

View file

@ -0,0 +1,97 @@
import collections
from abc import ABCMeta, abstractmethod
import six
from pype.modules import PypeModule, ITrayService
@six.add_metaclass(ABCMeta)
class IIdleManager:
"""Other modules interface to return callbacks by idle time in seconds.
Expected output is dictionary with seconds <int> as keys and callback/s
as value, value may be callback of list of callbacks.
EXAMPLE:
```
{
60: self.on_minute_idle
}
```
"""
idle_manager = None
@abstractmethod
def callbacks_by_idle_time(self):
pass
@property
def idle_time(self):
if self.idle_manager:
return self.idle_manager.idle_time
class IdleManager(PypeModule, ITrayService):
""" Measure user's idle time in seconds.
Idle time resets on keyboard/mouse input.
Is able to emit signals at specific time idle.
"""
label = "Idle Service"
name = "idle_manager"
def initialize(self, module_settings):
idle_man_settings = module_settings[self.name]
self.enabled = idle_man_settings["enabled"]
self.time_callbacks = collections.defaultdict(list)
self.idle_thread = None
def tray_init(self):
return
def tray_start(self):
self.start_thread()
def tray_exit(self):
self.stop_thread()
try:
self.time_callbacks = {}
except Exception:
pass
def connect_with_modules(self, enabled_modules):
for module in enabled_modules:
if not isinstance(module, IIdleManager):
continue
module.idle_manager = self
callbacks_items = module.callbacks_by_idle_time() or {}
for emit_time, callbacks in callbacks_items.items():
if not isinstance(callbacks, (tuple, list, set)):
callbacks = [callbacks]
self.time_callbacks[emit_time].extend(callbacks)
@property
def idle_time(self):
if self.idle_thread and self.idle_thread.is_running:
return self.idle_thread.idle_time
def _create_thread(self):
from .idle_threads import IdleManagerThread
return IdleManagerThread(self)
def start_thread(self):
if self.idle_thread:
self.idle_thread.stop()
self.idle_thread.join()
self.idle_thread = self._create_thread()
self.idle_thread.start()
def stop_thread(self):
if self.idle_thread:
self.idle_thread.stop()
self.idle_thread.join()
def on_thread_stop(self):
self.set_service_failed_icon()

View file

@ -1,105 +1,38 @@
import time
import collections
import threading
from abc import ABCMeta, abstractmethod
import six
from pynput import mouse, keyboard
from pype.lib import PypeLogger
from pype.modules import PypeModule, ITrayService
@six.add_metaclass(ABCMeta)
class IIdleManager:
"""Other modules interface to return callbacks by idle time in seconds.
class MouseThread(mouse.Listener):
"""Listens user's mouse movement."""
Expected output is dictionary with seconds <int> as keys and callback/s
as value, value may be callback of list of callbacks.
EXAMPLE:
```
{
60: self.on_minute_idle
}
```
"""
idle_manager = None
def __init__(self, callback):
super(MouseThread, self).__init__(on_move=self.on_move)
self.callback = callback
@abstractmethod
def callbacks_by_idle_time(self):
pass
@property
def idle_time(self):
if self.idle_manager:
return self.idle_manager.idle_time
def on_move(self, posx, posy):
self.callback()
class IdleManager(PypeModule, ITrayService):
""" Measure user's idle time in seconds.
Idle time resets on keyboard/mouse input.
Is able to emit signals at specific time idle.
"""
label = "Idle Service"
name = "idle_manager"
class KeyboardThread(keyboard.Listener):
"""Listens user's keyboard input."""
def initialize(self, module_settings):
idle_man_settings = module_settings[self.name]
self.enabled = idle_man_settings["enabled"]
def __init__(self, callback):
super(KeyboardThread, self).__init__(on_press=self.on_press)
self.time_callbacks = collections.defaultdict(list)
self.idle_thread = None
self.callback = callback
def tray_init(self):
return
def tray_start(self):
self.start_thread()
def tray_exit(self):
self.stop_thread()
try:
self.time_callbacks = {}
except Exception:
pass
def connect_with_modules(self, enabled_modules):
for module in enabled_modules:
if not isinstance(module, IIdleManager):
continue
module.idle_manager = self
callbacks_items = module.callbacks_by_idle_time() or {}
for emit_time, callbacks in callbacks_items.items():
if not isinstance(callbacks, (tuple, list, set)):
callbacks = [callbacks]
self.time_callbacks[emit_time].extend(callbacks)
@property
def idle_time(self):
if self.idle_thread and self.idle_thread.is_running:
return self.idle_thread.idle_time
def start_thread(self):
if self.idle_thread:
self.idle_thread.stop()
self.idle_thread.join()
self.idle_thread = IdleManagerThread(self)
self.idle_thread.start()
def stop_thread(self):
if self.idle_thread:
self.idle_thread.stop()
self.idle_thread.join()
def on_thread_stop(self):
self.set_service_failed_icon()
def on_press(self, key):
self.callback()
class IdleManagerThread(threading.Thread):
def __init__(self, module, *args, **kwargs):
super(IdleManagerThread, self).__init__(*args, **kwargs)
self.log = PypeLogger().get_logger(self.__class__.__name__)
self.log = PypeLogger.get_logger(self.__class__.__name__)
self.module = module
self.threads = []
self.is_running = False
@ -124,8 +57,8 @@ class IdleManagerThread(threading.Thread):
self.log.info("IdleManagerThread has started")
self.is_running = True
thread_mouse = MouseThread(self.reset_time)
thread_mouse.start()
thread_keyboard = KeyboardThread(self.reset_time)
thread_mouse.start()
thread_keyboard.start()
try:
while self.is_running:
@ -162,26 +95,3 @@ class IdleManagerThread(threading.Thread):
pass
self.on_stop()
class MouseThread(mouse.Listener):
"""Listens user's mouse movement."""
def __init__(self, callback):
super(MouseThread, self).__init__(on_move=self.on_move)
self.callback = callback
def on_move(self, posx, posy):
self.callback()
class KeyboardThread(keyboard.Listener):
"""Listens user's keyboard input."""
def __init__(self, callback):
super(KeyboardThread, self).__init__(on_press=self.on_press)
self.callback = callback
def on_press(self, key):
self.callback()

View file

@ -1,4 +1,5 @@
import os
import collections
from abc import ABCMeta, abstractmethod
import six
from .. import PypeModule, ITrayService, IIdleManager, IWebServerRoutes
@ -159,26 +160,25 @@ class TimersManager(PypeModule, ITrayService, IIdleManager, IWebServerRoutes):
def callbacks_by_idle_time(self):
"""Implementation of IIdleManager interface."""
# Time when message is shown
callbacks = {
self.time_show_message: lambda: self.time_callback(0)
}
callbacks = collections.defaultdict(list)
callbacks[self.time_show_message].append(lambda: self.time_callback(0))
# Times when idle is between show widget and stop timers
show_to_stop_range = range(
self.time_show_message - 1, self.time_stop_timer
)
for num in show_to_stop_range:
callbacks[num] = lambda: self.time_callback(1)
callbacks[num].append(lambda: self.time_callback(1))
# Times when widget is already shown and user restart idle
shown_and_moved_range = range(
self.time_stop_timer - self.time_show_message
)
for num in shown_and_moved_range:
callbacks[num] = lambda: self.time_callback(1)
callbacks[num].append(lambda: self.time_callback(1))
# Time when timers are stopped
callbacks[self.time_stop_timer] = lambda: self.time_callback(2)
callbacks[self.time_stop_timer].append(lambda: self.time_callback(2))
return callbacks

View file

@ -163,8 +163,9 @@ class SignalHandler(QtCore.QObject):
signal_change_label = QtCore.Signal()
signal_stop_timers = QtCore.Signal()
def __init__(self, cls):
super().__init__()
self.signal_show_message.connect(cls.show_message)
self.signal_change_label.connect(cls.change_label)
self.signal_stop_timers.connect(cls.stop_timers)
def __init__(self, module):
super(SignalHandler, self).__init__()
self.module = module
self.signal_show_message.connect(module.show_message)
self.signal_change_label.connect(module.change_label)
self.signal_stop_timers.connect(module.stop_timers)