diff --git a/example-config.yaml b/example-config.yaml index 43e7eaf4..ea696354 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -119,9 +119,10 @@ bridge: # their Telegram account at startup. startup_sync: true # Number of most recently active dialogs to check when syncing chats. - # Dialogs include groups and private chats, but only groups are synced. # Set to 0 to remove limit. sync_dialog_limit: 30 + # Whether or not to sync and create portals for direct chats at startup. + sync_direct_chats: false # The maximum number of simultaneous Telegram deletions to handle. # A large number of simultaneous redactions could put strain on your homeserver. max_telegram_delete: 10 diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py index 101af930..09778584 100644 --- a/mautrix_telegram/abstract_user.py +++ b/mautrix_telegram/abstract_user.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Tuple, Optional, AsyncIterable, Union, Dict, TYPE_CHECKING +from typing import Tuple, Optional, Union, Dict, TYPE_CHECKING from abc import ABC, abstractmethod import asyncio import logging @@ -23,12 +23,12 @@ import time from telethon.sessions import Session from telethon.tl.patched import MessageService, Message from telethon.tl.types import ( - Channel, ChannelForbidden, Chat, ChatForbidden, MessageActionChannelMigrateFrom, PeerUser, - TypeUpdate, UpdateChannelPinnedMessage, UpdateChatPinnedMessage, UpdateChatParticipantAdmin, - UpdateChatParticipants, UpdateChatUserTyping, UpdateDeleteChannelMessages, UpdateNewMessage, - UpdateDeleteMessages, UpdateEditChannelMessage, UpdateEditMessage, UpdateNewChannelMessage, - UpdateReadHistoryOutbox, UpdateShortChatMessage, UpdateShortMessage, UpdateUserName, - UpdateUserPhoto, UpdateUserStatus, UpdateUserTyping, User, UserStatusOffline, UserStatusOnline) + Channel, Chat, MessageActionChannelMigrateFrom, PeerUser, TypeUpdate, UpdateChatPinnedMessage, + UpdateChannelPinnedMessage, UpdateChatParticipantAdmin, UpdateChatParticipants, + UpdateChatUserTyping, UpdateDeleteChannelMessages, UpdateNewMessage, UpdateDeleteMessages, + UpdateEditChannelMessage, UpdateEditMessage, UpdateNewChannelMessage, UpdateReadHistoryOutbox, + UpdateShortChatMessage, UpdateShortMessage, UpdateUserName, UpdateUserPhoto, UpdateUserStatus, + UpdateUserTyping, User, UserStatusOffline, UserStatusOnline) from mautrix.types import UserID, PresenceState from mautrix.errors import MatrixError @@ -189,13 +189,6 @@ class AbstractUser(ABC): if UPDATE_TIME: UPDATE_TIME.labels(update_type=type(update).__name__).observe(time.time() - start_time) - def get_dialogs(self, limit: int = None) -> AsyncIterable[Union[User, Chat, Channel]]: - return (dialog.entity async for dialog in - self.client.iter_dialogs(limit=limit, ignore_migrated=True, archived=False) - if isinstance(dialog.entity, (ChatForbidden, ChannelForbidden)) - or (isinstance(dialog.entity, Chat) - and (dialog.entity.deactivated or dialog.entity.left))) - @property @abstractmethod def name(self) -> str: diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index ff2335b5..ac411fa4 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -85,6 +85,7 @@ class Config(BaseBridgeConfig): copy("bridge.skip_deleted_members") copy("bridge.startup_sync") copy("bridge.sync_dialog_limit") + copy("bridge.sync_direct_chats") copy("bridge.max_telegram_delete") copy("bridge.sync_matrix_state") copy("bridge.allow_matrix_login") diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py index 8a0e7523..046ab5ea 100644 --- a/mautrix_telegram/portal/base.py +++ b/mautrix_telegram/portal/base.py @@ -115,7 +115,7 @@ class BasePortal(ABC): self._db_instance = db_instance self._main_intent = None self.deleted = False - self.log = self.base_log.getChild(self.tgid_log) if self.tgid else self.base_log + self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid) self.dedup = PortalDedup(self) self.send_lock = PortalSendLock() @@ -155,7 +155,7 @@ class BasePortal(ABC): if not self._main_intent: direct = self.peer_type == "user" puppet = p.Puppet.get(self.tgid) if direct else None - self._main_intent = puppet.intent if direct else self.az.intent + self._main_intent = puppet.intent_for(self) if direct else self.az.intent return self._main_intent @property @@ -272,7 +272,7 @@ class BasePortal(ABC): if user != intent.mxid and (not puppets_only or puppet): try: if puppet: - await puppet.intent.leave_room(room_id) + await puppet.default_mxid_intent.leave_room(room_id) else: await intent.kick_user(room_id, user, message) except (MatrixRequestError, IntentError): diff --git a/mautrix_telegram/portal/metadata.py b/mautrix_telegram/portal/metadata.py index 9c24434a..5ade67f4 100644 --- a/mautrix_telegram/portal/metadata.py +++ b/mautrix_telegram/portal/metadata.py @@ -100,7 +100,7 @@ class PortalMetadata(BasePortal, ABC): self.tgid = new_id self.tg_receiver = new_id self.by_tgid[self.tgid_full] = self - self.log = self.base_log.getChild(str(self.tgid)) + self.log = self.base_log.getChild(self.tgid_log) self.log.info(f"Telegram chat upgraded from {old_id}") async def set_telegram_username(self, source: 'u.User', username: str) -> None: @@ -145,7 +145,7 @@ class PortalMetadata(BasePortal, ABC): self.by_tgid[self.tgid_full] = self await self.update_info(source, entity) self.db_instance.insert() - self.log = self.base_log.getChild(str(self.tgid)) + self.log = self.base_log.getChild(self.tgid_log) if self.bot and self.bot.tgid in invites: self.bot.add_chat(self.tgid, self.peer_type) @@ -192,6 +192,17 @@ class PortalMetadata(BasePortal, ABC): levels: PowerLevelStateEventContent = None, users: List[User] = None, participants: List[TypeParticipant] = None) -> None: + try: + await self._update_matrix_room(user, entity, direct, puppet, levels, users, + participants) + except Exception: + self.log.exception("Fatal error updating Matrix room") + + async def _update_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User], + direct: bool, puppet: p.Puppet = None, + levels: PowerLevelStateEventContent = None, + users: List[User] = None, + participants: List[TypeParticipant] = None) -> None: if not direct: await self.update_info(user, entity) if not users or not participants: @@ -202,7 +213,7 @@ class PortalMetadata(BasePortal, ABC): if not puppet: puppet = p.Puppet.get(self.tgid) await puppet.update_info(user, entity) - await puppet.intent.join_room(self.mxid) + await puppet.intent_for(self).join_room(self.mxid) if self.sync_matrix_state: await self.sync_matrix_members() @@ -221,7 +232,10 @@ class PortalMetadata(BasePortal, ABC): await self.invite_to_matrix(invites or []) return self.mxid async with self._room_create_lock: - return await self._create_matrix_room(user, entity, invites) + try: + return await self._create_matrix_room(user, entity, invites) + except Exception: + self.log.exception("Fatal error creating Matrix room") async def _create_matrix_room(self, user: 'AbstractUser', entity: TypeChat, invites: InviteList ) -> Optional[RoomID]: @@ -235,7 +249,7 @@ class PortalMetadata(BasePortal, ABC): if not entity: entity = await self.get_entity(user) - self.log.debug("Fetched data: %s", entity) + self.log.debug(f"Fetched data: {entity}") self.log.debug("Creating room") @@ -244,8 +258,12 @@ class PortalMetadata(BasePortal, ABC): except AttributeError: self.title = None + if direct and self.tgid == user.tgid: + self.title = "Telegram Saved Messages" + self.about = "Your Telegram cloud storage chat" + puppet = p.Puppet.get(self.tgid) if direct else None - self._main_intent = puppet.intent if direct else self.az.intent + self._main_intent = puppet.intent_for(self) if direct else self.az.intent if self.peer_type == "channel": self.megagroup = entity.megagroup @@ -280,7 +298,8 @@ class PortalMetadata(BasePortal, ABC): room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset, is_direct=direct, invitees=invites or [], - name=self.title, initial_state=initial_state) + name=self.title, topic=self.about, + initial_state=initial_state) if not room_id: raise Exception(f"Failed to create room") @@ -420,7 +439,7 @@ class PortalMetadata(BasePortal, ABC): if entity.bot: self._add_bot_chat(entity) allowed_tgids.add(entity.id) - await puppet.intent.ensure_joined(self.mxid) + await puppet.intent_for(self).ensure_joined(self.mxid) await puppet.update_info(source, entity) user = u.User.get_by_tgid(TelegramID(entity.id)) @@ -461,7 +480,7 @@ class PortalMetadata(BasePortal, ABC): if source: entity: User = await source.client.get_entity(PeerUser(user_id)) await puppet.update_info(source, entity) - await puppet.intent.join_room(self.mxid) + await puppet.intent_for(self).join_room(self.mxid) user = u.User.get_by_tgid(user_id) if user: @@ -476,16 +495,16 @@ class PortalMetadata(BasePortal, ABC): else "Left Telegram chat") if sender.tgid != puppet.tgid: try: - await sender.intent.kick_user(self.mxid, puppet.mxid) + await sender.intent_for(self).kick_user(self.mxid, puppet.mxid) except MForbidden: await self.main_intent.kick_user(self.mxid, puppet.mxid, kick_message) else: - await puppet.intent.leave_room(self.mxid) + await puppet.intent_for(self).leave_room(self.mxid) if user: user.unregister_portal(self) if sender.tgid != puppet.tgid: try: - await sender.intent.kick_user(self.mxid, puppet.mxid) + await sender.intent_for(self).kick_user(self.mxid, puppet.mxid) return except MForbidden: pass diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index 36439b9b..8be08f8d 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -73,7 +73,7 @@ class PortalTelegram(BasePortal, ABC): async def handle_telegram_typing(self, user: p.Puppet, _: Union[UpdateUserTyping, UpdateChatUserTyping]) -> None: - await user.intent.set_typing(self.mxid, is_typing=True) + await user.intent_for(self).set_typing(self.mxid, is_typing=True) def _get_external_url(self, evt: Message) -> Optional[str]: if self.peer_type == "channel" and self.username is not None: @@ -361,7 +361,7 @@ class PortalTelegram(BasePortal, ABC): f"{editing_msg.mxid}\">Edit: " f"{content.formatted_body or escape_html(content.body)}") - intent = sender.intent if sender else self.main_intent + intent = sender.intent_for(self) if sender else self.main_intent await intent.set_typing(self.mxid, is_typing=False) event_id = await intent.send_message(self.mxid, content) @@ -409,7 +409,7 @@ class PortalTelegram(BasePortal, ABC): MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported) media = evt.media if hasattr(evt, "media") and isinstance(evt.media, allowed_media) else None - intent = sender.intent if sender else self.main_intent + intent = sender.intent_for(self) if sender else self.main_intent if not media and evt.message: is_bot = sender.is_bot if sender else False event_id = await self.handle_telegram_text(source, intent, is_bot, evt) @@ -489,7 +489,7 @@ class PortalTelegram(BasePortal, ABC): elif isinstance(action, MessageActionChatMigrateTo): self.peer_type = "channel" self._migrate_and_save_telegram(TelegramID(action.channel_id)) - await sender.intent.send_emote(self.mxid, "upgraded this group to a supergroup.") + await sender.intent_for(self).send_emote(self.mxid, "upgraded this group to a supergroup.") elif isinstance(action, MessageActionPinMessage): await self.receive_telegram_pin_sender(sender) elif isinstance(action, MessageActionGameScore): @@ -515,7 +515,7 @@ class PortalTelegram(BasePortal, ABC): await self.update_telegram_pin() async def update_telegram_pin(self) -> None: - intent = (self._temp_pinned_message_sender.intent + intent = (self._temp_pinned_message_sender.intent_for(self) if self._temp_pinned_message_sender else self.main_intent) msg_id = self._temp_pinned_message_id self._temp_pinned_message_id = None diff --git a/mautrix_telegram/puppet.py b/mautrix_telegram/puppet.py index a1ae68dc..f8c4d3b8 100644 --- a/mautrix_telegram/puppet.py +++ b/mautrix_telegram/puppet.py @@ -28,7 +28,7 @@ from mautrix.types import UserID from .types import TelegramID from .db import Puppet as DBPuppet -from . import util +from . import util, portal as p if TYPE_CHECKING: from .matrix import MatrixHandler @@ -135,6 +135,11 @@ class Puppet(CustomPuppetMixin): ) -> Awaitable[Union[TypeInputPeer, TypeInputUser]]: return user.client.get_input_entity(self.peer) + def intent_for(self, portal: 'p.Portal') -> IntentAPI: + if portal.tgid == self.tgid: + return self.default_mxid_intent + return self.intent + # region DB conversion @property diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index b20629ad..ef3975bd 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -13,14 +13,15 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import (Awaitable, Dict, List, Iterable, Match, NewType, Optional, Tuple, Any, - TYPE_CHECKING) +from typing import (Awaitable, Dict, List, Iterable, Match, NewType, Optional, Tuple, Any, Union, + AsyncIterable, TYPE_CHECKING) import logging import asyncio import re from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser, - UpdateShortChatMessage, UpdateShortMessage, User as TLUser) + UpdateShortChatMessage, UpdateShortMessage, User as TLUser, + ChannelForbidden, ChatForbidden, Chat, Channel) from telethon.tl.types.contacts import ContactsNotModified from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest from telethon.tl.functions.account import UpdateStatusRequest @@ -310,7 +311,17 @@ class User(AbstractUser): if self.is_bot: return creators = [] - async for entity in self.get_dialogs(limit=config["bridge.sync_dialog_limit"] or None): + limit = config["bridge.sync_dialog_limit"] or None + self.log.debug(f"Syncing dialogs (limit={limit}, synchronous_create={synchronous_create})") + async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True, + archived=False): + entity = dialog.entity + if isinstance(entity, Chat) and (entity.deactivated or entity.left): + self.log.warn(f"Ignoring deactivated or left chat {entity} while syncing") + continue + elif isinstance(entity, TLUser) and not config["bridge.sync_direct_chats"]: + continue + self.log.info(f"Syncing {type(entity)}") portal = po.Portal.get_by_entity(entity) self.portals[portal.tgid_full] = portal creators.append( @@ -318,6 +329,7 @@ class User(AbstractUser): synchronous=synchronous_create)) self.save(portals=True) await asyncio.gather(*creators, loop=self.loop) + self.log.debug("Dialog syncing complete") def register_portal(self, portal: po.Portal) -> None: try: diff --git a/mautrix_telegram/web/provisioning/__init__.py b/mautrix_telegram/web/provisioning/__init__.py index 3a81c19c..31b01a87 100644 --- a/mautrix_telegram/web/provisioning/__init__.py +++ b/mautrix_telegram/web/provisioning/__init__.py @@ -319,7 +319,7 @@ class ProvisioningAPI(AuthAPI): return web.json_response([{ "id": get_peer_id(chat), "title": chat.title, - } async for chat in user.get_dialogs()]) + } async for chat in user.client.get_dialogs(ignore_migrated=True, archived=False)]) else: return web.json_response([{ "id": get_peer_id(chat.peer),