diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py index 7ef1c150..a4ea65fb 100644 --- a/mautrix_telegram/abstract_user.py +++ b/mautrix_telegram/abstract_user.py @@ -75,6 +75,7 @@ from telethon.tl.types import ( from mautrix.appservice import AppService from mautrix.errors import MatrixError from mautrix.types import PresenceState, UserID +from mautrix.util import background_task from mautrix.util.logging import TraceLogger from mautrix.util.opt_prometheus import Counter, Histogram @@ -241,7 +242,7 @@ class AbstractUser(ABC): async def _telethon_update_error_callback(self, err: Exception) -> None: if isinstance(err, (UnauthorizedError, AuthKeyError)): - asyncio.create_task(self.on_signed_out(err)) + background_task.create(self.on_signed_out(err)) return if self.config["telegram.exit_on_update_error"]: self.log.critical(f"Stopping due to update handling error {type(err).__name__}") @@ -325,7 +326,7 @@ class AbstractUser(ABC): async def _update(self, update: TypeUpdate) -> None: if isinstance(update, UpdateShort): update = update.update - asyncio.create_task(self._handle_entity_updates(getattr(update, "_entities", {}))) + background_task.create(self._handle_entity_updates(getattr(update, "_entities", {}))) if isinstance( update, ( @@ -625,7 +626,7 @@ class AbstractUser(ABC): await portal.delete_telegram_user(self.tgid, sender=None) elif chan := getattr(update, "mau_channel", None): if not portal.mxid: - asyncio.create_task(self._delayed_create_channel(chan)) + background_task.create(self._delayed_create_channel(chan)) else: self.log.debug("Updating channel info with data fetched by Telethon") await portal.update_info(self, chan) diff --git a/mautrix_telegram/commands/portal/bridge.py b/mautrix_telegram/commands/portal/bridge.py index 316d3614..245c6e8f 100644 --- a/mautrix_telegram/commands/portal/bridge.py +++ b/mautrix_telegram/commands/portal/bridge.py @@ -21,6 +21,7 @@ import asyncio from telethon.tl.types import ChannelForbidden, ChatForbidden from mautrix.types import EventID, RoomID +from mautrix.util import background_task from ... import portal as po from ...types import TelegramID @@ -184,7 +185,7 @@ async def confirm_bridge(evt: CommandEvent) -> EventID | None: if not ok: return None elif coro: - asyncio.create_task(coro) + background_task.create(coro) await evt.reply("Cleaning up previous portal room...") elif portal.mxid: evt.sender.command_status = None @@ -251,7 +252,7 @@ async def _locked_confirm_bridge( await portal.save() await portal.update_bridge_info() - asyncio.create_task(portal.update_matrix_room(user, entity, levels=levels)) + background_task.create(portal.update_matrix_room(user, entity, levels=levels)) await warn_missing_power(levels, evt) diff --git a/mautrix_telegram/commands/telegram/auth.py b/mautrix_telegram/commands/telegram/auth.py index e02861f4..0a75b429 100644 --- a/mautrix_telegram/commands/telegram/auth.py +++ b/mautrix_telegram/commands/telegram/auth.py @@ -22,7 +22,6 @@ import io from telethon.errors import ( AccessTokenExpiredError, AccessTokenInvalidError, - FirstNameInvalidError, FloodWaitError, PasswordHashInvalidError, PhoneCodeExpiredError, @@ -31,14 +30,12 @@ from telethon.errors import ( PhoneNumberBannedError, PhoneNumberFloodError, PhoneNumberInvalidError, - PhoneNumberOccupiedError, PhoneNumberUnoccupiedError, SessionPasswordNeededError, ) from telethon.tl.types import User from mautrix.client import Client -from mautrix.errors import MForbidden from mautrix.types import ( EventID, ImageInfo, @@ -47,6 +44,7 @@ from mautrix.types import ( TextMessageEventContent, UserID, ) +from mautrix.util import background_task from mautrix.util.format_duration import format_duration as fmt_duration from ... import user as u @@ -368,7 +366,7 @@ async def _finish_sign_in(evt: CommandEvent, user: User, login_as: u.User = None f"[{existing_user.displayname}] (https://matrix.to/#/{existing_user.mxid})" " was logged out from the account." ) - asyncio.create_task(login_as.post_login(user, first_login=True)) + background_task.create(login_as.post_login(user, first_login=True)) evt.sender.command_status = None name = f"@{user.username}" if user.username else f"+{user.phone}" if login_as != evt.sender: diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index b46db06f..70c05250 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -171,7 +171,7 @@ from mautrix.types import ( UserID, VideoInfo, ) -from mautrix.util import magic, variation_selector +from mautrix.util import background_task, magic, variation_selector from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from mautrix.util.simple_lock import SimpleLock from mautrix.util.simple_template import SimpleTemplate @@ -727,7 +727,7 @@ class Portal(DBPortal, BasePortal): self.log.exception(f"Failed to get entity through {user.tgid} for update") return self.mxid update = self.update_matrix_room(user, entity) - asyncio.create_task(update) + background_task.create(update) await self.invite_to_matrix(invites or []) return self.mxid async with self._room_create_lock: @@ -1525,11 +1525,11 @@ class Portal(DBPortal, BasePortal): ) if self.peer_type == "channel": if not self.megagroup: - asyncio.create_task( + background_task.create( self._try_handle_read_for_sponsored_msg(user, event_id, timestamp) ) else: - asyncio.create_task(self._poll_telegram_reactions(user)) + background_task.create(self._poll_telegram_reactions(user)) async def _preproc_kick_ban( self, user: u.User | p.Puppet, source: u.User @@ -1964,7 +1964,7 @@ class Portal(DBPortal, BasePortal): message_type=msgtype, ) await self._send_delivery_receipt(event_id) - asyncio.create_task(self._send_message_status(event_id, err=None)) + background_task.create(self._send_message_status(event_id, err=None)) if response.ttl_period: await self._mark_disappearing( event_id=event_id, @@ -2257,7 +2257,7 @@ class Portal(DBPortal, BasePortal): EventType.ROOM_REDACTION, ) await self._send_delivery_receipt(redaction_event_id) - asyncio.create_task(self._send_message_status(redaction_event_id, err=None)) + background_task.create(self._send_message_status(redaction_event_id, err=None)) async def _handle_matrix_reaction_deletion( self, deleter: u.User, event_id: EventID, tg_space: TelegramID @@ -2370,7 +2370,7 @@ class Portal(DBPortal, BasePortal): EventType.REACTION, ) await self._send_delivery_receipt(reaction_event_id) - asyncio.create_task(self._send_message_status(reaction_event_id, err=None)) + background_task.create(self._send_message_status(reaction_event_id, err=None)) async def _handle_matrix_reaction( self, @@ -2586,7 +2586,7 @@ class Portal(DBPortal, BasePortal): return if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None: - asyncio.create_task( + background_task.create( self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions) ) sender_id = sender.tgid if sender else self.tgid @@ -3419,7 +3419,7 @@ class Portal(DBPortal, BasePortal): await intent.redact(self.mxid, event_id) return if isinstance(evt, Message) and evt.reactions: - asyncio.create_task( + background_task.create( self.try_handle_telegram_reactions( source, dbm.tgid, evt.reactions, dbm=dbm, timestamp=evt.date ) @@ -3440,7 +3440,7 @@ class Portal(DBPortal, BasePortal): dm = DisappearingMessage(self.mxid, event_id, seconds, expiration_ts=expires_at * 1000) await dm.insert() if expires_at: - asyncio.create_task(self._disappear_event(dm)) + background_task.create(self._disappear_event(dm)) async def _create_room_on_action( self, source: au.AbstractUser, action: TypeMessageAction diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 8fcd678a..2d77c23e 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -59,6 +59,7 @@ from mautrix.bridge import BaseUser, async_getter_lock from mautrix.client import Client from mautrix.errors import MatrixRequestError, MNotFound from mautrix.types import PushActionType, PushRuleKind, PushRuleScope, RoomID, RoomTagInfo, UserID +from mautrix.util import background_task from mautrix.util.bridge_state import BridgeState, BridgeStateEvent from mautrix.util.opt_prometheus import Gauge @@ -259,7 +260,7 @@ class User(DBUser, AbstractUser, BaseUser): else: # Authenticated, run post login self.log.debug(f"Ensuring post_login() for {self.name}") - asyncio.create_task(self.post_login()) + background_task.create(self.post_login()) return self # Not authenticated, delete data if necessary if delete_unless_authenticated and self.client is not None: diff --git a/mautrix_telegram/web/common/auth_api.py b/mautrix_telegram/web/common/auth_api.py index 5da82d3f..119768f2 100644 --- a/mautrix_telegram/web/common/auth_api.py +++ b/mautrix_telegram/web/common/auth_api.py @@ -38,6 +38,7 @@ from telethon.errors import ( ) from mautrix.bridge import InvalidAccessToken, OnlyLoginSelf +from mautrix.util import background_task from mautrix.util.format_duration import format_duration from ...commands.telegram.auth import enter_password @@ -199,7 +200,7 @@ class AuthAPI(abc.ABC): existing_user = await User.get_by_tgid(user_info.id) if existing_user and existing_user != user: await existing_user.log_out() - asyncio.create_task(user.post_login(user_info, first_login=True)) + background_task.create(user.post_login(user_info, first_login=True)) if user.command_status and user.command_status["action"] == "Login": user.command_status = None diff --git a/mautrix_telegram/web/provisioning/__init__.py b/mautrix_telegram/web/provisioning/__init__.py index 7320b176..5bd50d98 100644 --- a/mautrix_telegram/web/provisioning/__init__.py +++ b/mautrix_telegram/web/provisioning/__init__.py @@ -32,6 +32,7 @@ from mautrix.appservice import AppService from mautrix.client import Client from mautrix.errors import IntentError, MatrixRequestError from mautrix.types import UserID +from mautrix.util import background_task from ...commands.portal.util import get_initial_state, user_has_power_level from ...portal import Portal @@ -227,7 +228,7 @@ class ProvisioningAPI(AuthAPI): portal.photo_id = "" await portal.save() - asyncio.create_task(portal.update_matrix_room(user, entity, levels=levels)) + background_task.create(portal.update_matrix_room(user, entity, levels=levels)) return web.Response(status=202, body="{}") @@ -348,7 +349,7 @@ class ProvisioningAPI(AuthAPI): self.log.exception("Failed to disconnect chat") return self.get_error_response(500, "exception", "Failed to disconnect chat") else: - asyncio.create_task(coro) + background_task.create(coro) return web.json_response({}, status=200 if sync else 202) async def get_user_info(self, request: web.Request) -> web.Response: diff --git a/requirements.txt b/requirements.txt index 2c395264..fcc8dd22 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ python-magic>=0.4,<0.5 commonmark>=0.8,<0.10 aiohttp>=3,<4 yarl>=1,<2 -mautrix>=0.19.3,<0.20 +mautrix>=0.19.4,<0.20 #telethon>=1.25.4,<1.27 tulir-telethon==1.28.0a1 asyncpg>=0.20,<0.28