# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from typing import Any, Literal, TypedDict
|
|
from pathlib import Path
|
|
import argparse
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import logging
|
|
import math
|
|
import mimetypes
|
|
import pickle
|
|
import random
|
|
import string
|
|
|
|
from lottie.exporters import export_tgs
|
|
from lottie.exporters.cairo import export_png
|
|
from lottie.exporters.tgs_validator import Severity, TgsValidator
|
|
from lottie.importers.svg import import_svg
|
|
from lottie.objects import Animation
|
|
from lottie.utils.stripper import float_strip
|
|
from PIL import Image
|
|
from telethon import TelegramClient
|
|
from telethon.custom import Conversation, Message
|
|
from telethon.tl.functions.messages import GetStickerSetRequest
|
|
from telethon.tl.types import (
|
|
Document,
|
|
DocumentAttributeCustomEmoji,
|
|
DocumentAttributeFilename,
|
|
DocumentAttributeImageSize,
|
|
InputMediaUploadedDocument,
|
|
InputStickerSetShortName,
|
|
)
|
|
import aiohttp
|
|
|
|
# Register the ".webp" extension as image/webp in the mimetypes table.
mimetypes.add_type("image/webp", ".webp")

# Command-line interface. The arguments are parsed when the module is loaded,
# so `args`, `font_dir` and `media_dir` are module-level globals read by the
# functions further down.
parser = argparse.ArgumentParser(description="mautrix-telegram unicode emoji packer")
parser.add_argument(
    "-i",
    "--api-id",
    type=int,
    required=True,
    metavar="<api id>",
    help="Telegram API ID",
)
parser.add_argument(
    "-a",
    "--api-hash",
    type=str,
    required=True,
    metavar="<api hash>",
    help="Telegram API hash",
)
parser.add_argument(
    "-s",
    "--session",
    type=str,
    default="unicodemojipacker.session",
    metavar="<file name>",
    help="Telethon session name",
)
parser.add_argument(
    "-o",
    "--output",
    type=str,
    default="mautrix_telegram/unicodemojipack.json",
    metavar="<file name>",
    help="Path to save created emoji pack document IDs",
)
parser.add_argument(
    "-f",
    "--font-directory",
    type=Path,
    required=True,
    metavar="<directory path>",
    help="Path to the Noto color emoji files",
)
parser.add_argument(
    "-m",
    "--media-directory",
    type=Path,
    required=True,
    metavar="<directory path>",
    help="Path to save converted tgs and webp emoji files",
)

args = parser.parse_args()
# Directory the source SVG files are read from.
font_dir: Path = args.font_directory
# Directory that receives the converted files and the JSON caches.
media_dir: Path = args.media_directory

# JSON document listing the emoji to convert; downloaded by load_emoji_data().
EMOJI_DATA_URL = "https://raw.githubusercontent.com/iamcal/emoji-data/master/emoji.json"
|
|
|
|
|
|
def unified_to_unicode(unified: str) -> str:
    """Turn a dash-separated list of hex codepoints into the text it encodes.

    Every chunk is left-padded with zeros to eight hex digits and written as a
    32-bit Unicode escape sequence; the joined escapes are then decoded with
    the ``unicode_escape`` codec.

    Args:
        unified: Hex codepoints joined with ``-``, e.g. ``"1F1E6-1F1FA"``.

    Returns:
        The string made of those codepoints, in order.
    """
    escapes = []
    for codepoint in unified.split("-"):
        escapes.append("\\U" + codepoint.rjust(8, "0"))
    escaped = "".join(escapes)
    return escaped.encode("ascii").decode("unicode_escape")
|
|
|
|
|
|
def tag_to_str(unified: str) -> str:
    """Decode dash-separated hex chunks after dropping a leading ``E00``.

    Each chunk has the prefix ``E00`` removed (when present), the remainder is
    parsed as a hexadecimal codepoint, and the resulting characters are joined.

    Args:
        unified: Hex chunks joined with ``-``, e.g. ``"E0067-E0062"``.

    Returns:
        The decoded characters as one string.
    """
    characters = []
    for chunk in unified.split("-"):
        codepoint = int(chunk.removeprefix("E00"), 16)
        characters.append(chr(codepoint))
    return "".join(characters)
