Source code for enamlnative.core.loop

"""
Copyright (c) 2017, CodeLV.

Distributed under the terms of the MIT License.

The full license is in the file LICENSE, distributed with this software.

@author jrm

"""
import enamlnative
from atom.api import Atom, Value, Subclass, Callable, Unicode
from functools import partial
from . import bridge


class EventLoop(Atom):
    """ Abstract facade over a concrete event loop implementation.

    Subclasses supply the actual loop object and implement the
    scheduling methods that raise `NotImplementedError` here.

    """
    #: So users can check if needed
    name = Unicode()

    #: Actual event loop object
    loop = Value()

    #: Error handler fallback
    _handler = Callable()

    #: Future implementation for type checks
    future = Value()

    @classmethod
    def default(cls):
        """ Instantiate the first implementation that reports itself
        available.

        Candidates are probed in order: tornado, twisted, builtin.

        Returns
        -------
        EventLoop: A new instance of the first available implementation.

        Raises
        ------
        RuntimeError: If no candidate is available.

        """
        candidates = (
            TornadoEventLoop,
            TwistedEventLoop,
            BuiltinEventLoop,
        )
        with enamlnative.imports():
            # The generator is lazy, so `available()` is probed in order
            # and probing stops at the first implementation that works.
            chosen = next(
                (impl for impl in candidates if impl.available()), None)
            if chosen is not None:
                print("Using {} event loop!".format(chosen))
                return chosen()
        raise RuntimeError("No event loop implementation is available. "
                           "Install tornado or twisted.")

    @classmethod
    def available(cls):
        """ Report whether this implementation can be used.

        Returns
        -------
        bool: True if the event loop can be used.

        """
        raise NotImplementedError

    def start(self):
        """ Start the wrapped loop by calling its `start` method. """
        self.loop.start()

    def stop(self):
        """ Stop the wrapped loop by calling its `stop` method. """
        self.loop.stop()

    def deferred_call(self, callback, *args, **kwargs):
        """ Schedule `callback(*args, **kwargs)` to run at the next
        available time. Must be implemented by subclasses.

        """
        raise NotImplementedError

    def timed_call(self, ms, callback, *args, **kwargs):
        """ Schedule `callback(*args, **kwargs)` to run `ms` milliseconds
        from now. Must be implemented by subclasses.

        """
        raise NotImplementedError

    def create_future(self):
        """ Create a new future for this event loop by calling `future`.

        The returned object exposes a "javascript fetch" like api:

        - It MUST have a `then(callback)` method; the callback is invoked
          when the future completes and `then` returns the future.
        - It MUST have a `catch(callback)` method; the callback is invoked
          if the future holds an exception and `catch` returns the future.
        - It MUST be tagged with an id using
          `bridge.tag_object_with_id(obj)` so the bridge can resolve it.

        """
        return self.future()

    def run_iteration(self):
        """ Run a single iteration of the event loop. Must be implemented
        by subclasses.

        """
        raise NotImplementedError

    def set_error_handler(self, handler):
        """ Store `handler` as the error handler for this loop. """
        self._handler = handler

    def add_done_callback(self, future, callback):
        """ Attach `callback` to `future` via its `then` method. """
        future.then(callback)

    def set_future_result(self, future, result):
        """ Set the result of `future`, triggering any attached
        callbacks.

        """
        future.set_result(result)

    def log_error(self, callback, error=None):
        """ Print the callback that failed and the error that occurred.

        Parameters
        ----------
        callback: The callback that was running when the error occurred.
        error: The error to report, if known.

        """
        callback_line = "Uncaught error during callback: {}".format(callback)
        error_line = "Error: {}".format(error)
        print(callback_line)
        print(error_line)
