diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 32254df9..beeb8149 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -15,6 +15,7 @@ # along with this program. If not, see . from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast, TYPE_CHECKING) +from collections import defaultdict import logging import asyncio @@ -31,7 +32,7 @@ from mautrix.errors import MatrixRequestError from mautrix.types import UserID, RoomID from mautrix.bridge import BaseUser from mautrix.util.logging import TraceLogger -from mautrix.util.opt_prometheus import Enum +from mautrix.util.opt_prometheus import Gauge from .types import TelegramID from .db import User as DBUser, Portal as DBPortal @@ -46,10 +47,8 @@ config: Optional['Config'] = None SearchResult = NamedTuple('SearchResult', puppet='pu.Puppet', similarity=int) -METRIC_LOGGED_IN = Enum('bridge_logged_in', 'Bridge Logged in', states=["true", "false"], - labelnames=("tgid",)) -METRIC_CONNECTED = Enum('bridge_connected', 'Bridge Connected', states=["true", "false"], - labelnames=("tgid",)) +METRIC_LOGGED_IN = Gauge('bridge_logged_in', 'Users logged into bridge') +METRIC_CONNECTED = Gauge('bridge_connected', 'Users connected') class User(AbstractUser, BaseUser): @@ -65,6 +64,8 @@ class User(AbstractUser, BaseUser): _db_instance: Optional[DBUser] _ensure_started_lock: asyncio.Lock + _metric_value: Dict[Gauge, bool] + _track_connection_task: Optional[asyncio.Task] def __init__(self, mxid: UserID, tgid: Optional[TelegramID] = None, username: Optional[str] = None, phone: Optional[str] = None, @@ -86,6 +87,8 @@ class User(AbstractUser, BaseUser): self._db_instance = db_instance self._ensure_started_lock = asyncio.Lock() self.dm_update_lock = asyncio.Lock() + self._metric_value = defaultdict(lambda: False) + self._track_connection_task = None self.command_status = None @@ -197,22 +200,43 @@ class User(AbstractUser, BaseUser): async with self._ensure_started_lock: return cast(User, await super().ensure_started(even_if_no_session)) + def _track_metric(self, metric: Gauge, value: bool) -> None: + if self._metric_value[metric] != value: + if value: + metric.inc(1) + else: + metric.dec(1) + self._metric_value[metric] = value + async def start(self, delete_unless_authenticated: bool = False) -> 'User': await super().start() - METRIC_CONNECTED.labels(tgid=self.tgid).state("true") + self._track_metric(METRIC_CONNECTED, True) if await self.is_logged_in(): self.log.debug(f"Ensuring post_login() for {self.name}") - asyncio.ensure_future(self.post_login(), loop=self.loop) + self.loop.create_task(self.post_login()) + if config["metrics.enabled"]: + self._track_connection_task = self.loop.create_task(self._track_connection()) elif delete_unless_authenticated: self.log.debug(f"Unauthenticated user {self.name} start()ed, deleting session...") await self.client.disconnect() - METRIC_CONNECTED.labels(tgid=self.tgid).state("false") + self._track_metric(METRIC_CONNECTED, False) self.client.session.delete() return self + async def _track_connection(self) -> None: + self.log.debug("Starting loop to track connection state") + while True: + await asyncio.sleep(3) + connected = bool(self.client._sender._transport_connected + if self.client and self.client._sender else False) + self._track_metric(METRIC_CONNECTED, connected) + async def stop(self) -> None: await super().stop() - METRIC_CONNECTED.labels(tgid=self.tgid).state("true") + if self._track_connection_task: + self._track_connection_task.cancel() + self._track_connection_task = None + self._track_metric(METRIC_CONNECTED, False) async def post_login(self, info: TLUser = None, first_login: bool = False) -> None: try: @@ -221,7 +245,7 @@ class User(AbstractUser, BaseUser): self.log.exception("Failed to update telegram account info") return - METRIC_LOGGED_IN.labels(tgid=self.tgid).state("true") + self._track_metric(METRIC_LOGGED_IN, True) try: puppet = pu.Puppet.get(self.tgid) @@ -314,7 +338,7 @@ class User(AbstractUser, BaseUser): if not ok: return False self.delete() - METRIC_LOGGED_IN.labels(tgid=self.tgid).state("false") + self._track_metric(METRIC_LOGGED_IN, False) return True def _search_local(self, query: str, max_results: int = 5, min_similarity: int = 45 @@ -414,7 +438,7 @@ class User(AbstractUser, BaseUser): self.portals[portal.tgid_full] = portal await self.save(portals=True) - async def unregister_portal(self, tgid: int, tg_receiver: int) -> None: + async def unregister_portal(self, tgid: TelegramID, tg_receiver: TelegramID) -> None: self.log.trace(f"Unregistering portal {(tgid, tg_receiver)}") try: del self.portals[(tgid, tg_receiver)] diff --git a/requirements.txt b/requirements.txt index f88635ee..a6c84451 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,6 @@ python-magic>=0.4,<0.5 commonmark>=0.8,<0.10 aiohttp>=3,<3.7 yarl<1.6 -mautrix==0.8.0.beta3 +mautrix==0.8.0.beta4 telethon>=1.16,<1.17 telethon-session-sqlalchemy>=0.2.14,<0.3