|
|
|
|
|
|
# Format of a converted emoji file; also used as the file extension when the
# converted file is written to the media directory.
EmojiType = Literal["webp", "tgs"]
# Pack kind, spelled exactly as it is sent to the Stickers bot in create_pack().
PackType = Literal["Animated emoji", "Static emoji"]
|
|
|
|
|
|
class Emoji(TypedDict):
    """One converted emoji, as returned by parse_emoji_data()."""

    # Dash-separated hex codepoints with every "-FE0F" removed.
    hex: str
    # The emoji itself, decoded from the "unified" codepoint list.
    emoji: str
    # Format the emoji was converted to.
    type: EmojiType
    # Name of the converted file inside the media directory.
    filename: str
|
|
|
|
|
|
class EmojiData(TypedDict):
    """All converted emoji, grouped by the format they were converted to."""

    # Emoji that were exported as tgs files.
    tgs: list[Emoji]
    # Emoji that were rendered to webp instead.
    webp: list[Emoji]
|
|
|
|
|
|
def parse_emoji_data(tone: dict[str, Any], emoji: dict[str, Any]) -> Emoji:
    """Convert the SVG of one emoji (or skin-tone variant) to tgs or webp.

    The SVG is looked up under ``font_dir``, imported as a lottie animation and
    exported as tgs. If the tgs validator reports anything more severe than a
    note, or the file is a region flag whose tgs output exceeds 32768 bytes,
    the animation is rendered to PNG and stored as webp instead. The converted
    file is written to ``media_dir`` and a progress line is printed.

    Args:
        tone: Entry providing the ``non_qualified`` and ``unified`` codepoints;
            either the emoji itself or one of its skin variations.
        emoji: The base emoji entry; its ``category`` and ``subcategory``
            decide which SVG directory is used.

    Returns:
        The description of the converted file.
    """
    codepoints = (tone["non_qualified"] or tone["unified"]).replace("-FE0F", "")
    file_stem = codepoints.replace("-", "_").lower()
    is_region_flag = emoji["category"] == "Flags" and emoji["subcategory"] in (
        "country-flag",
        "subdivision-flag",
    )
    if is_region_flag:
        filename = f"third_party/region-flags/waved-svg/emoji_u{file_stem}.svg"
    else:
        filename = f"svg/emoji_u{file_stem}.svg"

    with (font_dir / filename).open() as svg_file:
        lot: Animation = import_svg(svg_file)
    float_strip(lot)
    lot.tgs_sanitize()

    tgs_buffer = io.BytesIO()
    export_tgs(lot, tgs_buffer)
    tgs_bytes = tgs_buffer.getvalue()

    validator = TgsValidator()
    validator(lot)
    validator.check_size(len(tgs_bytes))
    # Notes are informational only; anything else rules out the tgs format.
    errors = [err for err in validator.errors if err.severity != Severity.Note]
    oversized_flag = "region-flags" in filename and len(tgs_bytes) > 32768

    media_type: EmojiType
    if errors or oversized_flag:
        # Fall back to a static image: rescale, render to PNG, re-encode as webp.
        lot.scale(100, 100)

        png_buffer = io.BytesIO()
        export_png(lot, png_buffer)
        webp_buffer = io.BytesIO()
        webp_buffer.name = "image.webp"
        Image.open(png_buffer).save(webp_buffer, "webp")

        media_type = "webp"
        payload = webp_buffer.getvalue()
    else:
        media_type = "tgs"
        payload = tgs_bytes

    path = media_dir / f"{file_stem}.{media_type}"
    path.write_bytes(payload)
    print(
        "Converted",
        filename,
        "->",
        path.name,
        "//" if errors else "",
        "\n".join(str(err) for err in errors),
    )

    return {
        "hex": codepoints,
        "emoji": unified_to_unicode(tone["unified"]),
        "type": media_type,
        "filename": path.name,
    }
