Fix linebreak handling in lxml parser and add better bullets
Fixes #218
This commit is contained in:
@@ -22,10 +22,15 @@ from telethon.tl.types import TypeMessageEntity
|
||||
class MatrixParserCommon:
|
||||
mention_regex = re.compile("https://matrix.to/#/(@.+:.+)") # type: Pattern
|
||||
room_regex = re.compile("https://matrix.to/#/(#.+:.+)") # type: Pattern
|
||||
block_tags = ("br", "p", "pre", "blockquote",
|
||||
block_tags = ("p", "pre", "blockquote",
|
||||
"ol", "ul", "li",
|
||||
"h1", "h2", "h3", "h4", "h5", "h6",
|
||||
"div", "hr", "table") # type: Tuple[str, ...]
|
||||
list_bullets = ("●", "○", "■", "‣")
|
||||
|
||||
@classmethod
|
||||
def list_bullet(cls, depth: int) -> str:
|
||||
return cls.list_bullets[(depth - 1) % len(cls.list_bullets)]
|
||||
|
||||
|
||||
ParsedMessage = Tuple[str, List[TypeMessageEntity]]
|
||||
|
||||
@@ -123,7 +123,7 @@ class MatrixParser(HTMLParser, MatrixParserCommon):
|
||||
self._open_tags_meta.popleft()
|
||||
self._open_tags_meta.appendleft(url)
|
||||
|
||||
if tag in self.block_tags and ("blockquote" not in self._open_tags or tag == "br"):
|
||||
if (tag in self.block_tags and ("blockquote" not in self._open_tags)) or tag == "br":
|
||||
self._newline()
|
||||
|
||||
if entity_type and tag not in self._building_entities:
|
||||
@@ -202,7 +202,8 @@ class MatrixParser(HTMLParser, MatrixParserCommon):
|
||||
else:
|
||||
prefix = int(math.log(n, 10)) * 3 * " " + 4 * " "
|
||||
else:
|
||||
prefix = "* " if self._list_entry_is_new else 3 * " "
|
||||
prefix = (f"{self.list_bullet(self._open_tags.count('ul'))} "
|
||||
if self._list_entry_is_new else 3 * " ")
|
||||
if not self._list_entry_is_new and not self._line_is_new:
|
||||
prefix = ""
|
||||
extra_offset += len(indent) + len(prefix)
|
||||
|
||||
@@ -171,11 +171,32 @@ class TelegramMessage:
|
||||
return main
|
||||
|
||||
|
||||
class RecursionContext:
|
||||
strip_linebreaks: bool
|
||||
ul_depth: int
|
||||
|
||||
def __init__(self, strip_linebreaks: bool = True, ul_depth: int = 0):
|
||||
self.strip_linebreaks = strip_linebreaks
|
||||
self.ul_depth = ul_depth
|
||||
self._inited = True
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if getattr(self, "_inited", False) is True:
|
||||
raise TypeError("'RecursionContext' object is immutable")
|
||||
super(RecursionContext, self).__setattr__(key, value)
|
||||
|
||||
def enter_list(self) -> 'RecursionContext':
|
||||
return RecursionContext(strip_linebreaks=self.strip_linebreaks, ul_depth=self.ul_depth + 1)
|
||||
|
||||
def enter_code_block(self) -> 'RecursionContext':
|
||||
return RecursionContext(strip_linebreaks=True, ul_depth=self.ul_depth)
|
||||
|
||||
|
||||
class MatrixParser(MatrixParserCommon):
|
||||
@classmethod
|
||||
def list_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
def list_to_tmessage(cls, node: html.HtmlElement, ctx: RecursionContext) -> TelegramMessage:
|
||||
ordered = node.tag == "ol"
|
||||
tagged_children = cls.node_to_tagged_tmessages(node, strip_linebreaks)
|
||||
tagged_children = cls.node_to_tagged_tmessages(node, ctx)
|
||||
counter = 1
|
||||
indent_length = 0
|
||||
if ordered:
|
||||
@@ -196,7 +217,7 @@ class MatrixParser(MatrixParserCommon):
|
||||
prefix = f"{counter}. "
|
||||
counter += 1
|
||||
else:
|
||||
prefix = "● "
|
||||
prefix = f"{cls.list_bullet(ctx.ul_depth)} "
|
||||
child = child.prepend(prefix)
|
||||
parts = child.split("\n")
|
||||
parts = parts[:1] + [part.prepend(indent) for part in parts[1:]]
|
||||
@@ -205,22 +226,24 @@ class MatrixParser(MatrixParserCommon):
|
||||
return TelegramMessage.join(children, "\n")
|
||||
|
||||
@classmethod
|
||||
def blockquote_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
msg = cls.tag_aware_parse_node(node, strip_linebreaks)
|
||||
def blockquote_to_tmessage(cls, node: html.HtmlElement, ctx: RecursionContext
|
||||
) -> TelegramMessage:
|
||||
msg = cls.tag_aware_parse_node(node, ctx)
|
||||
children = msg.trim().split("\n")
|
||||
children = [child.prepend("> ") for child in children]
|
||||
return TelegramMessage.join(children, "\n")
|
||||
|
||||
@classmethod
|
||||
def header_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
children = cls.node_to_tmessages(node, strip_linebreaks)
|
||||
def header_to_tmessage(cls, node: html.HtmlElement, ctx: RecursionContext) -> TelegramMessage:
|
||||
children = cls.node_to_tmessages(node, ctx)
|
||||
length = int(node.tag[1])
|
||||
prefix = "#" * length + " "
|
||||
return TelegramMessage.join(children, "").prepend(prefix)
|
||||
|
||||
@classmethod
|
||||
def basic_format_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
msg = cls.tag_aware_parse_node(node, strip_linebreaks)
|
||||
def basic_format_to_tmessage(cls, node: html.HtmlElement, ctx: RecursionContext
|
||||
) -> TelegramMessage:
|
||||
msg = cls.tag_aware_parse_node(node, ctx)
|
||||
if node.tag in ("b", "strong"):
|
||||
msg.format(Bold)
|
||||
elif node.tag in ("i", "em"):
|
||||
@@ -238,8 +261,8 @@ class MatrixParser(MatrixParserCommon):
|
||||
return msg
|
||||
|
||||
@classmethod
|
||||
def link_to_tstring(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
msg = cls.tag_aware_parse_node(node, strip_linebreaks)
|
||||
def link_to_tstring(cls, node: html.HtmlElement, ctx: RecursionContext) -> TelegramMessage:
|
||||
msg = cls.tag_aware_parse_node(node, ctx)
|
||||
href = node.attrib.get("href", "")
|
||||
if not href:
|
||||
return msg
|
||||
@@ -273,21 +296,23 @@ class MatrixParser(MatrixParserCommon):
|
||||
else msg.format(TextURL, url=href))
|
||||
|
||||
@classmethod
|
||||
def node_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
def node_to_tmessage(cls, node: html.HtmlElement, ctx: RecursionContext) -> TelegramMessage:
|
||||
if node.tag == "blockquote":
|
||||
return cls.blockquote_to_tmessage(node, strip_linebreaks)
|
||||
elif node.tag in ("ol", "ul"):
|
||||
return cls.list_to_tmessage(node, strip_linebreaks)
|
||||
return cls.blockquote_to_tmessage(node, ctx)
|
||||
elif node.tag == "ol":
|
||||
return cls.list_to_tmessage(node, ctx)
|
||||
elif node.tag == "ul":
|
||||
return cls.list_to_tmessage(node, ctx.enter_list())
|
||||
elif node.tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
|
||||
return cls.header_to_tmessage(node, strip_linebreaks)
|
||||
return cls.header_to_tmessage(node, ctx)
|
||||
elif node.tag == "br":
|
||||
return TelegramMessage("\n")
|
||||
elif node.tag in ("b", "strong", "i", "em", "s", "del", "u", "ins", "command"):
|
||||
return cls.basic_format_to_tmessage(node, strip_linebreaks)
|
||||
return cls.basic_format_to_tmessage(node, ctx)
|
||||
elif node.tag == "a":
|
||||
return cls.link_to_tstring(node, strip_linebreaks)
|
||||
return cls.link_to_tstring(node, ctx)
|
||||
elif node.tag == "p":
|
||||
return cls.tag_aware_parse_node(node, strip_linebreaks).append("\n")
|
||||
return cls.tag_aware_parse_node(node, ctx).append("\n")
|
||||
elif node.tag == "pre":
|
||||
lang = ""
|
||||
try:
|
||||
@@ -296,37 +321,39 @@ class MatrixParser(MatrixParserCommon):
|
||||
node = node[0]
|
||||
except (IndexError, KeyError):
|
||||
pass
|
||||
return cls.parse_node(node, strip_linebreaks=False).format(Pre, language=lang)
|
||||
return cls.parse_node(node, ctx.enter_code_block()).format(Pre, language=lang)
|
||||
elif node.tag == "code":
|
||||
return cls.parse_node(node, strip_linebreaks=False).format(Code)
|
||||
return cls.tag_aware_parse_node(node, strip_linebreaks)
|
||||
return cls.parse_node(node, ctx.enter_code_block()).format(Code)
|
||||
return cls.tag_aware_parse_node(node, ctx)
|
||||
|
||||
@staticmethod
|
||||
def text_to_tmessage(text: str, strip_linebreaks: bool = True) -> TelegramMessage:
|
||||
if strip_linebreaks:
|
||||
def text_to_tmessage(text: str, ctx: RecursionContext) -> TelegramMessage:
|
||||
if ctx.strip_linebreaks:
|
||||
text = text.replace("\n", "")
|
||||
return TelegramMessage(text)
|
||||
|
||||
@classmethod
|
||||
def node_to_tagged_tmessages(cls, node: html.HtmlElement, strip_linebreaks: bool = True
|
||||
def node_to_tagged_tmessages(cls, node: html.HtmlElement, ctx: RecursionContext
|
||||
) -> List[Tuple[TelegramMessage, str]]:
|
||||
output = []
|
||||
|
||||
if node.text:
|
||||
output.append((cls.text_to_tmessage(node.text, strip_linebreaks), "text"))
|
||||
output.append((cls.text_to_tmessage(node.text, ctx), "text"))
|
||||
for child in node:
|
||||
output.append((cls.node_to_tmessage(child, strip_linebreaks), child.tag))
|
||||
output.append((cls.node_to_tmessage(child, ctx), child.tag))
|
||||
if child.tail:
|
||||
output.append((cls.text_to_tmessage(child.tail, strip_linebreaks), "text"))
|
||||
output.append((cls.text_to_tmessage(child.tail, ctx), "text"))
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
def node_to_tmessages(cls, node: html.HtmlElement, strip_linebreaks) -> List[TelegramMessage]:
|
||||
return [msg for (msg, tag) in cls.node_to_tagged_tmessages(node, strip_linebreaks)]
|
||||
def node_to_tmessages(cls, node: html.HtmlElement, ctx: RecursionContext
|
||||
) -> List[TelegramMessage]:
|
||||
return [msg for (msg, tag) in cls.node_to_tagged_tmessages(node, ctx)]
|
||||
|
||||
@classmethod
|
||||
def tag_aware_parse_node(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
msgs = cls.node_to_tagged_tmessages(node, strip_linebreaks)
|
||||
def tag_aware_parse_node(cls, node: html.HtmlElement, ctx: RecursionContext
|
||||
) -> TelegramMessage:
|
||||
msgs = cls.node_to_tagged_tmessages(node, ctx)
|
||||
output = TelegramMessage()
|
||||
for msg, tag in msgs:
|
||||
if tag in cls.block_tags:
|
||||
@@ -335,11 +362,11 @@ class MatrixParser(MatrixParserCommon):
|
||||
return output.trim()
|
||||
|
||||
@classmethod
|
||||
def parse_node(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
|
||||
return TelegramMessage.join(cls.node_to_tmessages(node, strip_linebreaks))
|
||||
def parse_node(cls, node: html.HtmlElement, ctx: RecursionContext) -> TelegramMessage:
|
||||
return TelegramMessage.join(cls.node_to_tmessages(node, ctx))
|
||||
|
||||
@classmethod
|
||||
def parse(cls, data: str) -> ParsedMessage:
|
||||
document = html.fromstring(f"<html>{data}</html>")
|
||||
msg = cls.parse_node(document, strip_linebreaks=True)
|
||||
msg = cls.parse_node(document, RecursionContext())
|
||||
return msg.text, msg.entities
|
||||
|
||||
Reference in New Issue
Block a user