Use Gauge instead of Enum to count connected users
This commit is contained in:
+36
-12
@@ -15,6 +15,7 @@
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast,
|
||||
TYPE_CHECKING)
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
@@ -31,7 +32,7 @@ from mautrix.errors import MatrixRequestError
|
||||
from mautrix.types import UserID, RoomID
|
||||
from mautrix.bridge import BaseUser
|
||||
from mautrix.util.logging import TraceLogger
|
||||
from mautrix.util.opt_prometheus import Enum
|
||||
from mautrix.util.opt_prometheus import Gauge
|
||||
|
||||
from .types import TelegramID
|
||||
from .db import User as DBUser, Portal as DBPortal
|
||||
@@ -46,10 +47,8 @@ config: Optional['Config'] = None
|
||||
|
||||
SearchResult = NamedTuple('SearchResult', puppet='pu.Puppet', similarity=int)
|
||||
|
||||
METRIC_LOGGED_IN = Enum('bridge_logged_in', 'Bridge Logged in', states=["true", "false"],
|
||||
labelnames=("tgid",))
|
||||
METRIC_CONNECTED = Enum('bridge_connected', 'Bridge Connected', states=["true", "false"],
|
||||
labelnames=("tgid",))
|
||||
METRIC_LOGGED_IN = Gauge('bridge_logged_in', 'Users logged into bridge')
|
||||
METRIC_CONNECTED = Gauge('bridge_connected', 'Users connected')
|
||||
|
||||
|
||||
class User(AbstractUser, BaseUser):
|
||||
@@ -65,6 +64,8 @@ class User(AbstractUser, BaseUser):
|
||||
|
||||
_db_instance: Optional[DBUser]
|
||||
_ensure_started_lock: asyncio.Lock
|
||||
_metric_value: Dict[Gauge, bool]
|
||||
_track_connection_task: Optional[asyncio.Task]
|
||||
|
||||
def __init__(self, mxid: UserID, tgid: Optional[TelegramID] = None,
|
||||
username: Optional[str] = None, phone: Optional[str] = None,
|
||||
@@ -86,6 +87,8 @@ class User(AbstractUser, BaseUser):
|
||||
self._db_instance = db_instance
|
||||
self._ensure_started_lock = asyncio.Lock()
|
||||
self.dm_update_lock = asyncio.Lock()
|
||||
self._metric_value = defaultdict(lambda: False)
|
||||
self._track_connection_task = None
|
||||
|
||||
self.command_status = None
|
||||
|
||||
@@ -197,22 +200,43 @@ class User(AbstractUser, BaseUser):
|
||||
async with self._ensure_started_lock:
|
||||
return cast(User, await super().ensure_started(even_if_no_session))
|
||||
|
||||
def _track_metric(self, metric: Gauge, value: bool) -> None:
|
||||
if self._metric_value[metric] != value:
|
||||
if value:
|
||||
metric.inc(1)
|
||||
else:
|
||||
metric.dec(1)
|
||||
self._metric_value[metric] = value
|
||||
|
||||
async def start(self, delete_unless_authenticated: bool = False) -> 'User':
|
||||
await super().start()
|
||||
METRIC_CONNECTED.labels(tgid=self.tgid).state("true")
|
||||
self._track_metric(METRIC_CONNECTED, True)
|
||||
if await self.is_logged_in():
|
||||
self.log.debug(f"Ensuring post_login() for {self.name}")
|
||||
asyncio.ensure_future(self.post_login(), loop=self.loop)
|
||||
self.loop.create_task(self.post_login())
|
||||
if config["metrics.enabled"]:
|
||||
self._track_connection_task = self.loop.create_task(self._track_connection())
|
||||
elif delete_unless_authenticated:
|
||||
self.log.debug(f"Unauthenticated user {self.name} start()ed, deleting session...")
|
||||
await self.client.disconnect()
|
||||
METRIC_CONNECTED.labels(tgid=self.tgid).state("false")
|
||||
self._track_metric(METRIC_CONNECTED, False)
|
||||
self.client.session.delete()
|
||||
return self
|
||||
|
||||
async def _track_connection(self) -> None:
|
||||
self.log.debug("Starting loop to track connection state")
|
||||
while True:
|
||||
await asyncio.sleep(3)
|
||||
connected = bool(self.client._sender._transport_connected
|
||||
if self.client and self.client._sender else False)
|
||||
self._track_metric(METRIC_CONNECTED, connected)
|
||||
|
||||
async def stop(self) -> None:
|
||||
await super().stop()
|
||||
METRIC_CONNECTED.labels(tgid=self.tgid).state("true")
|
||||
if self._track_connection_task:
|
||||
self._track_connection_task.cancel()
|
||||
self._track_connection_task = None
|
||||
self._track_metric(METRIC_CONNECTED, False)
|
||||
|
||||
async def post_login(self, info: TLUser = None, first_login: bool = False) -> None:
|
||||
try:
|
||||
@@ -221,7 +245,7 @@ class User(AbstractUser, BaseUser):
|
||||
self.log.exception("Failed to update telegram account info")
|
||||
return
|
||||
|
||||
METRIC_LOGGED_IN.labels(tgid=self.tgid).state("true")
|
||||
self._track_metric(METRIC_LOGGED_IN, True)
|
||||
|
||||
try:
|
||||
puppet = pu.Puppet.get(self.tgid)
|
||||
@@ -314,7 +338,7 @@ class User(AbstractUser, BaseUser):
|
||||
if not ok:
|
||||
return False
|
||||
self.delete()
|
||||
METRIC_LOGGED_IN.labels(tgid=self.tgid).state("false")
|
||||
self._track_metric(METRIC_LOGGED_IN, False)
|
||||
return True
|
||||
|
||||
def _search_local(self, query: str, max_results: int = 5, min_similarity: int = 45
|
||||
@@ -414,7 +438,7 @@ class User(AbstractUser, BaseUser):
|
||||
self.portals[portal.tgid_full] = portal
|
||||
await self.save(portals=True)
|
||||
|
||||
async def unregister_portal(self, tgid: int, tg_receiver: int) -> None:
|
||||
async def unregister_portal(self, tgid: TelegramID, tg_receiver: TelegramID) -> None:
|
||||
self.log.trace(f"Unregistering portal {(tgid, tg_receiver)}")
|
||||
try:
|
||||
del self.portals[(tgid, tg_receiver)]
|
||||
|
||||
+1
-1
@@ -5,6 +5,6 @@ python-magic>=0.4,<0.5
|
||||
commonmark>=0.8,<0.10
|
||||
aiohttp>=3,<3.7
|
||||
yarl<1.6
|
||||
mautrix==0.8.0.beta3
|
||||
mautrix==0.8.0.beta4
|
||||
telethon>=1.16,<1.17
|
||||
telethon-session-sqlalchemy>=0.2.14,<0.3
|
||||
|
||||
Reference in New Issue
Block a user