diff --git a/example-config.yaml b/example-config.yaml index 5d855afd..8d31b778 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -136,6 +136,16 @@ bridge: # your own Matrix account as the Matrix puppet for your Telegram account. sync_with_custom_puppets: true + # Some config options related to Telegram message deduplication. + # The default values are usually fine, but some debug messages/warnings might recommend you + # change these. + deduplication: + # Whether or not to check the database if the message about to be sent is a duplicate. + pre_db_check: false + # The number of latest events to keep when checking for duplicates. + # You might need to increase this on high-traffic bridge instances. + cache_queue_length: 20 + # The formats to use when sending messages to Telegram via the relay bot. # # Telegram doesn't have built-in emotes, so the m.emote format is also used for non-relaybot users. diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index 15da6cfa..159a61d5 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -196,6 +196,9 @@ class Config(DictWithRecursion): copy("bridge.catch_up") copy("bridge.sync_with_custom_puppets") + copy("bridge.deduplication.pre_db_check") + copy("bridge.deduplication.cache_queue_length") + if "bridge.message_formats.m_text" in self: del self["bridge.message_formats"] copy_dict("bridge.message_formats", override_existing_map=False) diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index 234c0dc3..7b90f531 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -66,12 +66,19 @@ class Portal: az = None # type: AppService bot = None # type: Bot loop = None # type: asyncio.AbstractEventLoop + filter_mode = None # type: str filter_list = None # type: List[str] + bridge_notices = False # type: bool + + dedup_pre_db_check = False # type: bool + dedup_cache_queue_length = 20 # type: int + alias_template = None # type: str mx_alias_regex = None # type: Pattern hs_domain = None # type: str + by_mxid = {} # type: Dict[str, Portal] by_tgid = {} # type: Dict[Tuple[int, int], Portal] @@ -191,7 +198,7 @@ class Portal: self._dedup_action.append(evt_hash) - if len(self._dedup_action) > 20: + if len(self._dedup_action) > self.dedup_cache_queue_length: self._dedup_action.popleft() return False @@ -221,7 +228,7 @@ class Portal: self._dedup_mxid[evt_hash] = mxid self._dedup.append(evt_hash) - if len(self._dedup) > 20: + if len(self._dedup) > self.dedup_cache_queue_length: del self._dedup_mxid[self._dedup.popleft()] return None @@ -1330,6 +1337,8 @@ class Portal: msg = DBMessage.query.get((evt.id, tg_space)) if not msg: + self.log.info(f"Didn't find edited message {evt.id}@{tg_space} (src {source.tgid}) " + "in database.") # Oh crap return msg.mxid = mxid @@ -1363,6 +1372,15 @@ class Portal: self.db.commit() return + if self.dedup_pre_db_check and self.peer_type == "channel": + msg = DBMessage.query.get((evt.id, tg_space)) + if msg: + self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already" + f"handled into {msg.mxid}. This duplicate was catched in the db " + "check. If you get this message often, consider increasing" + "bridge.deduplication.cache_queue_length in the config.") + return + if sender and not sender.displayname: self.log.debug(f"Telegram user {sender.tgid} sent a message, but doesn't have a" "displayname, updating info...") @@ -1416,10 +1434,16 @@ class Portal: DBMessage.mxid == temporary_identifier) \ .update({"mxid": mxid}) except FlushError as e: - self.log.exception(f"{e.__class__.__name__} while saving message mapping.") + self.log.exception(f"{e.__class__.__name__} while saving message mapping. " + "This might mean that an update was handled after it left the " + "dedup cache queue. You can try enabling bridge.deduplication." + "pre_db_check in the config.") await intent.redact(self.mxid, mxid) except (IntegrityError, InvalidRequestError) as e: - self.log.exception(f"{e.__class__.__name__} while saving message mapping.") + self.log.exception(f"{e.__class__.__name__} while saving message mapping. " + "This might mean that an update was handled after it left the " + "dedup cache queue. You can try enabling bridge.deduplication." + "pre_db_check in the config.") self.db.rollback() await intent.redact(self.mxid, mxid) @@ -1740,6 +1764,8 @@ def init(context: Context): Portal.bridge_notices = config["bridge.bridge_notices"] Portal.filter_mode = config["bridge.filter.mode"] Portal.filter_list = config["bridge.filter.list"] + Portal.dedup_pre_db_check = config["bridge.deduplication.pre_db_check"] + Portal.dedup_cache_queue_length = config["bridge.deduplication.cache_queue_length"] Portal.alias_template = config.get("bridge.alias_template", "telegram_{groupname}") Portal.hs_domain = config["homeserver.domain"] Portal.mx_alias_regex = re.compile(