aboutsummaryrefslogtreecommitdiff
path: root/pyee/asyncio.py
blob: 433001f1eef42e4ec20118ce4b4f2be29b6bf835 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# -*- coding: utf-8 -*-

from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine
from typing import Any, Callable, cast, Dict, Optional, Tuple

from pyee.base import EventEmitter

__all__ = ["AsyncIOEventEmitter"]


class AsyncIOEventEmitter(EventEmitter):
    """An event emitter class which can run asyncio coroutines in addition to
    synchronous blocking functions. For example::

        @ee.on('event')
        async def async_handler(*args, **kwargs):
            await returns_a_future()

    On emit, the event emitter  will automatically schedule the coroutine using
    ``asyncio.ensure_future`` and the configured event loop (defaults to
    ``asyncio.get_event_loop()``).

    Unlike the case with the EventEmitter, all exceptions raised by
    event handlers are automatically emitted on the ``error`` event. This is
    important for asyncio coroutines specifically but is also handled for
    synchronous functions for consistency.

    When ``loop`` is specified, the supplied event loop will be used when
    scheduling work with ``ensure_future``. Otherwise, the default asyncio
    event loop is used.

    For asyncio coroutine event handlers, calling emit is non-blocking.
    In other words, you do not have to await any results from emit, and the
    coroutine is scheduled in a fire-and-forget fashion.
    """

    def __init__(self, loop: Optional[AbstractEventLoop] = None):
        super(AsyncIOEventEmitter, self).__init__()
        self._loop: Optional[AbstractEventLoop] = loop

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ):
        try:
            coro: Any = f(*args, **kwargs)
        except Exception as exc:
            self.emit("error", exc)
        else:
            if iscoroutine(coro):
                if self._loop:
                    # ensure_future is *extremely* cranky about the types here,
                    # but this is relatively well-tested and I think the types
                    # are more strict than they should be
                    fut: Any = ensure_future(cast(Any, coro), loop=self._loop)
                else:
                    fut = ensure_future(cast(Any, coro))
            elif isinstance(coro, Future):
                fut = cast(Any, coro)
            else:
                return

            def callback(f):
                if f.cancelled():
                    return

                exc: Exception = f.exception()
                if exc:
                    self.emit("error", exc)

            fut.add_done_callback(callback)