import asyncio
import logging
import signal
from typing import Any, Callable, Dict, List, Optional, Tuple
from .models import User
from .http import HTTPClient
from .state import ConnectionState
log = logging.getLogger(__name__)
class _ClientEventTask(asyncio.Task):
def __init__(
self,
original_coro: Callable,
event_name: str,
coro: Callable,
*,
loop: asyncio.AbstractEventLoop
) -> None:
super().__init__(coro, loop=loop)
self.__event_name = event_name
self.__original_coro = original_coro
def __repr__(self) -> str:
info = [
("state", self._state.lower()),
("event", self.__event_name),
("coro", repr(self.__original_coro)),
]
if self._exception is not None:
info.append(("exception", repr(self._exception)))
return "<ClientEventTask {}>".format(" ".join("%s=%s" % t for t in info))
[docs]
class Client:
def __init__(
self, *, loop: Optional[asyncio.AbstractEventLoop] = None, **options: Any
) -> None:
self.loop = loop or asyncio.get_event_loop()
self.connector = options.pop("connector", None)
self.proxy = options.pop("proxy", None)
self.proxy_auth = options.pop("proxy_auth", None)
self._listeners: Dict[str, List[Tuple[asyncio.Future, Callable]]] = {}
self.http = HTTPClient(
self.connector, proxy=self.proxy, proxy_auth=self.proxy_auth, loop=self.loop
)
self._ready = asyncio.Event()
self._handlers = {"ready": self._handle_ready}
self._connection = ConnectionState(
dispatch=self.dispatch,
handlers=self._handlers,
http=self.http,
loop=self.loop,
**options,
)
[docs]
def is_ready(self) -> bool:
"""Check if the client is ready."""
return self._ready.is_set()
async def _run_event(
self, coro: Callable, event_name: str, *args: Any, **kwargs: Any
) -> None:
try:
await coro(*args, **kwargs)
except asyncio.CancelledError:
pass
except Exception as e:
log.error("Exception in event '%s': %s", event_name, e)
try:
await self.on_error(event_name, *args, **kwargs)
except asyncio.CancelledError:
pass
[docs]
async def on_error(self, event_method: str, *args: Any, **kwargs: Any) -> None:
"""Handle errors during event execution."""
log.error("Ignoring exception in %s", event_method, exc_info=True)
def _schedule_event(
self, coro: Callable, event_name: str, *args: Any, **kwargs: Any
) -> _ClientEventTask:
wrapped = self._run_event(coro, event_name, *args, **kwargs)
return _ClientEventTask(
original_coro=coro, event_name=event_name, coro=wrapped, loop=self.loop
)
[docs]
async def wait_until_ready(self) -> None:
"""Wait until the client is ready."""
await self._ready.wait()
[docs]
def wait_for(
self,
event: str,
*,
check: Optional[Callable] = None,
timeout: Optional[float] = None
) -> asyncio.Future:
"""Wait for a specific event to occur."""
future = self.loop.create_future()
check = check if check else lambda *args: True
ev = event.lower()
try:
listeners = self._listeners[ev]
except KeyError:
listeners = []
self._listeners[ev] = listeners
listeners.append((future, check))
return asyncio.wait_for(future, timeout)
[docs]
def dispatch(self, event: str, *args: Any, **kwargs: Any) -> None:
"""Dispatch an event to the appropriate handlers."""
log.debug(
"Dispatching event: %s with args: %s and kwargs: %s", event, args, kwargs
)
method = "on_" + event
listeners = self._listeners.get(event)
if listeners:
removed = []
for i, (future, condition) in enumerate(listeners):
if future.cancelled():
removed.append(i)
continue
try:
result = condition(*args)
except Exception as exec:
log.error(
"Error in event listener condition for event '%s': %s",
event,
exec,
)
future.set_exception(exec)
removed.append(i)
else:
if result:
if len(args) == 0:
future.set_result(None)
elif len(args) == 1:
future.set_result(args[0])
else:
future.set_result(args)
removed.append(i)
if len(removed) == len(listeners):
self._listeners.pop(event)
else:
for idx in reversed(removed):
del listeners[idx]
try:
coro = getattr(self, method)
except AttributeError:
log.warning("No handler found for event: %s", event)
else:
self._schedule_event(coro, method, *args, **kwargs)
[docs]
async def close(self) -> None:
"""Close the client."""
log.debug("Closing client")
await self.http.close()
[docs]
async def login(self, token: str) -> None:
"""Login to the client using a token."""
log.debug("Logging in with token: %s", token)
await self.http.login(token.strip())
[docs]
async def start(self, *args: Any, **kwargs: Any) -> None:
"""Start the client."""
log.debug("Starting client with args: %s and kwargs: %s", args, kwargs)
await self.login(*args)
[docs]
async def main(self) -> None:
"""Main function to be overridden by subclasses."""
pass
def _handle_ready(self) -> None:
"""Handle the ready event."""
log.debug("Ready event triggered")
self._ready.set()
[docs]
def run(self, *args: Any, **kwargs: Any) -> Optional[Any]:
"""Run the client."""
loop = self.loop
try:
loop.add_signal_handler(signal.SIGINT, lambda: loop.stop())
loop.add_signal_handler(signal.SIGTERM, lambda: loop.stop())
except NotImplementedError:
pass
async def runner() -> None:
try:
await self.start(*args, **kwargs)
await self.main()
finally:
await self.close()
def stop_loop_on_completion(f: asyncio.Future) -> None:
loop.stop()
future = asyncio.ensure_future(runner(), loop=loop)
future.add_done_callback(stop_loop_on_completion)
try:
loop.run_forever()
except KeyboardInterrupt:
log.info("Received terminate signal")
finally:
future.remove_done_callback(stop_loop_on_completion)
if not future.cancelled():
try:
return future.result()
except KeyboardInterrupt:
return None
[docs]
def event(self, coro: Callable) -> Callable:
"""Register an event coroutine."""
if not asyncio.iscoroutinefunction(coro):
raise TypeError("event registered must be a coroutine function")
setattr(self, coro.__name__, coro)
log.debug("Event %s has been successfully registered", coro.__name__)
return coro
[docs]
async def fetch_client_profile(self) -> None:
"""Fetch the client profile."""
log.debug("Fetching client profile")
await self.http.fetch_client_profile()
[docs]
async def fetch_friends(self) -> List[User]:
"""Fetch the list of friends."""
data = await self.http.fetch_friends()
log.debug("Fetched friends data: %s", data)
return [User(self._connection, data=friend) for friend in data["friends"]]
[docs]
async def fetch_passcode(self, email: str) -> None:
"""Fetch the passcode for the given email."""
log.debug("Fetching passcode for email: %s", email)
await self.http.fetch_auth_passcode(email)
[docs]
async def fetch_token(self, email: str, passcode: str) -> str:
"""Fetch the token for the given email and passcode."""
log.debug("Fetching token for email: %s with passcode: %s", email, passcode)
response = await self.http.fetch_auth_token(email, passcode)
log.debug("Fetched token: %s", response["token"])
return response["token"]