class TornadoEventLoop(EventLoop):
    """ Eventloop using tornado's ioloop """

    #: Future class that the bridge-tagged `Future` subclass derives from
    base_future = Value()

    @classmethod
    def available(cls):
        """ Test if tornado can be imported.

        Returns
        -------
        bool: True if both `unicodedata` and tornado's IOLoop import.

        """
        try:
            import unicodedata  #: Required by tornado for encodings
            from tornado.ioloop import IOLoop
            return True
        except ImportError as e:
            print("Tornado event loop not available {}".format(e))
        return False

    def _default_name(self):
        """ Default value of the `name` member. """
        return "tornado"

    def _default_loop(self):
        """ Default value of the `loop` member: the current IOLoop. """
        from tornado.ioloop import IOLoop
        return IOLoop.current()

    def deferred_call(self, callback, *args, **kwargs):
        """ Schedule the callback with the loop's `add_callback`. """
        return self.loop.add_callback(callback, *args, **kwargs)

    def timed_call(self, ms, callback, *args, **kwargs):
        """ Schedule the callback to run `ms` milliseconds later.

        `call_later` is given the delay converted to seconds.

        """
        return self.loop.call_later(ms/1000.0, callback, *args, **kwargs)

    def set_error_handler(self, handler):
        """ Store the handler and also install it on the loop as
        `handle_callback_exception`.

        """
        self._handler = handler
        self.loop.handle_callback_exception = handler

    def _default_base_future(self):
        """ Default value of `base_future`: tornado's Future class. """
        from tornado.concurrent import Future
        return Future

    def _default_future(self):
        """ Build the Future class returned by `create_future`.

        The class derives from `base_future`, tags every instance with a
        bridge id and adds the `then` / `catch` chaining api.

        """
        BaseFuture = self.base_future
        loop = self

        class Future(BaseFuture):

            def __init__(self):
                super(Future, self).__init__()
                bridge.tag_object_with_id(self)

            def then(self, callback):
                """ Add then method so you can easily chain callbacks.

                Tornado returns the future not the result to callbacks,
                so the callback is wrapped to receive the result.

                """
                self.add_done_callback(
                    partial(self.safe_callback, callback, False))
                return self

            def catch(self, callback):
                """ Add catch method so you can easily chain callbacks.

                The callback is invoked with the exception only when the
                future completed with one.

                """
                # Must be registered with catch=True; with False the
                # callback would be treated as a `then` callback and
                # never receive the exception.
                self.add_done_callback(
                    partial(self.safe_callback, callback, True))
                return self

            def safe_callback(self, callback, catch, future):
                """ Invoke `callback` for a completed future.

                Parameters
                ----------
                callback: The user callback to invoke.
                catch: If True pass the future's exception (if any) to
                    the callback, otherwise pass the future's result.
                future: The completed future.

                Returns
                -------
                The future's result for `then` callbacks, else None.

                """
                try:
                    if catch:
                        error = future.exception()
                        if error is not None:
                            callback(error)
                    else:
                        result = future.result()
                        callback(result)
                        return result
                except Exception:
                    # Delegate to the loop's handler when one is set,
                    # otherwise let the error propagate.
                    if loop._handler:
                        loop._handler(callback)
                    else:
                        raise

        return Future

    def run_iteration(self):
        """ Run one iteration of the event loop """
        # NOTE(review): `doIteration` is the method TwistedEventLoop calls
        # on the reactor; confirm the loop object used here provides it.
        self.loop.doIteration(0.000001)

    def log_error(self, callback, error=None):
        """ Log the error that occurred when running the given callback.

        Parameters
        ----------
        callback: The callback that was running when the error occurred.
        error: Optional error to include in the message. Accepted so the
            signature matches `EventLoop.log_error`.

        """
        from tornado.log import app_log
        if error is None:
            app_log.error("Exception in callback %r", callback,
                          exc_info=True)
        else:
            app_log.error("Exception in callback %r: %r", callback, error,
                          exc_info=True)
