# Source path: pyee/trio.py (blob e79d4572f950f40eaef4db198e2695142114c1a8)
# -*- coding: utf-8 -*-

import inspect
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from types import TracebackType
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional, Tuple, Type

import trio

from pyee.base import EventEmitter, PyeeException

# Names exported by a star-import of this module.
__all__ = ["TrioEventEmitter"]


# Short alias for ``trio.Nursery``, used in the type annotations below.
Nursery = trio.Nursery


class TrioEventEmitter(EventEmitter):
    """An event emitter class which can run trio tasks in a trio nursery.

    By default, this class will lazily create both a nursery manager (the
    object returned from ``trio.open_nursery()``) and a nursery (the object
    yielded by using the nursery manager as an async context manager). It is
    also possible to supply an existing nursery manager via the ``manager``
    argument, or an existing nursery via the ``nursery`` argument.

    Instances of TrioEventEmitter are themselves async context managers, so
    that they may manage the lifecycle of the underlying trio nursery. For
    example, typical usage of this library may look something like this::

        async with TrioEventEmitter() as ee:
            # Underlying nursery is instantiated and ready to go
            @ee.on('data')
            async def handler(data):
                print(data)

            ee.emit('data', 'hello')

        # Underlying nursery and manager have been cleaned up

    Unlike the case with the EventEmitter, all exceptions raised by event
    handlers are automatically emitted on the ``error`` event. This is
    important for trio coroutines specifically but is also handled for
    synchronous functions for consistency.

    For trio coroutine event handlers, calling emit is non-blocking. In other
    words, you should not attempt to await emit; the coroutine is scheduled
    in a fire-and-forget fashion.
    """

    def __init__(
        self,
        nursery: Optional[Nursery] = None,
        manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None,
    ) -> None:
        """Set up the emitter with at most one nursery source.

        Args:
            nursery: An already-open nursery to schedule handlers on.
            manager: A nursery manager that is entered when the emitter's
                context is entered.

        If neither argument is given, a manager is created with
        ``trio.open_nursery()``.

        Raises:
            PyeeException: If both ``nursery`` and ``manager`` are supplied.
        """
        super().__init__()
        self._nursery: Optional[Nursery] = None
        self._manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None
        # Defined up front so that ``__aexit__`` can detect "never entered"
        # and raise PyeeException instead of failing with AttributeError.
        self._context: Optional[
            "AbstractAsyncContextManager[TrioEventEmitter]"
        ] = None

        if nursery is not None and manager is not None:
            raise PyeeException(
                "You may either pass a nursery or a nursery manager but not both"
            )

        if nursery is not None:
            self._nursery = nursery
        elif manager is not None:
            self._manager = manager
        else:
            self._manager = trio.open_nursery()

    def _async_runner(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> Callable[[], Awaitable[None]]:
        """Wrap a handler call in a zero-argument async function.

        The returned ``runner`` calls ``f(*args, **kwargs)``, awaits the
        result when it is awaitable, and re-emits any ``Exception`` raised
        along the way on the ``error`` event.

        Args:
            f: The event handler; may be a coroutine function or a plain
                synchronous callable.
            args: Positional arguments for the handler.
            kwargs: Keyword arguments for the handler.

        Returns:
            An async callable taking no arguments, suitable for
            ``Nursery.start_soon``.
        """

        async def runner() -> None:
            try:
                result = f(*args, **kwargs)
                # A synchronous handler returns a plain value; awaiting it
                # would raise TypeError and surface as a bogus ``error``
                # event, so only await genuine awaitables.
                if inspect.isawaitable(result):
                    await result
            except Exception as exc:
                # Only ``Exception`` is caught: BaseException subclasses
                # (trio's cancellation among them) must keep propagating.
                self.emit("error", exc)

        return runner

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        """Schedule handler ``f`` on the nursery without waiting for it.

        Raises:
            PyeeException: If no nursery is currently available, i.e. none
                was supplied and the emitter's context has not been entered.
        """
        if self._nursery is None:
            raise PyeeException("Uninitialized trio nursery")
        self._nursery.start_soon(self._async_runner(f, args, kwargs))

    @asynccontextmanager
    async def context(
        self,
    ) -> AsyncGenerator["TrioEventEmitter", None]:
        """Returns an async contextmanager which manages the underlying
        nursery to the EventEmitter. The ``TrioEventEmitter``'s
        async context management methods are implemented using this
        function, but it may also be used directly for clarity.

        Raises:
            PyeeException: If the emitter has neither a nursery nor a
                nursery manager.
        """
        if self._nursery is not None:
            # The nursery was supplied from outside; its lifetime is not
            # managed here.
            yield self
        elif self._manager is not None:
            async with self._manager as nursery:
                self._nursery = nursery
                yield self
        else:
            raise PyeeException("Uninitialized nursery or nursery manager")

    async def __aenter__(self) -> "TrioEventEmitter":
        """Enter :meth:`context` and return the emitter itself."""
        self._context = self.context()
        return await self._context.__aenter__()

    async def __aexit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        """Exit the context opened by ``__aenter__`` and drop nursery state.

        Returns:
            Whatever the wrapped context manager's ``__aexit__`` returns.

        Raises:
            PyeeException: If called without a matching ``__aenter__``.
        """
        if self._context is None:
            raise PyeeException("Attempting to exit uninitialized context")
        try:
            return await self._context.__aexit__(type, value, traceback)
        finally:
            # Reset even when the exit itself raises (for instance when the
            # nursery propagates a child task's exception), so the emitter
            # never keeps references to a finished nursery.
            self._context = None
            self._nursery = None
            self._manager = None