Update latest revision migration

This commit is contained in:
Tulir Asokan
2022-09-19 19:10:08 +03:00
parent 65bd7fcc49
commit ed1e5474bf
2 changed files with 38 additions and 20 deletions
@@ -13,12 +13,12 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from mautrix.util.async_db import Connection
from mautrix.util.async_db import Connection, Scheme
latest_version = 10
latest_version = 13
async def create_latest_tables(conn: Connection) -> int:
async def create_latest_tables(conn: Connection, scheme: Scheme) -> int:
await conn.execute(
"""CREATE TABLE "user" (
mxid TEXT PRIMARY KEY,
@@ -67,6 +67,8 @@ async def create_latest_tables(conn: Connection) -> int:
edit_index INTEGER,
redacted BOOLEAN NOT NULL DEFAULT false,
content_hash bytea,
sender_mxid TEXT,
sender BIGINT,
PRIMARY KEY (tgid, tg_space, edit_index),
UNIQUE (mxid, mx_room, tg_space)
)"""
@@ -207,4 +209,28 @@ async def create_latest_tables(conn: Connection) -> int:
PRIMARY KEY (session_id, entity_id)
)"""
)
gen = ""
if scheme in (Scheme.POSTGRES, Scheme.COCKROACH):
gen = "GENERATED ALWAYS AS IDENTITY"
await conn.execute(
f"""
CREATE TABLE backfill_queue (
queue_id INTEGER PRIMARY KEY {gen},
user_mxid TEXT,
priority INTEGER NOT NULL,
portal_tgid BIGINT,
portal_tg_receiver BIGINT,
messages_per_batch INTEGER NOT NULL,
post_batch_delay INTEGER NOT NULL,
max_batches INTEGER NOT NULL,
dispatch_time TIMESTAMP,
completed_at TIMESTAMP,
cooldown_timeout TIMESTAMP,
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (portal_tgid, portal_tg_receiver)
REFERENCES portal(tgid, tg_receiver) ON DELETE CASCADE
)
"""
)
return latest_version
@@ -24,29 +24,21 @@ legacy_version_query = "SELECT version_num FROM alembic_version"
last_legacy_version = "bfc0a39bfe02"
def table_exists(scheme: str, name: str) -> str:
if scheme == Scheme.SQLITE:
return f"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='{name}')"
elif scheme in (Scheme.POSTGRES, Scheme.COCKROACH):
return f"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name='{name}')"
raise RuntimeError("unsupported database scheme")
async def first_upgrade_target(conn: Connection, scheme: str) -> int:
is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version"))
async def first_upgrade_target(conn: Connection, scheme: Scheme) -> int:
is_legacy = await conn.table_exists("alembic_version")
# If it's a legacy db, the upgrade process will go to v1 and run each migration up to latest.
# If it's a new db, we'll create the latest tables directly (see create_latest_tables call).
return 1 if is_legacy else latest_version
@upgrade_table.register(description="Initial asyncpg revision", upgrades_to=first_upgrade_target)
async def upgrade_v1(conn: Connection, scheme: str) -> int:
is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version"))
async def upgrade_v1(conn: Connection, scheme: Scheme) -> int:
is_legacy = await conn.table_exists("alembic_version")
if is_legacy:
await migrate_legacy_to_v1(conn, scheme)
return 1
else:
return await create_latest_tables(conn)
return await create_latest_tables(conn, scheme)
async def drop_constraints(conn: Connection, table: str, contype: str) -> None:
@@ -59,14 +51,14 @@ async def drop_constraints(conn: Connection, table: str, contype: str) -> None:
await conn.execute(f"ALTER TABLE {table} {drops}")
async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None:
async def migrate_legacy_to_v1(conn: Connection, scheme: Scheme) -> None:
legacy_version = await conn.fetchval(legacy_version_query)
if legacy_version != last_legacy_version:
raise RuntimeError(
"Legacy database is not on last version. "
"Please upgrade the old database with alembic or drop it completely first."
)
if scheme != "sqlite":
if scheme != Scheme.SQLITE:
await drop_constraints(conn, "contact", contype="f")
await conn.execute(
"""
@@ -131,12 +123,12 @@ async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None:
await conn.execute("DROP TABLE alembic_version")
async def update_state_store(conn: Connection, scheme: str) -> None:
async def update_state_store(conn: Connection, scheme: Scheme) -> None:
# The Matrix state store already has more or less the correct schema, so set the version
await conn.execute("CREATE TABLE mx_version (version INTEGER PRIMARY KEY)")
await conn.execute("INSERT INTO mx_version (version) VALUES (2)")
await conn.execute("UPDATE mx_user_profile SET membership='LEAVE' WHERE membership='LEFT'")
if scheme != "sqlite":
if scheme != Scheme.SQLITE:
# Also add the membership type on postgres
await conn.execute(
"CREATE TYPE membership AS ENUM ('join', 'leave', 'invite', 'ban', 'knock')"