TorLinkRotator/contrib/dangerous-side/web-app/main.py

84 lines
4.1 KiB
Python

import re
import urllib
import random
import datetime
import aiohttp
from aiohttp import web
from aiohttp_socks import ProxyType, ProxyConnector
import config
recipient_regex = re.compile(r"^[\w\d\=\+\-\.\%\s\:\/\\_\)\(]{1,256}$")
async def get_link(recipient):
if isinstance(config.TOR_SOCKS5_PROXY, list) or isinstance(config.TOR_SOCKS5_PROXY, tuple):
proxy_host, proxy_port = random.choice(config.TOR_SOCKS5_PROXY).split(':')
else:
proxy_host, proxy_port = config.TOR_SOCKS5_PROXY.split(':')
try:
async with aiohttp.ClientSession(
connector=ProxyConnector(
proxy_type=ProxyType.SOCKS5,
host=proxy_host,
port=int(proxy_port),
rdns=True
),
timeout=aiohttp.ClientTimeout(total=None,sock_connect=30,sock_read=30),
headers={'X-RECIPIENT-ID': recipient} if recipient is not None else {}
) as session:
async with session.get(f"{config.TOR_LINK_ROTATION_URL}/", timeout=30) as response:
return True, await response.json()
except:
return False, None
async def web_handler(request):
recipient = request.cookies.get('cf_clearance', None)
if recipient is not None:
if not recipient_regex.match(recipient):
recipient = None
status, resp = await get_link(recipient)
if not status:
return web.Response(
text="""<html><head><meta http-equiv="refresh" content="1"></head><body><p>We Are Sorry! Error while connecting to rotation gateway. Trying refreshing the page...</p></body><footer><br><p>Source code of Rotation System created by Infinity Team - <a href="https://github.com/TorInfinityProject/TorLinkRotator">Github Link</a></p><p>If you found this Rotation System useful, visit the developers website and buy them a cup of coffee - <a href="https://infinity.taxi/">Infinity Project</a></p></footer></html>""",
content_type='text/html'
)
if resp.get('status', None) == 'OK':
subdomain = '.'.join(request.host.split('.')[:-2])
return web.Response(
text=f"""<html><head><meta http-equiv="refresh" content="10;url={resp['scheme']}://{subdomain + '.' if len(subdomain) > 0 else ''}{resp['link']}{urllib.parse.quote(request.path)}{'?' + urllib.parse.urlencode(request.query) if len(request.query) > 0 else ''}" /></head><body><p>You will be redirected in 10-15 seconds</p><br><p>If you see a "Onion not found" error, please click on the "New Tor Circuit for this Site" button.</p>{'<p>The mirror will be deleted ' + str(int(datetime.timedelta(seconds=resp['lifetime']).total_seconds() // 3600)) + ' hours after the redirect</p>' if resp.get('lifetime') is not None else ''}</body><footer><br><p>Source code of Rotation System created by Infinity Team - <a href="https://github.com/TorInfinityProject/TorLinkRotator">Github Link</a></p><p>If you found this Rotation System useful, visit the developers website and buy them a cup of coffee - <a href="https://infinity.taxi/">Infinity Project</a></p></footer></html>""",
content_type='text/html'
)
else:
return web.Response(text=resp.get('detail', ''))
async def index(request):
if config.INDEX_HTML is None:
return web.Response(status=404)
subdomain = '.'.join(request.host.split('.')[:-2])
alias_name = config.ALIAS_SUBDOMAIN_NAME.get(subdomain, None)
return web.Response(
text=config.INDEX_HTML.format(
alias_subdomain_name=' ' + alias_name if alias_name is not None else '',
subdomain=subdomain + '.' if len(subdomain) > 0 else '',
query_params='?' + urllib.parse.urlencode(request.query) if len(request.query) > 0 else ''
),
content_type='text/html'
)
app = web.Application()
app.add_routes([
web.get(r'/_', index, allow_head=True),
web.post(r'/_', index),
web.get(r'/{path:.*}', web_handler, allow_head=False),
web.post(r'/{path:.*}', web_handler)
])
if __name__ == '__main__':
web.run_app(app)