class TwistedEventLoop(EventLoop):
    """ Eventloop using twisted's reactor """

    def _default_name(self):
        """ Default value of the `name` member. """
        return "twisted"

    @classmethod
    def available(cls):
        """ Test if twisted's reactor can be imported.

        Returns
        -------
        bool: True if the import succeeds.

        """
        try:
            from twisted.internet import reactor
        except ImportError as e:
            print("Twisted event loop not available {}".format(e))
            return False
        return True

    def _default_loop(self):
        """ Default value of the `loop` member: twisted's reactor. """
        from twisted.internet import reactor
        return reactor

    def _default_future(self):
        """ Build the Future class returned by `create_future`.

        The class derives from twisted's Deferred, tags every instance
        with a bridge id and adds `then`, `catch` and `set_result`.

        """
        from twisted.internet.defer import Deferred
        event_loop = self

        class Future(Deferred, object):

            def __init__(self):
                bridge.tag_object_with_id(self)
                super(Future, self).__init__()

            def safe_callback(self, callback, result):
                """ Call `callback(result)` and hand `result` back.

                Twisted passes the callback return value to the next
                callback. We want the same API as tornado, hence the
                original result is returned instead.

                """
                try:
                    callback(result)
                    return result
                except Exception:
                    handler = event_loop._handler
                    if not handler:
                        raise
                    handler(callback)

            def catch(self, callback):
                """ Register `callback` as an errback; returns self so
                calls can be chained.

                """
                wrapped = partial(self.safe_callback, callback)
                self.addErrback(wrapped)
                return self

            def then(self, callback):
                """ Register `callback` as a callback; returns self so
                calls can be chained.

                """
                wrapped = partial(self.safe_callback, callback)
                self.addCallback(wrapped)
                return self

            def set_result(self, result):
                """ Fire the deferred with `result`. """
                self.callback(result)

        return Future

    def start(self):
        """ Announce the reactor on stdout, then run it. """
        reactor = self.loop
        print("Starting reactor {}".format(reactor))
        reactor.run()

    def deferred_call(self, callback, *args, **kwargs):
        """ Schedule the callback with a zero delay.

        The reactor is woken up after every call because it may have
        calculated a long delay during which it can sleep. Events that
        happen during that period seem really slow, as they are not
        processed until after the reactor "wakes up".

        """
        reactor = self.loop
        delayed_call = reactor.callLater(0, callback, *args, **kwargs)
        reactor.wakeUp()
        return delayed_call

    def timed_call(self, ms, callback, *args, **kwargs):
        """ Schedule the callback to run `ms` milliseconds later.

        `callLater` is given the delay converted to seconds. The reactor
        is woken up after every call because it may have calculated a
        long delay during which it can sleep. Events that happen during
        that period seem really slow, as they are not processed until
        after the reactor "wakes up".

        """
        reactor = self.loop
        seconds = ms/1000.0
        delayed_call = reactor.callLater(seconds, callback, *args, **kwargs)
        reactor.wakeUp()
        return delayed_call

    def run_iteration(self):
        """ Run one iteration of the event loop """
        reactor = self.loop
        reactor.doIteration(0.000001)
class BuiltinEventLoop(TornadoEventLoop):
    """ Event loop backed by the bundled `.eventloop` package.

    It's a stripped down version of tornado. It's currently slightly
    slower than tornado, so use tornado if possible.

    """

    @classmethod
    def available(cls):
        """ Test if the bundled IOLoop can be imported.

        Returns
        -------
        bool: True if the import succeeds.

        """
        try:
            from .eventloop.ioloop import IOLoop
        except ImportError:
            print("Error: Failed to load the builtin event loop. "
                  "This usually indicates missing or inaccessible "
                  "shared libraries.")
            return False
        return True

    def _default_base_future(self):
        """ Default value of `base_future`: the bundled Future class. """
        from .eventloop.concurrent import Future
        return Future

    def _default_loop(self):
        """ Default value of the `loop` member: the current bundled
        IOLoop.

        """
        from .eventloop.ioloop import IOLoop
        return IOLoop.current()

    def set_error_handler(self, handler):
        """ Store the handler and register it with the loop through
        `set_callback_exception_handler`.

        """
        self._handler = handler
        self.loop.set_callback_exception_handler(handler)

    def log_error(self, callback, error=None):
        """ Print the callback that failed and the error that occurred.

        Parameters
        ----------
        callback: The callback that was running when the error occurred.
        error: The error to report, if known.

        """
        callback_line = "Uncaught error during callback: {}".format(callback)
        error_line = "Error: {}".format(error)
        print(callback_line)
        print(error_line)