diff --git a/mautrix_telegram/db/telethon_session.py b/mautrix_telegram/db/telethon_session.py index d520da91..2df272bd 100644 --- a/mautrix_telegram/db/telethon_session.py +++ b/mautrix_telegram/db/telethon_session.py @@ -123,19 +123,55 @@ class PgSession(MemorySession): date = datetime.datetime.utcfromtimestamp(row["date"]) return updates.State(row["pts"], row["qts"], date, row["seq"], row["unread_count"]) + _set_update_state_q = """ + INSERT INTO telethon_update_state (session_id, entity_id, pts, qts, date, seq, unread_count) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (session_id, entity_id) DO UPDATE SET + pts=excluded.pts, qts=excluded.qts, date=excluded.date, seq=excluded.seq, + unread_count=excluded.unread_count + """ + async def set_update_state(self, entity_id: int, row: updates.State) -> None: - q = """ - INSERT INTO telethon_update_state(session_id, entity_id, pts, qts, date, seq, unread_count) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (session_id, entity_id) DO UPDATE SET - pts=excluded.pts, qts=excluded.qts, date=excluded.date, seq=excluded.seq, - unread_count=excluded.unread_count - """ + q = self._set_update_state_q ts = row.date.timestamp() await self.db.execute( q, self.session_id, entity_id, row.pts, row.qts, ts, row.seq, row.unread_count ) + async def set_update_states(self, rows: list[tuple[int, updates.State]]) -> None: + rows = [ + ( + self.session_id, + entity_id, + row.pts, + row.qts, + row.date.timestamp(), + row.seq, + row.unread_count, + ) + for entity_id, row in rows + ] + if self.db.scheme == Scheme.POSTGRES: + q = """ + INSERT INTO telethon_update_state ( + session_id, entity_id, pts, qts, date, seq, unread_count + ) + VALUES ( + $1, + unnest($2::bigint[]), unnest($3::bigint[]), unnest($4::bigint[]), + unnest($5::bigint[]), unnest($6::bigint[]), unnest($7::integer[]) + ) + ON CONFLICT (session_id, entity_id) DO UPDATE SET + pts=excluded.pts, qts=excluded.qts, date=excluded.date, seq=excluded.seq, + unread_count=excluded.unread_count + """ + _, entity_ids, ptses, qtses, timestamps, seqs, unread_counts = zip(*rows) + await self.db.execute( + q, self.session_id, entity_ids, ptses, qtses, timestamps, seqs, unread_counts + ) + else: + await self.db.executemany(self._set_update_state_q, rows) + async def delete_update_state(self, entity_id: int) -> None: q = "DELETE FROM telethon_update_state WHERE session_id=$1 AND entity_id=$2" await self.db.execute(q, self.session_id, entity_id) diff --git a/requirements.txt b/requirements.txt index fcc8dd22..abacdf0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ aiohttp>=3,<4 yarl>=1,<2 mautrix>=0.19.4,<0.20 #telethon>=1.25.4,<1.27 -tulir-telethon==1.28.0a1 +tulir-telethon==1.28.0a3 asyncpg>=0.20,<0.28 mako>=1,<2 setuptools