455 lines
18 KiB
Python
455 lines
18 KiB
Python
__package__ = 'archivebox.misc'
|
|
|
|
import re
|
|
import requests
|
|
import json as pyjson
|
|
import http.cookiejar
|
|
|
|
from typing import List, Optional, Any, Callable
|
|
from pathlib import Path
|
|
from inspect import signature
|
|
from functools import wraps
|
|
from hashlib import sha256
|
|
from urllib.parse import urlparse, quote, unquote
|
|
from html import escape, unescape
|
|
from datetime import datetime, timezone
|
|
from dateparser import parse as dateparser
|
|
from requests.exceptions import RequestException, ReadTimeout
|
|
|
|
from base32_crockford import encode as base32_encode # type: ignore
|
|
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding
|
|
try:
|
|
import chardet # type:ignore
|
|
detect_encoding = lambda rawdata: chardet.detect(rawdata)["encoding"]
|
|
except ImportError:
|
|
detect_encoding = lambda rawdata: "utf-8"
|
|
|
|
|
|
from archivebox.config.constants import CONSTANTS
|
|
|
|
from .logging import COLOR_DICT
|
|
|
|
|
|
### Parsing Helpers
|
|
|
|
# All of these are (str) -> str
|
|
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
|
|
scheme = lambda url: urlparse(url).scheme.lower()
|
|
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
|
|
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
|
|
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
|
|
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
|
|
path = lambda url: urlparse(url).path
|
|
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
|
|
domain = lambda url: urlparse(url).netloc
|
|
query = lambda url: urlparse(url).query
|
|
fragment = lambda url: urlparse(url).fragment
|
|
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
|
|
base_url = lambda url: without_scheme(url) # uniq base url used to dedupe links
|
|
|
|
without_www = lambda url: url.replace('://www.', '://', 1)
|
|
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
|
|
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]
|
|
|
|
urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
|
|
urldecode = lambda s: s and unquote(s)
|
|
htmlencode = lambda s: s and escape(s, quote=True)
|
|
htmldecode = lambda s: s and unescape(s)
|
|
|
|
short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
|
|
ts_to_date_str = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
|
|
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
|
|
|
|
COLOR_REGEX = re.compile(r'\[(?P<arg_1>\d+)(;(?P<arg_2>\d+)(;(?P<arg_3>\d+))?)?m')
|
|
|
|
|
|
# https://mathiasbynens.be/demo/url-regex
|
|
URL_REGEX = re.compile(
|
|
r'(?=('
|
|
r'http[s]?://' # start matching from allowed schemes
|
|
r'(?:[a-zA-Z]|[0-9]' # followed by allowed alphanum characters
|
|
r'|[-_$@.&+!*\(\),]' # or allowed symbols (keep hyphen first to match literal hyphen)
|
|
r'|[^\u0000-\u007F])+' # or allowed unicode bytes
|
|
r'[^\]\[<>"\'\s]+' # stop parsing at these symbols
|
|
r'))',
|
|
re.IGNORECASE | re.UNICODE,
|
|
)
|
|
|
|
def parens_are_matched(string: str, open_char='(', close_char=')'):
|
|
"""check that all parentheses in a string are balanced and nested properly"""
|
|
count = 0
|
|
for c in string:
|
|
if c == open_char:
|
|
count += 1
|
|
elif c == close_char:
|
|
count -= 1
|
|
if count < 0:
|
|
return False
|
|
return count == 0
|
|
|
|
def fix_url_from_markdown(url_str: str) -> str:
|
|
"""
|
|
cleanup a regex-parsed url that may contain dangling trailing parens from markdown link syntax
|
|
helpful to fix URLs parsed from markdown e.g.
|
|
input: https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def).somemoretext
|
|
result: https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def
|
|
|
|
IMPORTANT ASSUMPTION: valid urls wont have unbalanced or incorrectly nested parentheses
|
|
e.g. this will fail the user actually wants to ingest a url like 'https://example.com/some_wei)(rd_url'
|
|
in that case it will return https://example.com/some_wei (truncated up to the first unbalanced paren)
|
|
This assumption is true 99.9999% of the time, and for the rare edge case the user can use url_list parser.
|
|
"""
|
|
trimmed_url = url_str
|
|
|
|
# cut off one trailing character at a time
|
|
# until parens are balanced e.g. /a(b)c).x(y)z -> /a(b)c
|
|
while not parens_are_matched(trimmed_url):
|
|
trimmed_url = trimmed_url[:-1]
|
|
|
|
# make sure trimmed url is still valid
|
|
if re.findall(URL_REGEX, trimmed_url):
|
|
return trimmed_url
|
|
|
|
return url_str
|
|
|
|
def find_all_urls(urls_str: str):
|
|
for url in re.findall(URL_REGEX, urls_str):
|
|
yield fix_url_from_markdown(url)
|
|
|
|
|
|
def is_static_file(url: str):
|
|
# TODO: the proper way is with MIME type detection + ext, not only extension
|
|
return extension(url).lower() in CONSTANTS.STATICFILE_EXTENSIONS
|
|
|
|
|
|
def enforce_types(func):
|
|
"""
|
|
Enforce function arg and kwarg types at runtime using its python3 type hints
|
|
Simpler version of pydantic @validate_call decorator
|
|
"""
|
|
# TODO: check return type as well
|
|
|
|
@wraps(func)
|
|
def typechecked_function(*args, **kwargs):
|
|
sig = signature(func)
|
|
|
|
def check_argument_type(arg_key, arg_val):
|
|
try:
|
|
annotation = sig.parameters[arg_key].annotation
|
|
except KeyError:
|
|
annotation = None
|
|
|
|
if annotation is not None and annotation.__class__ is type:
|
|
if not isinstance(arg_val, annotation):
|
|
raise TypeError(
|
|
'{}(..., {}: {}) got unexpected {} argument {}={}'.format(
|
|
func.__name__,
|
|
arg_key,
|
|
annotation.__name__,
|
|
type(arg_val).__name__,
|
|
arg_key,
|
|
str(arg_val)[:64],
|
|
)
|
|
)
|
|
|
|
# check args
|
|
for arg_val, arg_key in zip(args, sig.parameters):
|
|
check_argument_type(arg_key, arg_val)
|
|
|
|
# check kwargs
|
|
for arg_key, arg_val in kwargs.items():
|
|
check_argument_type(arg_key, arg_val)
|
|
|
|
return func(*args, **kwargs)
|
|
|
|
return typechecked_function
|
|
|
|
|
|
def docstring(text: Optional[str]):
|
|
"""attach the given docstring to the decorated function"""
|
|
def decorator(func):
|
|
if text:
|
|
func.__doc__ = text
|
|
return func
|
|
return decorator
|
|
|
|
|
|
@enforce_types
|
|
def str_between(string: str, start: str, end: str=None) -> str:
|
|
"""(<abc>12345</def>, <abc>, </def>) -> 12345"""
|
|
|
|
content = string.split(start, 1)[-1]
|
|
if end is not None:
|
|
content = content.rsplit(end, 1)[0]
|
|
|
|
return content
|
|
|
|
|
|
@enforce_types
|
|
def parse_date(date: Any) -> datetime:
|
|
"""Parse unix timestamps, iso format, and human-readable strings"""
|
|
|
|
if date is None:
|
|
return None # type: ignore
|
|
|
|
if isinstance(date, datetime):
|
|
if date.tzinfo is None:
|
|
return date.replace(tzinfo=timezone.utc)
|
|
|
|
assert date.tzinfo.utcoffset(datetime.now()).seconds == 0, 'Refusing to load a non-UTC date!'
|
|
return date
|
|
|
|
if isinstance(date, (float, int)):
|
|
date = str(date)
|
|
|
|
if isinstance(date, str):
|
|
return dateparser(date, settings={'TIMEZONE': 'UTC'}).astimezone(timezone.utc)
|
|
|
|
raise ValueError('Tried to parse invalid date! {}'.format(date))
|
|
|
|
|
|
@enforce_types
|
|
def download_url(url: str, timeout: int=None) -> str:
|
|
"""Download the contents of a remote url and return the text"""
|
|
|
|
from archivebox.config.common import ARCHIVING_CONFIG
|
|
|
|
timeout = timeout or ARCHIVING_CONFIG.TIMEOUT
|
|
session = requests.Session()
|
|
|
|
if ARCHIVING_CONFIG.COOKIES_FILE and Path(ARCHIVING_CONFIG.COOKIES_FILE).is_file():
|
|
cookie_jar = http.cookiejar.MozillaCookieJar(ARCHIVING_CONFIG.COOKIES_FILE)
|
|
cookie_jar.load(ignore_discard=True, ignore_expires=True)
|
|
for cookie in cookie_jar:
|
|
session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path)
|
|
|
|
response = session.get(
|
|
url,
|
|
headers={'User-Agent': ARCHIVING_CONFIG.USER_AGENT},
|
|
verify=ARCHIVING_CONFIG.CHECK_SSL_VALIDITY,
|
|
timeout=timeout,
|
|
)
|
|
|
|
content_type = response.headers.get('Content-Type', '')
|
|
encoding = http_content_type_encoding(content_type) or html_body_declared_encoding(response.text)
|
|
|
|
if encoding is not None:
|
|
response.encoding = encoding
|
|
|
|
try:
|
|
return response.text
|
|
except UnicodeDecodeError:
|
|
# if response is non-test (e.g. image or other binary files), just return the filename instead
|
|
return url.rsplit('/', 1)[-1]
|
|
|
|
@enforce_types
|
|
def get_headers(url: str, timeout: int | None=None) -> str:
|
|
"""Download the contents of a remote url and return the headers"""
|
|
# TODO: get rid of this and use an abx pluggy hook instead
|
|
|
|
from archivebox.config.common import ARCHIVING_CONFIG
|
|
|
|
timeout = timeout or ARCHIVING_CONFIG.TIMEOUT
|
|
|
|
try:
|
|
response = requests.head(
|
|
url,
|
|
headers={'User-Agent': ARCHIVING_CONFIG.USER_AGENT},
|
|
verify=ARCHIVING_CONFIG.CHECK_SSL_VALIDITY,
|
|
timeout=timeout,
|
|
allow_redirects=True,
|
|
)
|
|
if response.status_code >= 400:
|
|
raise RequestException
|
|
except ReadTimeout:
|
|
raise
|
|
except RequestException:
|
|
response = requests.get(
|
|
url,
|
|
headers={'User-Agent': ARCHIVING_CONFIG.USER_AGENT},
|
|
verify=ARCHIVING_CONFIG.CHECK_SSL_VALIDITY,
|
|
timeout=timeout,
|
|
stream=True
|
|
)
|
|
|
|
return pyjson.dumps(
|
|
{
|
|
'URL': url,
|
|
'Status-Code': response.status_code,
|
|
'Elapsed': response.elapsed.total_seconds()*1000,
|
|
'Encoding': str(response.encoding),
|
|
'Apparent-Encoding': response.apparent_encoding,
|
|
**dict(response.headers),
|
|
},
|
|
indent=4,
|
|
)
|
|
|
|
|
|
@enforce_types
|
|
def ansi_to_html(text: str) -> str:
|
|
"""
|
|
Based on: https://stackoverflow.com/questions/19212665/python-converting-ansi-color-codes-to-html
|
|
Simple way to render colored CLI stdout/stderr in HTML properly, Textual/rich is probably better though.
|
|
"""
|
|
|
|
TEMPLATE = '<span style="color: rgb{}"><br>'
|
|
text = text.replace('[m', '</span>')
|
|
|
|
def single_sub(match):
|
|
argsdict = match.groupdict()
|
|
if argsdict['arg_3'] is None:
|
|
if argsdict['arg_2'] is None:
|
|
_, color = 0, argsdict['arg_1']
|
|
else:
|
|
_, color = argsdict['arg_1'], argsdict['arg_2']
|
|
else:
|
|
_, color = argsdict['arg_3'], argsdict['arg_2']
|
|
|
|
return TEMPLATE.format(COLOR_DICT[color][0])
|
|
|
|
return COLOR_REGEX.sub(single_sub, text)
|
|
|
|
|
|
@enforce_types
|
|
def dedupe(options: List[str]) -> List[str]:
|
|
"""
|
|
Deduplicates the given CLI args by key=value. Options that come later override earlier.
|
|
"""
|
|
deduped = {}
|
|
|
|
for option in options:
|
|
key = option.split('=')[0]
|
|
deduped[key] = option
|
|
|
|
return list(deduped.values())
|
|
|
|
|
|
|
|
class ExtendedEncoder(pyjson.JSONEncoder):
|
|
"""
|
|
Extended json serializer that supports serializing several model
|
|
fields and objects
|
|
"""
|
|
|
|
def default(self, obj):
|
|
cls_name = obj.__class__.__name__
|
|
|
|
if hasattr(obj, '_asdict'):
|
|
return obj._asdict()
|
|
|
|
elif isinstance(obj, bytes):
|
|
return obj.decode()
|
|
|
|
elif isinstance(obj, datetime):
|
|
return obj.isoformat()
|
|
|
|
elif isinstance(obj, Exception):
|
|
return '{}: {}'.format(obj.__class__.__name__, obj)
|
|
|
|
elif isinstance(obj, Path):
|
|
return str(obj)
|
|
|
|
elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
|
|
return tuple(obj)
|
|
|
|
elif isinstance(obj, Callable):
|
|
return str(obj)
|
|
|
|
return pyjson.JSONEncoder.default(self, obj)
|
|
|
|
|
|
### URL PARSING TESTS / ASSERTIONS
|
|
|
|
# Check that plain text regex URL parsing works as expected
|
|
# this is last-line-of-defense to make sure the URL_REGEX isn't
|
|
# misbehaving due to some OS-level or environment level quirks (e.g. regex engine / cpython / locale differences)
|
|
# the consequences of bad URL parsing could be disastrous and lead to many
|
|
# incorrect/badly parsed links being added to the archive, so this is worth the cost of checking
|
|
|
|
assert fix_url_from_markdown('http://example.com/a(b)c).x(y)z') == 'http://example.com/a(b)c'
|
|
assert fix_url_from_markdown('https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def).link(with)_trailingtext') == 'https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def'
|
|
|
|
URL_REGEX_TESTS = [
|
|
('https://example.com', ['https://example.com']),
|
|
('http://abc-file234example.com/abc?def=abc&23423=sdfsdf#abc=234&234=a234', ['http://abc-file234example.com/abc?def=abc&23423=sdfsdf#abc=234&234=a234']),
|
|
|
|
('https://twitter.com/share?url=https://akaao.success-corp.co.jp&text=ア@サ!ト&hashtags=ア%オ,元+ア.ア-オ_イ*シ$ロ abc', ['https://twitter.com/share?url=https://akaao.success-corp.co.jp&text=ア@サ!ト&hashtags=ア%オ,元+ア.ア-オ_イ*シ$ロ', 'https://akaao.success-corp.co.jp&text=ア@サ!ト&hashtags=ア%オ,元+ア.ア-オ_イ*シ$ロ']),
|
|
('<a href="https://twitter.com/share#url=https://akaao.success-corp.co.jp&text=ア@サ!ト?hashtags=ア%オ,元+ア&abc=.ア-オ_イ*シ$ロ"> abc', ['https://twitter.com/share#url=https://akaao.success-corp.co.jp&text=ア@サ!ト?hashtags=ア%オ,元+ア&abc=.ア-オ_イ*シ$ロ', 'https://akaao.success-corp.co.jp&text=ア@サ!ト?hashtags=ア%オ,元+ア&abc=.ア-オ_イ*シ$ロ']),
|
|
|
|
('///a', []),
|
|
('http://', []),
|
|
('http://../', ['http://../']),
|
|
('http://-error-.invalid/', ['http://-error-.invalid/']),
|
|
('https://a(b)c+1#2?3&4/', ['https://a(b)c+1#2?3&4/']),
|
|
('http://उदाहरण.परीक्षा', ['http://उदाहरण.परीक्षा']),
|
|
('http://例子.测试', ['http://例子.测试']),
|
|
('http://➡.ws/䨹 htps://abc.1243?234', ['http://➡.ws/䨹']),
|
|
('http://⌘.ws">https://exa+mple.com//:abc ', ['http://⌘.ws', 'https://exa+mple.com//:abc']),
|
|
('http://مثال.إختبار/abc?def=ت&ب=abc#abc=234', ['http://مثال.إختبار/abc?def=ت&ب=abc#abc=234']),
|
|
('http://-.~_!$&()*+,;=:%40:80%2f::::::@example.c\'om', ['http://-.~_!$&()*+,;=:%40:80%2f::::::@example.c']),
|
|
|
|
('http://us:pa@ex.co:42/http://ex.co:19/a?_d=4#-a=2.3', ['http://us:pa@ex.co:42/http://ex.co:19/a?_d=4#-a=2.3', 'http://ex.co:19/a?_d=4#-a=2.3']),
|
|
('http://code.google.com/events/#&product=browser', ['http://code.google.com/events/#&product=browser']),
|
|
('http://foo.bar?q=Spaces should be encoded', ['http://foo.bar?q=Spaces']),
|
|
('http://foo.com/blah_(wikipedia)#c(i)t[e]-1', ['http://foo.com/blah_(wikipedia)#c(i)t']),
|
|
('http://foo.com/(something)?after=parens', ['http://foo.com/(something)?after=parens']),
|
|
('http://foo.com/unicode_(✪)_in_parens) abc', ['http://foo.com/unicode_(✪)_in_parens']),
|
|
('http://foo.bar/?q=Test%20URL-encoded%20stuff', ['http://foo.bar/?q=Test%20URL-encoded%20stuff']),
|
|
|
|
('[xyz](http://a.b/?q=(Test)%20U)RL-encoded%20stuff', ['http://a.b/?q=(Test)%20U']),
|
|
('[xyz](http://a.b/?q=(Test)%20U)-ab https://abc+123', ['http://a.b/?q=(Test)%20U', 'https://abc+123']),
|
|
('[xyz](http://a.b/?q=(Test)%20U) https://a(b)c+12)3', ['http://a.b/?q=(Test)%20U', 'https://a(b)c+12']),
|
|
('[xyz](http://a.b/?q=(Test)a\nabchttps://a(b)c+12)3', ['http://a.b/?q=(Test)a', 'https://a(b)c+12']),
|
|
('http://foo.bar/?q=Test%20URL-encoded%20stuff', ['http://foo.bar/?q=Test%20URL-encoded%20stuff']),
|
|
]
|
|
for urls_str, expected_url_matches in URL_REGEX_TESTS:
|
|
url_matches = list(find_all_urls(urls_str))
|
|
assert url_matches == expected_url_matches, 'FAILED URL_REGEX CHECK!'
|
|
|
|
|
|
# More test cases
|
|
_test_url_strs = {
|
|
'example.com': 0,
|
|
'/example.com': 0,
|
|
'//example.com': 0,
|
|
':/example.com': 0,
|
|
'://example.com': 0,
|
|
'htt://example8.com': 0,
|
|
'/htt://example.com': 0,
|
|
'https://example': 1,
|
|
'https://localhost/2345': 1,
|
|
'https://localhost:1234/123': 1,
|
|
'://': 0,
|
|
'https://': 0,
|
|
'http://': 0,
|
|
'ftp://': 0,
|
|
'ftp://example.com': 0,
|
|
'https://example.com': 1,
|
|
'https://example.com/': 1,
|
|
'https://a.example.com': 1,
|
|
'https://a.example.com/': 1,
|
|
'https://a.example.com/what/is/happening.html': 1,
|
|
'https://a.example.com/what/ís/happening.html': 1,
|
|
'https://a.example.com/what/is/happening.html?what=1&2%20b#höw-about-this=1a': 1,
|
|
'https://a.example.com/what/is/happéning/?what=1&2%20b#how-aboüt-this=1a': 1,
|
|
'HTtpS://a.example.com/what/is/happening/?what=1&2%20b#how-about-this=1af&2f%20b': 1,
|
|
'https://example.com/?what=1#how-about-this=1&2%20baf': 1,
|
|
'https://example.com?what=1#how-about-this=1&2%20baf': 1,
|
|
'<test>http://example7.com</test>': 1,
|
|
'https://<test>': 0,
|
|
'https://[test]': 0,
|
|
'http://"test"': 0,
|
|
'http://\'test\'': 0,
|
|
'[https://example8.com/what/is/this.php?what=1]': 1,
|
|
'[and http://example9.com?what=1&other=3#and-thing=2]': 1,
|
|
'<what>https://example10.com#and-thing=2 "</about>': 1,
|
|
'abc<this["https://example11.com/what/is#and-thing=2?whoami=23&where=1"]that>def': 1,
|
|
'sdflkf[what](https://example12.com/who/what.php?whoami=1#whatami=2)?am=hi': 1,
|
|
'<or>http://examplehttp://15.badc</that>': 2,
|
|
'https://a.example.com/one.html?url=http://example.com/inside/of/another?=http://': 2,
|
|
'[https://a.example.com/one.html?url=http://example.com/inside/of/another?=](http://a.example.com)': 3,
|
|
}
|
|
for url_str, num_urls in _test_url_strs.items():
|
|
assert len(list(find_all_urls(url_str))) == num_urls, (
|
|
f'{url_str} does not contain {num_urls} urls')
|