From ed1e5474bf412332d7072960c144210f857eb818 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 19 Sep 2022 19:10:08 +0300 Subject: [PATCH] Update latest revision migration --- .../db/upgrade/v00_latest_revision.py | 32 +++++++++++++++++-- .../db/upgrade/v01_initial_revision.py | 26 ++++++--------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/mautrix_telegram/db/upgrade/v00_latest_revision.py b/mautrix_telegram/db/upgrade/v00_latest_revision.py index 9ddb417c..ae4acd5a 100644 --- a/mautrix_telegram/db/upgrade/v00_latest_revision.py +++ b/mautrix_telegram/db/upgrade/v00_latest_revision.py @@ -13,12 +13,12 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from mautrix.util.async_db import Connection +from mautrix.util.async_db import Connection, Scheme -latest_version = 10 +latest_version = 13 -async def create_latest_tables(conn: Connection) -> int: +async def create_latest_tables(conn: Connection, scheme: Scheme) -> int: await conn.execute( """CREATE TABLE "user" ( mxid TEXT PRIMARY KEY, @@ -67,6 +67,8 @@ async def create_latest_tables(conn: Connection) -> int: edit_index INTEGER, redacted BOOLEAN NOT NULL DEFAULT false, content_hash bytea, + sender_mxid TEXT, + sender BIGINT, PRIMARY KEY (tgid, tg_space, edit_index), UNIQUE (mxid, mx_room, tg_space) )""" @@ -207,4 +209,28 @@ async def create_latest_tables(conn: Connection) -> int: PRIMARY KEY (session_id, entity_id) )""" ) + gen = "" + if scheme in (Scheme.POSTGRES, Scheme.COCKROACH): + gen = "GENERATED ALWAYS AS IDENTITY" + await conn.execute( + f""" + CREATE TABLE backfill_queue ( + queue_id INTEGER PRIMARY KEY {gen}, + user_mxid TEXT, + priority INTEGER NOT NULL, + portal_tgid BIGINT, + portal_tg_receiver BIGINT, + messages_per_batch INTEGER NOT NULL, + post_batch_delay INTEGER NOT NULL, + max_batches INTEGER NOT NULL, + dispatch_time TIMESTAMP, + completed_at TIMESTAMP, + cooldown_timeout TIMESTAMP, + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_tgid, portal_tg_receiver) + REFERENCES portal(tgid, tg_receiver) ON DELETE CASCADE + ) + """ + ) + return latest_version diff --git a/mautrix_telegram/db/upgrade/v01_initial_revision.py b/mautrix_telegram/db/upgrade/v01_initial_revision.py index ee8dfbf3..8ebf55c6 100644 --- a/mautrix_telegram/db/upgrade/v01_initial_revision.py +++ b/mautrix_telegram/db/upgrade/v01_initial_revision.py @@ -24,29 +24,21 @@ legacy_version_query = "SELECT version_num FROM alembic_version" last_legacy_version = "bfc0a39bfe02" -def table_exists(scheme: str, name: str) -> str: - if scheme == Scheme.SQLITE: - return f"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='{name}')" - elif scheme in (Scheme.POSTGRES, Scheme.COCKROACH): - return f"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name='{name}')" - raise RuntimeError("unsupported database scheme") - - -async def first_upgrade_target(conn: Connection, scheme: str) -> int: - is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version")) +async def first_upgrade_target(conn: Connection, scheme: Scheme) -> int: + is_legacy = await conn.table_exists("alembic_version") # If it's a legacy db, the upgrade process will go to v1 and run each migration up to latest. # If it's a new db, we'll create the latest tables directly (see create_latest_tables call). return 1 if is_legacy else latest_version @upgrade_table.register(description="Initial asyncpg revision", upgrades_to=first_upgrade_target) -async def upgrade_v1(conn: Connection, scheme: str) -> int: - is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version")) +async def upgrade_v1(conn: Connection, scheme: Scheme) -> int: + is_legacy = await conn.table_exists("alembic_version") if is_legacy: await migrate_legacy_to_v1(conn, scheme) return 1 else: - return await create_latest_tables(conn) + return await create_latest_tables(conn, scheme) async def drop_constraints(conn: Connection, table: str, contype: str) -> None: @@ -59,14 +51,14 @@ async def drop_constraints(conn: Connection, table: str, contype: str) -> None: await conn.execute(f"ALTER TABLE {table} {drops}") -async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None: +async def migrate_legacy_to_v1(conn: Connection, scheme: Scheme) -> None: legacy_version = await conn.fetchval(legacy_version_query) if legacy_version != last_legacy_version: raise RuntimeError( "Legacy database is not on last version. " "Please upgrade the old database with alembic or drop it completely first." ) - if scheme != "sqlite": + if scheme != Scheme.SQLITE: await drop_constraints(conn, "contact", contype="f") await conn.execute( """ @@ -131,12 +123,12 @@ async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None: await conn.execute("DROP TABLE alembic_version") -async def update_state_store(conn: Connection, scheme: str) -> None: +async def update_state_store(conn: Connection, scheme: Scheme) -> None: # The Matrix state store already has more or less the correct schema, so set the version await conn.execute("CREATE TABLE mx_version (version INTEGER PRIMARY KEY)") await conn.execute("INSERT INTO mx_version (version) VALUES (2)") await conn.execute("UPDATE mx_user_profile SET membership='LEAVE' WHERE membership='LEFT'") - if scheme != "sqlite": + if scheme != Scheme.SQLITE: # Also add the membership type on postgres await conn.execute( "CREATE TYPE membership AS ENUM ('join', 'leave', 'invite', 'ban', 'knock')"