|
|
|
|
|
|
async def load_emoji_data() -> EmojiData:
    """Return the converted emoji list, converting everything on first use.

    If ``conversion-cache.json`` exists in ``media_dir`` its contents are
    returned as-is. Otherwise the emoji list is downloaded from
    ``EMOJI_DATA_URL``, sorted by ``sort_order``, and every emoji plus each of
    its skin variations is converted with parse_emoji_data(). The result is
    written to the cache file before being returned.

    Returns:
        The converted emoji, split into the ``tgs`` and ``webp`` lists.

    Raises:
        aiohttp.ClientResponseError: If the download returns an HTTP error status.
    """
    cache_path = media_dir / "conversion-cache.json"
    # The cache holds raw emoji characters (ensure_ascii=False), so it is read
    # and written as UTF-8 explicitly instead of using the locale's encoding.
    try:
        with cache_path.open(encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        pass

    async with aiohttp.ClientSession() as sess, sess.get(EMOJI_DATA_URL) as resp:
        # Fail on an HTTP error instead of trying to parse the error body.
        resp.raise_for_status()
        raw_emoji_data = sorted(
            # content_type=None disables aiohttp's Content-Type check.
            await resp.json(content_type=None),
            key=lambda dat: dat["sort_order"],
        )

    tgs_emoji: list[Emoji] = []
    webp_emoji: list[Emoji] = []
    for emoji in raw_emoji_data:
        # The base emoji comes first, followed by its skin-tone variants.
        for tone in (emoji, *emoji.get("skin_variations", {}).values()):
            parsed_emoji = parse_emoji_data(tone, emoji)
            if parsed_emoji["type"] == "tgs":
                tgs_emoji.append(parsed_emoji)
            else:
                webp_emoji.append(parsed_emoji)

    full_data: EmojiData = {"tgs": tgs_emoji, "webp": webp_emoji}
    with cache_path.open("w", encoding="utf-8") as f:
        json.dump(full_data, f, ensure_ascii=False)
    return full_data
|
|
|
|
|
|
async def create_pack(conv: Conversation, name: str, pack_type: str) -> None:
    """Walk the bot conversation that starts a new custom emoji pack.

    Sends ``/newemojipack``, then the pack type, then the pack name, and
    asserts after each step that the reply contains the expected prompt.

    Args:
        conv: Open conversation the commands are sent through.
        name: Display name of the new pack.
        pack_type: Pack kind; ``"Animated emoji"`` selects the animated
            prompts, any other value the static ones.

    Raises:
        AssertionError: If a reply does not contain the expected text.
    """
    # The bot words its prompts differently for animated and static packs.
    if pack_type == "Animated emoji":
        name_prompt = "When ready to upload, tell me the name of your set."
        upload_prompt = "Now send me the first animated emoji"
    else:
        name_prompt = "Now choose a name for your set."
        upload_prompt = "Now send me the custom emoji"

    await conv.send_message("/newemojipack")
    reply: Message = await conv.get_response()
    assert "A new set of custom emoji" in reply.raw_text
    assert "Please choose the type" in reply.raw_text

    await conv.send_message(pack_type)
    reply = await conv.get_response()
    assert name_prompt in reply.raw_text

    await conv.send_message(name)
    reply = await conv.get_response()
    assert upload_prompt in reply.raw_text
|
|
|
|
|
|
async def publish_pack(conv: Conversation, shortname: str) -> None:
    """Publish the pack currently being edited in the bot conversation.

    Sends ``/publish``, skips the icon step with ``/skip`` and finally sends
    the short name, asserting the expected reply after each message.

    Args:
        conv: Open conversation the commands are sent through.
        shortname: Short name to publish the pack under.

    Raises:
        AssertionError: If a reply does not contain the expected text.
    """
    # (message to send, text the reply must contain), in conversation order.
    steps = (
        ("/publish", "You can send me a custom emoji from your emoji set"),
        ("/skip", "Please provide a short name for your emoji set"),
        (shortname, "I've just published your emoji set"),
    )
    for outgoing, expected in steps:
        await conv.send_message(outgoing)
        reply: Message = await conv.get_response()
        assert expected in reply.raw_text
|
|
|
|
|
|
async def send_emoji(
    conv: Conversation, file: bytes | Path | InputMediaUploadedDocument, emoji: str
) -> None:
    """Add one custom emoji to the pack being built in the bot conversation.

    Uploads the file, then answers the bot's request for a replacement emoji.
    If the bot reports too many attempts, its message is printed and the
    function waits for the operator to press enter before retrying once. While
    the bot keeps rejecting the emoji, the operator is asked for a replacement
    on standard input.

    Args:
        conv: Open conversation the file and messages are sent through.
        file: The emoji media to upload.
        emoji: The unicode emoji the custom emoji corresponds to.

    Raises:
        AssertionError: If the bot does not ask for the replacement emoji
            after the upload, or does not confirm with "Congratulations".
    """

    async def say(text):
        # Send one message to the bot and hand back its reply.
        await conv.send_message(text)
        return await conv.get_response()

    await conv.send_file(file)
    reply: Message = await conv.get_response()
    assert "Send me a replacement emoji that corresponds to your custom emoji" in reply.raw_text

    reply = await say(emoji)
    if "Sorry, too many attempts" in reply.raw_text:
        print(reply.raw_text)
        input("Press enter to continue")
        reply = await say(emoji)

    rejection = "Please send an emoji that best describes your custom emoji."
    while rejection in reply.raw_text:
        emoji = input(f"{emoji} was rejected, provide replacement: ")
        reply = await say(emoji)
    assert "Congratulations" in reply.raw_text
|
|
|
|
|
|
class CachedPack(TypedDict):
    """One emoji pack and its progress, as stored in the split cache."""

    # Display name, including the "(part/total)" counter.
    name: str
    # Short name of the pack: "mxtg_unicodemoji_" plus a random suffix.
    short_name: str
    # 1-based position of this pack among all packs.
    part: int
    # Pack kind sent to the Stickers bot when the pack is created.
    type: PackType
    # Set once the pack has been created, filled and published.
    published: bool
    # Set once the document IDs of the pack have been collected.
    collected: bool
    # The emoji belonging to this pack, in upload order.
    emojis: list[Emoji]
|
|
|
|
|
|
class CachedData(TypedDict):
    """Wrapper holding a list of cached packs."""

    # NOTE(review): not referenced anywhere else in the visible source.
    packs: list[CachedPack]
|
|
|
|
|
|
def _split_packs_int(
|
|
emoji_list: list[Emoji], pack_type: PackType, current_part: int, total_parts: int
|
|
) -> tuple[list[CachedPack], int]:
|
|
packs = []
|
|
current_pack: CachedPack | None = None
|
|
for i, emoji in enumerate(emoji_list):
|
|
if i % 200 == 0:
|
|
current_part += 1
|
|
random_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
|
|
short_name = f"mxtg_unicodemoji_{random_id}"
|
|
name = f"mautrix-telegram unicodemoji ({current_part}/{total_parts})"
|
|
current_pack = {
|
|
"type": pack_type,
|
|
"short_name": short_name,
|
|
"part": current_part,
|
|
"name": name,
|
|
"published": False,
|
|
"collected": False,
|
|
"emojis": [],
|
|
}
|
|
packs.append(current_pack)
|
|
current_pack["emojis"].append(emoji)
|
|
return packs, current_part
|
|
|
|
|
|
def split_packs(emoji_data: EmojiData) -> list[CachedPack]:
    """Distribute all converted emoji over packs of at most 200 entries.

    The tgs emoji go into "Animated emoji" packs and the webp emoji into
    "Static emoji" packs; part numbers run continuously across both groups.

    Args:
        emoji_data: The converted emoji, grouped by format.

    Returns:
        The animated packs followed by the static packs.
    """
    animated_emoji = emoji_data["tgs"]
    static_emoji = emoji_data["webp"]
    animated_count = math.ceil(len(animated_emoji) / 200)
    static_count = math.ceil(len(static_emoji) / 200)
    total_parts = animated_count + static_count

    animated_packs, last_part = _split_packs_int(
        animated_emoji, "Animated emoji", 0, total_parts
    )
    # Static packs continue numbering where the animated packs stopped.
    static_packs, _ = _split_packs_int(static_emoji, "Static emoji", last_part, total_parts)
    return animated_packs + static_packs
|
|
|
|
|
|
async def create_and_fill_pack(
    client: TelegramClient,
    conv: Conversation,
    pack: CachedPack,
    *,
    resume_short_name: str | None = "mxtg_unicodemoji_xvzs6743",
    resume_index: int = 87,
) -> None:
    """Create one emoji pack through the bot, upload its emoji and publish it.

    webp emoji are uploaded through the client first and sent as a document
    with an explicit image size and file name; other emoji are sent straight
    from their file path. The function sleeps two seconds after each emoji.

    Args:
        client: Client used to upload webp files.
        conv: Open conversation with the bot.
        pack: The pack to create and fill.
        resume_short_name: Short name of a pack that already exists on the bot
            side and should be continued rather than created. ``None``
            disables resuming. The default is the value that used to be
            hard-coded in this function.
        resume_index: When resuming, number of leading emoji to skip because
            they were already uploaded. The default is the value that used to
            be hard-coded in this function.

    Raises:
        AssertionError: Propagated from the conversation helpers when the bot
            replies with unexpected text.
    """
    resuming = resume_short_name is not None and pack["short_name"] == resume_short_name
    if resuming:
        print("Continuing pack", pack["name"])
        first_index = resume_index
    else:
        print("Creating pack", pack["name"])
        await create_pack(conv, pack["name"], pack["type"])
        first_index = 0

    total = len(pack["emojis"])
    for i, emoji in enumerate(pack["emojis"]):
        if i < first_index:
            # Already uploaded in an earlier, interrupted run.
            continue
        print(f"Adding emoji {i+1}/{total}", emoji["hex"], emoji["emoji"])
        emoji_file = media_dir / emoji["filename"]
        if emoji["type"] == "webp":
            attrs = [
                DocumentAttributeImageSize(w=100, h=100),
                DocumentAttributeFilename(file_name="image.webp"),
            ]
            with emoji_file.open("rb") as f:
                file_handle = await client.upload_file(f, file_name="emoji.webp")
            emoji_file = InputMediaUploadedDocument(
                file_handle, mime_type="image/webp", attributes=attrs
            )
        await send_emoji(conv, emoji_file, emoji["emoji"])
        await asyncio.sleep(2)

    print("Publishing pack", pack["short_name"])
    await publish_pack(conv, pack["short_name"])
|
|
|
|
|
|
async def main() -> None:
    """Convert the emoji, upload them as packs and record their document IDs.

    Steps:
      1. Load (or build) the converted emoji list.
      2. Load (or build) the split of emoji into packs, cached in
         ``split-cache.json`` inside the media directory.
      3. Load previously collected document IDs from the output file, if any.
      4. For every pack: create, fill and publish it unless already marked
         published, then fetch the sticker set and map each emoji to the ID
         of the document at the same position. Progress is written back to
         the caches after every step.
      5. Write the final mapping to a pickle file next to the JSON output.
    """
    logging.basicConfig(level=logging.INFO)

    emoji_data = await load_emoji_data()

    # The JSON files below contain raw emoji characters (ensure_ascii=False),
    # so they are opened as UTF-8 explicitly rather than in the locale encoding.
    split_cache = media_dir / "split-cache.json"
    try:
        with split_cache.open(encoding="utf-8") as f:
            packs: list[CachedPack] = json.load(f)
    except FileNotFoundError:
        packs = split_packs(emoji_data)
        with split_cache.open("w", encoding="utf-8") as f:
            json.dump(packs, f)

    doc_id_file = Path(args.output)
    try:
        with doc_id_file.open(encoding="utf-8") as f:
            doc_ids = json.load(f)
    except FileNotFoundError:
        doc_ids = {}

    client = TelegramClient(args.session, args.api_id, args.api_hash, flood_sleep_threshold=3600)
    await client.start()
    try:
        async with client.conversation("Stickers", max_messages=20000) as conv:
            for pack in packs:
                if not pack["published"]:
                    await create_and_fill_pack(client, conv, pack)
                    pack["published"] = True
                    with split_cache.open("w", encoding="utf-8") as f:
                        json.dump(packs, f, ensure_ascii=False)
                # NOTE(review): "or True" forces re-collection for every pack on
                # every run, so the "collected" flag is effectively ignored.
                # Kept as-is because dropping it changes which packs are
                # refreshed; confirm whether it is a debugging leftover.
                if not pack["collected"] or True:
                    print("Collecting document IDs from pack", pack["short_name"])
                    stickers = await client(
                        GetStickerSetRequest(InputStickerSetShortName(pack["short_name"]), 0)
                    )
                    doc: Document
                    for i, doc in enumerate(stickers.documents):
                        attr = next(
                            attr
                            for attr in doc.attributes
                            if isinstance(attr, DocumentAttributeCustomEmoji)
                        )
                        base_emoji = attr.alt.replace("\ufe0f", "")
                        # Documents are matched to emoji purely by position.
                        emoji = pack["emojis"][i]["emoji"].replace("\ufe0f", "")
                        doc_ids[emoji] = doc.id
                        print(f"Mapped {emoji} (fallback: {base_emoji}) -> {doc_ids[emoji]}")
                    pack["collected"] = True
                    with split_cache.open("w", encoding="utf-8") as f:
                        json.dump(packs, f, ensure_ascii=False)
                    with doc_id_file.open("w", encoding="utf-8") as f:
                        json.dump(doc_ids, f, ensure_ascii=False)
                    print("Pack completed")
                    await asyncio.sleep(5)
    finally:
        # Disconnect even when a step above fails.
        await client.disconnect()

    # Derive the pickle path from the suffix instead of a string replace: with
    # an output name that lacks ".json" the replace was a no-op and the pickle
    # overwrote the JSON file that was just written.
    pickle_path = doc_id_file.with_suffix(".pickle")
    with pickle_path.open("wb") as f:
        pickle.dump(doc_ids, f)
    print("Wrote pickle")
|
|
|
|
|
|
# Only start the packer when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|