diff --git a/mautrix_telegram/db/message.py b/mautrix_telegram/db/message.py index cee5abb2..5d907d02 100644 --- a/mautrix_telegram/db/message.py +++ b/mautrix_telegram/db/message.py @@ -21,7 +21,7 @@ from asyncpg import Record from attr import dataclass from mautrix.types import EventID, RoomID -from mautrix.util.async_db import Database +from mautrix.util.async_db import Database, Scheme from ..types import TelegramID @@ -76,7 +76,7 @@ class Message: async def get_first_by_tgids( cls, tgids: list[TelegramID], tg_space: TelegramID ) -> list[Message]: - if cls.db.scheme == "postgres": + if cls.db.scheme in (Scheme.POSTGRES, Scheme.COCKROACH): q = ( f"SELECT {cls.columns} FROM message" " WHERE tgid=ANY($1) AND tg_space=$2 AND edit_index=0" @@ -123,7 +123,7 @@ class Message: async def get_by_mxids( cls, mxids: list[EventID], mx_room: RoomID, tg_space: TelegramID ) -> list[Message]: - if cls.db.scheme == "postgres": + if cls.db.scheme in (Scheme.POSTGRES, Scheme.COCKROACH): q = ( f"SELECT {cls.columns} FROM message" " WHERE mxid=ANY($1) AND mx_room=$2 AND tg_space=$3" diff --git a/mautrix_telegram/db/telethon_session.py b/mautrix_telegram/db/telethon_session.py index e26e33ca..5646c016 100644 --- a/mautrix_telegram/db/telethon_session.py +++ b/mautrix_telegram/db/telethon_session.py @@ -24,7 +24,7 @@ from telethon.crypto import AuthKey from telethon.sessions import MemorySession from telethon.tl.types import PeerChannel, PeerChat, PeerUser, updates -from mautrix.util.async_db import Database +from mautrix.util.async_db import Database, Scheme fake_db = Database.create("") if TYPE_CHECKING else None @@ -153,7 +153,7 @@ class PgSession(MemorySession): ] = self._entities_to_rows(tlo) if not rows: return - if self.db.scheme == "postgres": + if self.db.scheme == Scheme.POSTGRES: q = ( "INSERT INTO telethon_entities (session_id, id, hash, username, phone, name) " "VALUES ($1, unnest($2::bigint[]), unnest($3::bigint[]), " @@ -201,7 +201,7 @@ class PgSession(MemorySession): utils.get_peer_id(PeerChat(key)), utils.get_peer_id(PeerChannel(key)), ) - if self.db.scheme == "postgres": + if self.db.scheme in (Scheme.POSTGRES, Scheme.COCKROACH): return await self._select_entity("id=ANY($1)", ids) else: return await self._select_entity(f"id IN ($1, $2, $3)", *ids) diff --git a/mautrix_telegram/db/upgrade/v00_latest_revision.py b/mautrix_telegram/db/upgrade/v00_latest_revision.py new file mode 100644 index 00000000..f03f9a9b --- /dev/null +++ b/mautrix_telegram/db/upgrade/v00_latest_revision.py @@ -0,0 +1,199 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2022 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from mautrix.util.async_db import Connection + + +async def create_v6_tables(conn: Connection) -> int: + await conn.execute( + """CREATE TABLE "user" ( + mxid TEXT PRIMARY KEY, + tgid BIGINT UNIQUE, + tg_username TEXT, + tg_phone TEXT, + is_bot BOOLEAN NOT NULL DEFAULT false, + saved_contacts INTEGER NOT NULL DEFAULT 0 + )""" + ) + await conn.execute( + """CREATE TABLE portal ( + tgid BIGINT, + tg_receiver BIGINT, + peer_type TEXT NOT NULL, + mxid TEXT UNIQUE, + avatar_url TEXT, + encrypted BOOLEAN NOT NULL DEFAULT false, + username TEXT, + title TEXT, + about TEXT, + photo_id TEXT, + name_set BOOLEAN NOT NULL DEFAULT false, + avatar_set BOOLEAN NOT NULL DEFAULT false, + megagroup BOOLEAN, + config jsonb, + + sponsored_event_id TEXT, + sponsored_event_ts BIGINT, + sponsored_msg_random_id bytea, + + PRIMARY KEY (tgid, tg_receiver) + )""" + ) + await conn.execute( + """CREATE TABLE message ( + mxid TEXT NOT NULL, + mx_room TEXT NOT NULL, + tgid BIGINT, + tg_space BIGINT, + edit_index INTEGER, + redacted BOOLEAN NOT NULL DEFAULT false, + content_hash bytea, + PRIMARY KEY (tgid, tg_space, edit_index), + UNIQUE (mxid, mx_room, tg_space) + )""" + ) + await conn.execute( + """CREATE TABLE reaction ( + mxid TEXT NOT NULL, + mx_room TEXT NOT NULL, + msg_mxid TEXT NOT NULL, + tg_sender BIGINT, + reaction TEXT NOT NULL, + + PRIMARY KEY (msg_mxid, mx_room, tg_sender), + UNIQUE (mxid, mx_room) + )""" + ) + await conn.execute( + """CREATE TABLE disappearing_message ( + room_id TEXT, + event_id TEXT, + expiration_seconds BIGINT, + expiration_ts BIGINT, + + PRIMARY KEY (room_id, event_id) + )""" + ) + await conn.execute( + """CREATE TABLE puppet ( + id BIGINT PRIMARY KEY, + + is_registered BOOLEAN NOT NULL DEFAULT false, + + displayname TEXT, + displayname_source BIGINT, + displayname_contact BOOLEAN NOT NULL DEFAULT true, + displayname_quality INTEGER NOT NULL DEFAULT 0, + disable_updates BOOLEAN NOT NULL DEFAULT false, + username TEXT, + photo_id TEXT, + avatar_url TEXT, + name_set BOOLEAN NOT NULL DEFAULT false, + avatar_set BOOLEAN NOT NULL DEFAULT false, + is_bot BOOLEAN, + is_channel BOOLEAN NOT NULL DEFAULT false, + + access_token TEXT, + custom_mxid TEXT, + next_batch TEXT, + base_url TEXT + )""" + ) + await conn.execute( + """CREATE TABLE telegram_file ( + id TEXT PRIMARY KEY, + mxc TEXT NOT NULL, + mime_type TEXT, + was_converted BOOLEAN NOT NULL DEFAULT false, + timestamp BIGINT NOT NULL DEFAULT 0, + size BIGINT, + width INTEGER, + height INTEGER, + thumbnail TEXT, + decryption_info jsonb, + FOREIGN KEY (thumbnail) REFERENCES telegram_file(id) + ON UPDATE CASCADE ON DELETE SET NULL + )""" + ) + await conn.execute( + """CREATE TABLE bot_chat ( + id BIGINT PRIMARY KEY, + type TEXT NOT NULL + )""" + ) + await conn.execute( + """CREATE TABLE user_portal ( + "user" BIGINT, + portal BIGINT, + portal_receiver BIGINT, + PRIMARY KEY ("user", portal, portal_receiver), + FOREIGN KEY ("user") REFERENCES "user"(tgid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal, portal_receiver) REFERENCES portal(tgid, tg_receiver) + ON DELETE CASCADE ON UPDATE CASCADE + )""" + ) + await conn.execute( + """CREATE TABLE contact ( + "user" BIGINT, + contact BIGINT, + PRIMARY KEY ("user", contact), + FOREIGN KEY ("user") REFERENCES "user"(tgid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (contact) REFERENCES puppet(id) ON DELETE CASCADE ON UPDATE CASCADE + )""" + ) + await conn.execute( + """CREATE TABLE telethon_sessions ( + session_id TEXT PRIMARY KEY, + dc_id INTEGER, + server_address TEXT, + port INTEGER, + auth_key bytea + )""" + ) + await conn.execute( + """CREATE TABLE telethon_entities ( + session_id TEXT, + id BIGINT, + hash BIGINT NOT NULL, + username TEXT, + phone TEXT, + name TEXT, + PRIMARY KEY (session_id, id) + )""" + ) + await conn.execute( + """CREATE TABLE telethon_sent_files ( + session_id TEXT, + md5_digest bytea, + file_size INTEGER, + type INTEGER, + id BIGINT, + hash BIGINT, + PRIMARY KEY (session_id, md5_digest, file_size, type) + )""" + ) + await conn.execute( + """CREATE TABLE telethon_update_state ( + session_id TEXT, + entity_id BIGINT, + pts BIGINT, + qts BIGINT, + date BIGINT, + seq BIGINT, + unread_count INTEGER, + PRIMARY KEY (session_id, entity_id) + )""" + ) + return 6 diff --git a/mautrix_telegram/db/upgrade/v01_initial_revision.py b/mautrix_telegram/db/upgrade/v01_initial_revision.py index 3057e7db..bcbd034b 100644 --- a/mautrix_telegram/db/upgrade/v01_initial_revision.py +++ b/mautrix_telegram/db/upgrade/v01_initial_revision.py @@ -15,29 +15,38 @@ # along with this program. If not, see . from __future__ import annotations -from asyncpg import Connection +from mautrix.util.async_db import Connection, Scheme from . import upgrade_table +from .v00_latest_revision import create_v6_tables legacy_version_query = "SELECT version_num FROM alembic_version" last_legacy_version = "bfc0a39bfe02" def table_exists(scheme: str, name: str) -> str: - if scheme == "sqlite": + if scheme == Scheme.SQLITE: return f"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='{name}')" - elif scheme == "postgres": - return f"SELECT EXISTS(SELECT FROM information_schema.tables WHERE table_name='{name}')" + elif scheme in (Scheme.POSTGRES, Scheme.COCKROACH): + return f"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name='{name}')" raise RuntimeError("unsupported database scheme") -@upgrade_table.register(description="Initial asyncpg revision") -async def upgrade_v1(conn: Connection, scheme: str) -> None: +async def first_upgrade_target(conn: Connection, scheme: str) -> int: + is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version")) + # If it's a legacy db, the upgrade process will go to v1 and run each migration up to v6. + # If it's a new db, we'll create the v6 tables directly (see the create_v6_tables call). + return 1 if is_legacy else 6 + + +@upgrade_table.register(description="Initial asyncpg revision", upgrades_to=first_upgrade_target) +async def upgrade_v1(conn: Connection, scheme: str) -> int: is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version")) if is_legacy: await migrate_legacy_to_v1(conn, scheme) + return 1 else: - await create_v1_tables(conn) + return await create_v6_tables(conn) async def drop_constraints(conn: Connection, table: str, contype: str) -> None: @@ -178,151 +187,3 @@ async def varchar_to_text(conn: Connection) -> None: for table, columns in columns_to_adjust.items(): for column in columns: await conn.execute(f'ALTER TABLE "{table}" ALTER COLUMN {column} TYPE TEXT') - - -async def create_v1_tables(conn: Connection) -> None: - await conn.execute( - """CREATE TABLE "user" ( - mxid TEXT PRIMARY KEY, - tgid BIGINT UNIQUE, - tg_username TEXT, - tg_phone TEXT, - is_bot BOOLEAN NOT NULL DEFAULT false, - saved_contacts INTEGER NOT NULL DEFAULT 0 - )""" - ) - await conn.execute( - """CREATE TABLE portal ( - tgid BIGINT, - tg_receiver BIGINT, - peer_type TEXT NOT NULL, - mxid TEXT UNIQUE, - avatar_url TEXT, - encrypted BOOLEAN NOT NULL DEFAULT false, - username TEXT, - title TEXT, - about TEXT, - photo_id TEXT, - megagroup BOOLEAN, - config jsonb, - PRIMARY KEY (tgid, tg_receiver) - )""" - ) - await conn.execute( - """CREATE TABLE message ( - mxid TEXT, - mx_room TEXT, - tgid BIGINT NOT NULL, - tg_space BIGINT NOT NULL, - edit_index INTEGER NOT NULL, - redacted BOOLEAN NOT NULL DEFAULT false, - PRIMARY KEY (tgid, tg_space, edit_index), - UNIQUE (mxid, mx_room, tg_space) - )""" - ) - await conn.execute( - """CREATE TABLE puppet ( - id BIGINT PRIMARY KEY, - - is_registered BOOLEAN NOT NULL DEFAULT false, - - displayname TEXT, - displayname_source BIGINT, - displayname_contact BOOLEAN NOT NULL DEFAULT true, - displayname_quality INTEGER NOT NULL DEFAULT 0, - disable_updates BOOLEAN NOT NULL DEFAULT false, - username TEXT, - photo_id TEXT, - is_bot BOOLEAN, - - access_token TEXT, - custom_mxid TEXT, - next_batch TEXT, - base_url TEXT - )""" - ) - await conn.execute( - """CREATE TABLE telegram_file ( - id TEXT PRIMARY KEY, - mxc TEXT NOT NULL, - mime_type TEXT, - was_converted BOOLEAN NOT NULL DEFAULT false, - timestamp BIGINT NOT NULL DEFAULT 0, - size BIGINT, - width INTEGER, - height INTEGER, - thumbnail TEXT, - decryption_info jsonb, - FOREIGN KEY (thumbnail) REFERENCES telegram_file(id) - ON UPDATE CASCADE ON DELETE SET NULL - )""" - ) - await conn.execute( - """CREATE TABLE bot_chat ( - id BIGINT PRIMARY KEY, - type TEXT NOT NULL - )""" - ) - await conn.execute( - """CREATE TABLE user_portal ( - "user" BIGINT, - portal BIGINT, - portal_receiver BIGINT, - PRIMARY KEY ("user", portal, portal_receiver), - FOREIGN KEY ("user") REFERENCES "user"(tgid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (portal, portal_receiver) REFERENCES portal(tgid, tg_receiver) - ON DELETE CASCADE ON UPDATE CASCADE - )""" - ) - await conn.execute( - """CREATE TABLE contact ( - "user" BIGINT, - contact BIGINT, - PRIMARY KEY ("user", contact), - FOREIGN KEY ("user") REFERENCES "user"(tgid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (contact) REFERENCES puppet(id) ON DELETE CASCADE ON UPDATE CASCADE - )""" - ) - await conn.execute( - """CREATE TABLE telethon_sessions ( - session_id TEXT PRIMARY KEY, - dc_id INTEGER, - server_address TEXT, - port INTEGER, - auth_key bytea - )""" - ) - await conn.execute( - """CREATE TABLE telethon_entities ( - session_id TEXT, - id BIGINT, - hash BIGINT NOT NULL, - username TEXT, - phone TEXT, - name TEXT, - PRIMARY KEY (session_id, id) - )""" - ) - await conn.execute( - """CREATE TABLE telethon_sent_files ( - session_id TEXT, - md5_digest bytea, - file_size INTEGER, - type INTEGER, - id BIGINT, - hash BIGINT, - PRIMARY KEY (session_id, md5_digest, file_size, type) - )""" - ) - await conn.execute( - """CREATE TABLE telethon_update_state ( - session_id TEXT, - entity_id BIGINT, - pts BIGINT, - qts BIGINT, - date BIGINT, - seq BIGINT, - unread_count INTEGER, - PRIMARY KEY (session_id, entity_id) - )""" - ) diff --git a/mautrix_telegram/db/upgrade/v02_sponsored_events.py b/mautrix_telegram/db/upgrade/v02_sponsored_events.py index 273319d1..f70ee1e6 100644 --- a/mautrix_telegram/db/upgrade/v02_sponsored_events.py +++ b/mautrix_telegram/db/upgrade/v02_sponsored_events.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from asyncpg import Connection +from mautrix.util.async_db import Connection from . import upgrade_table diff --git a/mautrix_telegram/db/upgrade/v03_reactions.py b/mautrix_telegram/db/upgrade/v03_reactions.py index 7007416a..f9bfb6ed 100644 --- a/mautrix_telegram/db/upgrade/v03_reactions.py +++ b/mautrix_telegram/db/upgrade/v03_reactions.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from asyncpg import Connection +from mautrix.util.async_db import Connection from . import upgrade_table diff --git a/mautrix_telegram/db/upgrade/v04_disappearing_messages.py b/mautrix_telegram/db/upgrade/v04_disappearing_messages.py index 411aa8ff..06b865c9 100644 --- a/mautrix_telegram/db/upgrade/v04_disappearing_messages.py +++ b/mautrix_telegram/db/upgrade/v04_disappearing_messages.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from asyncpg import Connection +from mautrix.util.async_db import Connection from . import upgrade_table diff --git a/mautrix_telegram/db/upgrade/v05_channel_ghosts.py b/mautrix_telegram/db/upgrade/v05_channel_ghosts.py index d46364b4..11e4978f 100644 --- a/mautrix_telegram/db/upgrade/v05_channel_ghosts.py +++ b/mautrix_telegram/db/upgrade/v05_channel_ghosts.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from asyncpg import Connection +from mautrix.util.async_db import Connection, Scheme from . import upgrade_table @@ -21,5 +21,5 @@ from . import upgrade_table @upgrade_table.register(description="Add separate ghost users for channel senders") async def upgrade_v5(conn: Connection, scheme: str) -> None: await conn.execute("ALTER TABLE puppet ADD COLUMN is_channel BOOLEAN NOT NULL DEFAULT false") - if scheme == "postgres": + if scheme == Scheme.POSTGRES: await conn.execute("ALTER TABLE puppet ALTER COLUMN is_channel DROP DEFAULT") diff --git a/mautrix_telegram/db/upgrade/v06_puppet_avatar_url.py b/mautrix_telegram/db/upgrade/v06_puppet_avatar_url.py index 24571fec..4ba9cdf1 100644 --- a/mautrix_telegram/db/upgrade/v06_puppet_avatar_url.py +++ b/mautrix_telegram/db/upgrade/v06_puppet_avatar_url.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from asyncpg import Connection +from mautrix.util.async_db import Connection from . import upgrade_table diff --git a/mautrix_telegram/db/user.py b/mautrix_telegram/db/user.py index edb129c7..a35b1d8e 100644 --- a/mautrix_telegram/db/user.py +++ b/mautrix_telegram/db/user.py @@ -21,7 +21,7 @@ from asyncpg import Record from attr import dataclass from mautrix.types import UserID -from mautrix.util.async_db import Database +from mautrix.util.async_db import Database, Scheme from ..types import TelegramID @@ -104,7 +104,7 @@ class User: records = [(self.tgid, puppet_id) for puppet_id in puppets] async with self.db.acquire() as conn, conn.transaction(): await conn.execute('DELETE FROM contact WHERE "user"=$1', self.tgid) - if self.db.scheme == "postgres": + if self.db.scheme == Scheme.POSTGRES: await conn.copy_records_to_table("contact", records=records, columns=columns) else: q = 'INSERT INTO contact ("user", contact) VALUES ($1, $2)' @@ -120,7 +120,7 @@ class User: records = [(self.tgid, tgid, tg_receiver) for tgid, tg_receiver in portals] async with self.db.acquire() as conn, conn.transaction(): await conn.execute('DELETE FROM user_portal WHERE "user"=$1', self.tgid) - if self.db.scheme == "postgres": + if self.db.scheme == Scheme.POSTGRES: await conn.copy_records_to_table("user_portal", records=records, columns=columns) else: q = 'INSERT INTO user_portal ("user", portal, portal_receiver) VALUES ($1, $2, $3)' diff --git a/requirements.txt b/requirements.txt index 474f7e21..a78720d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ python-magic>=0.4,<0.5 commonmark>=0.8,<0.10 aiohttp>=3,<4 yarl>=1,<2 -mautrix>=0.14.9,<0.15 +mautrix>=0.14.11,<0.15 #telethon>=1.24,<1.25 # Fork to make session storage async and update to layer 138 tulir-telethon==1.25.0a5