Deduplicate service messages, typing notifications and presence
This commit is contained in:
@@ -194,12 +194,16 @@ class IntentAPI:
|
||||
content = {"displayname": name}
|
||||
return await self.client.request("PUT", f"/profile/{self.mxid}/displayname", content)
|
||||
|
||||
async def set_presence(self, status="online"):
|
||||
async def set_presence(self, status="online", ignore_cache=False):
|
||||
await self.ensure_registered()
|
||||
if not ignore_cache and self.state_store.has_presence(self.mxid, status):
|
||||
return
|
||||
content = {
|
||||
"presence": status
|
||||
}
|
||||
return await self.client.request("PUT", f"/presence/{self.mxid}/status", content)
|
||||
resp = await self.client.request("PUT", f"/presence/{self.mxid}/status", content)
|
||||
self.state_store.set_presence(self.mxid, status)
|
||||
return resp
|
||||
|
||||
async def set_avatar(self, url):
|
||||
await self.ensure_registered()
|
||||
@@ -340,14 +344,18 @@ class IntentAPI:
|
||||
await self.ensure_joined(room_id)
|
||||
return await self.client.request("GET", f"/rooms/{room_id}/event/{event_id}")
|
||||
|
||||
async def set_typing(self, room_id, is_typing=True, timeout=5000):
|
||||
async def set_typing(self, room_id, is_typing=True, timeout=5000, ignore_cache=False):
|
||||
await self.ensure_joined(room_id)
|
||||
if not ignore_cache and is_typing == self.state_store.is_typing(room_id, self.mxid):
|
||||
return
|
||||
content = {
|
||||
"typing": is_typing
|
||||
}
|
||||
if is_typing:
|
||||
content["timeout"] = timeout
|
||||
return await self.client.request("PUT", f"/rooms/{room_id}/typing/{self.mxid}", content)
|
||||
resp = await self.client.request("PUT", f"/rooms/{room_id}/typing/{self.mxid}", content)
|
||||
self.state_store.set_typing(room_id, self.mxid, is_typing, timeout)
|
||||
return resp
|
||||
|
||||
async def mark_read(self, room_id, event_id):
|
||||
await self.ensure_joined(room_id)
|
||||
|
||||
@@ -15,14 +15,21 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
import json
|
||||
import time
|
||||
|
||||
|
||||
class StateStore:
|
||||
def __init__(self, autosave_file=None):
|
||||
self.autosave_file = autosave_file
|
||||
|
||||
# Persistent storage
|
||||
self.registrations = set()
|
||||
self.memberships = {}
|
||||
self.power_levels = {}
|
||||
self.autosave_file = autosave_file
|
||||
|
||||
# Non-persistent storage
|
||||
self.presence = {}
|
||||
self.typing = {}
|
||||
|
||||
def save(self, file):
|
||||
if isinstance(file, str):
|
||||
@@ -63,6 +70,29 @@ class StateStore:
|
||||
if self.autosave_file:
|
||||
self.save(self.autosave_file)
|
||||
|
||||
def set_presence(self, user, presence):
|
||||
self.presence[user] = presence
|
||||
|
||||
def has_presence(self, user, presence):
|
||||
try:
|
||||
return self.presence[user] == presence
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def set_typing(self, room_id, user, is_typing, timeout=0):
|
||||
if is_typing:
|
||||
ts = int(round(time.time() * 1000))
|
||||
self.typing[(room_id, user)] = ts + timeout
|
||||
else:
|
||||
del self.typing[(room_id, user)]
|
||||
|
||||
def is_typing(self, room_id, user):
|
||||
ts = int(round(time.time() * 1000))
|
||||
try:
|
||||
return self.typing[(room_id, user)] > ts
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def is_registered(self, user):
|
||||
return user in self.registrations
|
||||
|
||||
|
||||
@@ -144,13 +144,14 @@ class AbstractUser:
|
||||
await puppet.intent.mark_read(portal.mxid, message.mxid)
|
||||
|
||||
async def update_admin(self, update):
|
||||
# TODO duplication not checked
|
||||
portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat")
|
||||
if isinstance(update, UpdateChatAdmins):
|
||||
await portal.set_telegram_admins_enabled(update.enabled)
|
||||
elif isinstance(update, UpdateChatParticipantAdmin):
|
||||
await portal.set_telegram_admin(update.user_id)
|
||||
else:
|
||||
self.log.warninng("Unexpected admin status update: %s", update)
|
||||
self.log.warning("Unexpected admin status update: %s", update)
|
||||
|
||||
async def update_typing(self, update):
|
||||
if isinstance(update, UpdateUserTyping):
|
||||
@@ -161,6 +162,7 @@ class AbstractUser:
|
||||
await portal.handle_telegram_typing(sender, update)
|
||||
|
||||
async def update_others_info(self, update):
|
||||
# TODO duplication not checked
|
||||
puppet = pu.Puppet.get(update.user_id)
|
||||
if isinstance(update, UpdateUserName):
|
||||
if await puppet.update_displayname(self, update):
|
||||
@@ -169,7 +171,7 @@ class AbstractUser:
|
||||
if await puppet.update_avatar(self, update.photo.photo_big):
|
||||
puppet.save()
|
||||
else:
|
||||
self.log.warninng("Unexpected other user info update: %s", update)
|
||||
self.log.warning("Unexpected other user info update: %s", update)
|
||||
|
||||
async def update_status(self, update):
|
||||
puppet = pu.Puppet.get(update.user_id)
|
||||
@@ -214,7 +216,7 @@ class AbstractUser:
|
||||
return
|
||||
self.log.debug("Handling action %s to %s by %d", update.action, portal.tgid_log,
|
||||
sender.id)
|
||||
return portal.handle_telegram_action(self, sender, update.action)
|
||||
return portal.handle_telegram_action(self, sender, update)
|
||||
|
||||
user = sender.tgid if sender else "admin"
|
||||
if isinstance(original_update, (UpdateEditMessage, UpdateEditChannelMessage)):
|
||||
|
||||
+37
-21
@@ -64,6 +64,7 @@ class Portal:
|
||||
|
||||
self._dedup = deque()
|
||||
self._dedup_mxid = {}
|
||||
self._dedup_action = deque()
|
||||
|
||||
if save_to_cache:
|
||||
if tgid:
|
||||
@@ -95,36 +96,47 @@ class Portal:
|
||||
print("BOT PRINT", self.bot)
|
||||
return self.bot and self.bot.is_in_chat(self.tgid)
|
||||
|
||||
def _hash_event(self, event):
|
||||
if self.peer_type == "channel":
|
||||
# Message IDs are unique per-channel
|
||||
return event.id
|
||||
|
||||
@staticmethod
|
||||
def _hash_event(event):
|
||||
# Non-channel messages are unique per-user (wtf telegram), so we have no other choice than
|
||||
# to deduplicate based on a hash of the message content.
|
||||
|
||||
# The timestamp is only accurate to the second, so we can't rely on solely that either.
|
||||
hash_content = [event.date.timestamp(), event.message]
|
||||
if event.fwd_from:
|
||||
hash_content += [event.fwd_from.from_id, event.fwd_from.channel_id]
|
||||
elif isinstance(event, Message) and event.media:
|
||||
try:
|
||||
hash_content += {
|
||||
MessageMediaContact: lambda media: [media.user_id],
|
||||
MessageMediaDocument: lambda media: [media.document.id, media.caption],
|
||||
MessageMediaPhoto: lambda media: [media.photo.id, media.caption],
|
||||
MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat],
|
||||
}[type(event.media)](event.media)
|
||||
except KeyError:
|
||||
pass
|
||||
# The timestamp is only accurate to the second, so we can't rely solely on that either.
|
||||
if isinstance(event, MessageService):
|
||||
hash_content = [event.date.timestamp(), event.from_id, event.action]
|
||||
else:
|
||||
hash_content = [event.date.timestamp(), event.message]
|
||||
if event.fwd_from:
|
||||
hash_content += [event.fwd_from.from_id, event.fwd_from.channel_id]
|
||||
elif isinstance(event, Message) and event.media:
|
||||
try:
|
||||
hash_content += {
|
||||
MessageMediaContact: lambda media: [media.user_id],
|
||||
MessageMediaDocument: lambda media: [media.document.id, media.caption],
|
||||
MessageMediaPhoto: lambda media: [media.photo.id, media.caption],
|
||||
MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat],
|
||||
}[type(event.media)](event.media)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return hashlib.md5("-"
|
||||
.join(str(a) for a in hash_content)
|
||||
.encode("utf-8")
|
||||
).hexdigest()
|
||||
|
||||
def is_duplicate_action(self, event):
|
||||
hash = self._hash_event(event) if self.peer_type != "channel" else event.id
|
||||
if hash in self._dedup_action:
|
||||
return True
|
||||
|
||||
self._dedup_action.append(hash)
|
||||
|
||||
if len(self._dedup_action) > 20:
|
||||
self._dedup_action.popleft()
|
||||
return False
|
||||
|
||||
def is_duplicate(self, event, mxid=None):
|
||||
hash = self._hash_event(event)
|
||||
hash = self._hash_event(event) if self.peer_type != "channel" else event.id
|
||||
if hash in self._dedup:
|
||||
return self._dedup_mxid[hash]
|
||||
|
||||
@@ -901,7 +913,8 @@ class Portal:
|
||||
self.db.add(DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=mxid, tg_space=tg_space))
|
||||
self.db.commit()
|
||||
|
||||
async def handle_telegram_action(self, source, sender, action):
|
||||
async def handle_telegram_action(self, source, sender, update):
|
||||
action = update.action
|
||||
if not self.mxid:
|
||||
create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate)
|
||||
create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink)
|
||||
@@ -911,6 +924,9 @@ class Portal:
|
||||
if not isinstance(action, create_and_continue):
|
||||
return
|
||||
|
||||
if self.is_duplicate_action(update):
|
||||
return
|
||||
|
||||
# TODO figure out how to see changes to about text / channel username
|
||||
if isinstance(action, MessageActionChatEditTitle):
|
||||
if await self.update_title(action.title):
|
||||
|
||||
Reference in New Issue
Block a user