Make play IDs shorter
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
import codecs
|
||||
import base64
|
||||
import re
|
||||
|
||||
@@ -164,6 +165,9 @@ async def sync(evt: CommandEvent) -> Optional[Dict]:
|
||||
return await evt.reply("Synchronization complete.")
|
||||
|
||||
|
||||
PEER_TYPE_CHAT = b"g"
|
||||
|
||||
|
||||
@command_handler(help_section=SECTION_MISC,
|
||||
help_args="<play ID>",
|
||||
help_text="Play a Telegram game")
|
||||
@@ -176,21 +180,19 @@ async def play(evt: CommandEvent) -> Optional[Dict]:
|
||||
return await evt.reply("Bots can't play games :(")
|
||||
|
||||
try:
|
||||
play_id = evt.args[0]
|
||||
play_id += (4 - len(play_id) % 4) * "="
|
||||
play_id = base64.b64decode(play_id)
|
||||
peer_type, play_id = bytes([play_id[0]]), play_id[1:]
|
||||
tgid = TelegramID(int(codecs.encode(play_id[0:5], "hex_codec"), 16))
|
||||
msg_id = TelegramID(int(codecs.encode(play_id[5:10], "hex_codec"), 16))
|
||||
space = None
|
||||
peer_type, play_id = base64.b64decode(evt.args[0]).decode("utf-8").split("-", 1)
|
||||
if peer_type == "chan" or peer_type == "user":
|
||||
tgid, msg_id = play_id.split("-")
|
||||
elif peer_type == "chat":
|
||||
tgid, space, msg_id = play_id.split("-")
|
||||
space = TelegramID(int(space))
|
||||
else:
|
||||
raise ValueError()
|
||||
tgid = TelegramID(int(tgid))
|
||||
msg_id = TelegramID(int(msg_id))
|
||||
if peer_type == PEER_TYPE_CHAT:
|
||||
space = TelegramID(int(codecs.encode(play_id[10:15], "hex_codec"), 16))
|
||||
except ValueError:
|
||||
return await evt.reply("Invalid play ID (format)")
|
||||
|
||||
if peer_type == "chat":
|
||||
if peer_type == PEER_TYPE_CHAT:
|
||||
orig_msg = DBMessage.get_by_tgid(msg_id, space)
|
||||
if not orig_msg:
|
||||
return await evt.reply("Invalid play ID (original message not found in db)")
|
||||
@@ -200,7 +202,7 @@ async def play(evt: CommandEvent) -> Optional[Dict]:
|
||||
msg_id = new_msg.tgid
|
||||
try:
|
||||
peer = await evt.sender.client.get_input_entity(tgid)
|
||||
except ValueError as e:
|
||||
except ValueError:
|
||||
return await evt.reply("Invalid play ID (chat not found)")
|
||||
|
||||
msg = await evt.sender.client.get_messages(entity=peer, ids=msg_id)
|
||||
|
||||
@@ -22,6 +22,7 @@ from html import escape as escape_html
|
||||
import asyncio
|
||||
import random
|
||||
import mimetypes
|
||||
import codecs
|
||||
import unicodedata
|
||||
import base64
|
||||
import hashlib
|
||||
@@ -1430,21 +1431,32 @@ class Portal:
|
||||
"net.maunium.telegram.unsupported": True,
|
||||
}, timestamp=evt.date, external_url=self.get_external_url(evt))
|
||||
|
||||
@staticmethod
|
||||
def _int_to_bytes(i: int) -> bytes:
|
||||
hex = "{0:010x}".format(i)
|
||||
return codecs.decode(hex, "hex_codec")
|
||||
|
||||
async def handle_telegram_game(self, source: 'AbstractUser', intent: IntentAPI,
|
||||
evt: Message, relates_to: dict = None):
|
||||
game = evt.media.game
|
||||
if self.peer_type == "channel":
|
||||
play_id = base64.b64encode(f"chan-{self.tgid}-{evt.id}".encode("utf-8"))
|
||||
play_id = base64.b64encode(b"c"
|
||||
+ self._int_to_bytes(self.tgid)
|
||||
+ self._int_to_bytes(evt.id))
|
||||
elif self.peer_type == "chat":
|
||||
play_id = base64.b64encode(f"chat-{self.tgid}-{source.tgid}-{evt.id}".encode("utf-8"))
|
||||
play_id = base64.b64encode(b"g"
|
||||
+ self._int_to_bytes(self.tgid)
|
||||
+ self._int_to_bytes(evt.id)
|
||||
+ self._int_to_bytes(source.tgid))
|
||||
elif self.peer_type == "user":
|
||||
play_id = base64.b64encode(f"user-{self.tgid}-{evt.id}".encode("utf-8"))
|
||||
play_id = base64.b64encode(b"u"
|
||||
+ self._int_to_bytes(self.tgid)
|
||||
+ self._int_to_bytes(evt.id))
|
||||
else:
|
||||
raise ValueError("Portal has invalid peer type")
|
||||
play_id = play_id.decode("utf-8")
|
||||
play_id = play_id.decode("utf-8").rstrip("=")
|
||||
command = f"!tg play {play_id}"
|
||||
override_text = (f"Run {command} in your bridge management room to "
|
||||
f"play {game.title}:\n\n{game.description}")
|
||||
override_text = f"Run {command} in your bridge management room to play {game.title}"
|
||||
override_entities = [MessageEntityCode(offset=len("Run "), length=len(command))]
|
||||
text, html, relates_to = await formatter.telegram_to_matrix(
|
||||
evt, source, self.main_intent,
|
||||
|
||||
Reference in New Issue
Block a user