Add basic metrics with prometheus (ref #120)
This commit is contained in:
@@ -64,6 +64,11 @@ appservice:
|
||||
as_token: "This value is generated when generating the registration"
|
||||
hs_token: "This value is generated when generating the registration"
|
||||
|
||||
# Prometheus telemetry config. Requires prometheus-aio to be installed.
|
||||
metrics:
|
||||
enabled: false
|
||||
listen_port: 8000
|
||||
|
||||
# Bridge config
|
||||
bridge:
|
||||
# Localpart template of MXIDs for Telegram users.
|
||||
|
||||
@@ -43,6 +43,11 @@ from .sqlstatestore import SQLStateStore
|
||||
from .user import User, init as init_user
|
||||
from . import __version__
|
||||
|
||||
try:
|
||||
import prometheus_client as prometheus
|
||||
except ImportError:
|
||||
prometheus = None
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="A Matrix-Telegram puppeting bridge.",
|
||||
prog="python -m mautrix-telegram")
|
||||
@@ -114,6 +119,12 @@ if config["appservice.provisioning.enabled"]:
|
||||
|
||||
context.mx = MatrixHandler(context)
|
||||
|
||||
if config["metrics.enabled"]:
|
||||
if prometheus:
|
||||
prometheus.start_http_server(config["metrics.listen_port"])
|
||||
else:
|
||||
log.warn("Metrics are enabled in the config, but prometheus-async is not installed.")
|
||||
|
||||
with appserv.run(config["appservice.hostname"], config["appservice.port"]) as start:
|
||||
start_ts = time()
|
||||
init_db(db_engine)
|
||||
|
||||
@@ -19,6 +19,7 @@ from abc import ABC, abstractmethod
|
||||
import asyncio
|
||||
import logging
|
||||
import platform
|
||||
import time
|
||||
|
||||
from telethon.tl.patched import MessageService, Message
|
||||
from telethon.tl.types import (
|
||||
@@ -50,6 +51,14 @@ UpdateMessage = Union[UpdateShortChatMessage, UpdateShortMessage, UpdateNewChann
|
||||
UpdateNewMessage, UpdateEditMessage, UpdateEditChannelMessage]
|
||||
UpdateMessageContent = Union[UpdateShortMessage, UpdateShortChatMessage, Message, MessageService]
|
||||
|
||||
try:
|
||||
from prometheus_client import Histogram
|
||||
|
||||
UPDATE_TIME = Histogram("telegram_update", "Time spent processing Telegram updates",
|
||||
["update_type"])
|
||||
except ImportError:
|
||||
Histogram = None
|
||||
UPDATE_TIME = None
|
||||
|
||||
class AbstractUser(ABC):
|
||||
session_container = None # type: AlchemySessionContainer
|
||||
@@ -151,11 +160,14 @@ class AbstractUser(ABC):
|
||||
raise NotImplementedError()
|
||||
|
||||
async def _update_catch(self, update: TypeUpdate) -> None:
|
||||
start_time = time.time()
|
||||
try:
|
||||
if not await self.update(update):
|
||||
await self._update(update)
|
||||
except Exception:
|
||||
self.log.exception("Failed to handle Telegram update")
|
||||
if UPDATE_TIME:
|
||||
UPDATE_TIME.labels(update_type=type(update).__name__).observe(time.time() - start_time)
|
||||
|
||||
async def get_dialogs(self, limit: int = None) -> List[Union[Chat, Channel]]:
|
||||
if self.is_bot:
|
||||
@@ -279,7 +291,8 @@ class AbstractUser(ABC):
|
||||
sender = pu.Puppet.get(TelegramID(update.user_id))
|
||||
await portal.handle_telegram_typing(sender, update)
|
||||
|
||||
async def _handle_entity_updates(self, entities: Dict[int, Union[User, Chat, Channel]]) -> None:
|
||||
async def _handle_entity_updates(self, entities: Dict[int, Union[User, Chat, Channel]]
|
||||
) -> None:
|
||||
try:
|
||||
users = (entity for entity in entities.values() if isinstance(entity, User))
|
||||
puppets = ((pu.Puppet.get(TelegramID(user.id)), user) for user in users)
|
||||
|
||||
@@ -184,6 +184,9 @@ class Config(DictWithRecursion):
|
||||
copy("appservice.as_token")
|
||||
copy("appservice.hs_token")
|
||||
|
||||
copy("metrics.enabled")
|
||||
copy("metrics.listen_port")
|
||||
|
||||
copy("bridge.username_template")
|
||||
copy("bridge.alias_template")
|
||||
copy("bridge.displayname_template")
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
from typing import Dict, List, Match, Optional, Set, Tuple, TYPE_CHECKING
|
||||
import logging
|
||||
import asyncio
|
||||
import time
|
||||
import re
|
||||
|
||||
from mautrix_appservice import MatrixRequestError, IntentError
|
||||
@@ -27,6 +28,14 @@ from . import user as u, portal as po, puppet as pu, commands as com
|
||||
if TYPE_CHECKING:
|
||||
from .context import Context
|
||||
|
||||
try:
|
||||
from prometheus_client import Histogram
|
||||
|
||||
EVENT_TIME = Histogram("matrix_event", "Time spent processing Matrix events",
|
||||
["event_type"])
|
||||
except ImportError:
|
||||
Histogram = None
|
||||
EVENT_TIME = None
|
||||
|
||||
class MatrixHandler:
|
||||
log = logging.getLogger("mau.mx") # type: logging.Logger
|
||||
@@ -379,6 +388,7 @@ class MatrixHandler:
|
||||
async def handle_event(self, evt: MatrixEvent) -> None:
|
||||
if self.filter_matrix_event(evt):
|
||||
return
|
||||
start_time = time.time()
|
||||
self.log.debug("Received event: %s", evt)
|
||||
evt_type = evt.get("type", "m.unknown") # type: str
|
||||
room_id = evt.get("room_id", None) # type: Optional[MatrixRoomID]
|
||||
@@ -430,3 +440,7 @@ class MatrixHandler:
|
||||
await self.handle_presence(sender, content.get("presence", "offline"))
|
||||
elif evt_type == "m.typing":
|
||||
await self.handle_typing(room_id, content.get("user_ids", []))
|
||||
else:
|
||||
return
|
||||
if EVENT_TIME:
|
||||
EVENT_TIME.labels(event_type=evt_type).observe(time.time() - start_time)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
cryptg
|
||||
Pillow
|
||||
moviepy
|
||||
prometheus-client
|
||||
|
||||
Reference in New Issue
Block a user