import pytest import aiosocks import aiohttp import asyncio import os import ssl import struct from aiohttp import web from aiohttp.test_utils import RawTestServer from aiohttp.test_utils import make_mocked_coro from aiosocks.test_utils import FakeSocksSrv, FakeSocks4Srv from aiosocks.connector import ProxyConnector, ProxyClientRequest from aiosocks.errors import SocksConnectionError from aiosocks import constants as c from async_timeout import timeout from unittest import mock async def test_socks4_connect_success(loop): pld = b'\x00\x5a\x04W\x01\x01\x01\x01test' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) auth = aiosocks.Socks4Auth('usr') dst = ('python.org', 80) transport, protocol = await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert protocol.proxy_sockname == ('1.1.1.1', 1111) data = await protocol._stream_reader.read(4) assert data == b'test' transport.close() async def test_socks4_invalid_data(loop): pld = b'\x01\x5a\x04W\x01\x01\x01\x01' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) auth = aiosocks.Socks4Auth('usr') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'invalid data' in str(ct) async def test_socks4_srv_error(loop): pld = b'\x00\x5b\x04W\x01\x01\x01\x01' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) auth = aiosocks.Socks4Auth('usr') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert '0x5b' in str(ct) # https://stackoverflow.com/a/55693498 def with_timeout(t): def wrapper(corofunc): async def run(*args, **kwargs): with timeout(t): return await corofunc(*args, **kwargs) return run return wrapper async def test_socks4_datagram_failure(): loop = asyncio.get_event_loop() async with FakeSocksSrv(loop, b'') as srv: addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) with pytest.raises(ValueError): await aiosocks.open_datagram(addr, None, None, loop=loop) async def test_socks4_datagram_connect_failure(): loop = asyncio.get_event_loop() async def raiseconnerr(*args, **kwargs): raise OSError(1) async with FakeSocksSrv(loop, b'') as srv: addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) with mock.patch.object(loop, 'create_connection', raiseconnerr), pytest.raises(SocksConnectionError): await aiosocks.open_datagram(addr, None, None, loop=loop) @with_timeout(2) async def test_socks5_datagram_success_anonymous(): # # This code is testing aiosocks.open_datagram. # # The server it is interacting with is srv (FakeSocksSrv). # # We mock the UDP Protocol to the SOCKS server w/ # sockservdgram (FakeDGramTransport) # # UDP packet flow: # dgram (DGram) -> sockservdgram (FakeDGramTransport) # which reflects it back for delivery # loop = asyncio.get_event_loop() pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W' respdata = b'response data' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dname = 'python.org' portnum = 53 dst = (dname, portnum) # Fake SOCKS server UDP relay class FakeDGramTransport(asyncio.DatagramTransport): def sendto(self, data, addr=None): # Verify correct packet was receieved frag, addr, payload = aiosocks.protocols.Socks5Protocol.parse_udp(data) assert frag == 0 assert addr == ('python.org', 53) assert payload == b'some data' # Send frag reply, make sure it's ignored ba = bytearray() ba.extend([ 0, 0, 1, 1, 2, 2, 2, 2, ]) ba += (53).to_bytes(2, 'big') ba += respdata dgram.datagram_received(ba, ('3.3.3.3', 0)) # Send reply # wish I could use build_udp here, but it's async ba = bytearray() ba.extend([ 0, 0, 0, 1, 2, 2, 2, 2, ]) ba += (53).to_bytes(2, 'big') ba += respdata dgram.datagram_received(ba, ('3.3.3.3', 0)) sockservdgram = FakeDGramTransport() # Fake the creation of the UDP relay async def fake_cde(factory, remote_addr): assert remote_addr == ('1.1.1.1', 1111) proto = factory() proto.connection_made(sockservdgram) return sockservdgram, proto # Open the UDP connection with mock.patch.object(loop, 'create_datagram_endpoint', fake_cde) as m: dgram = await aiosocks.open_datagram(addr, None, dst, loop=loop) rdr = srv.get_reader() # make sure we negotiated the correct command assert (await rdr.readexactly(5))[4] == c.SOCKS_CMD_UDP_ASSOCIATE assert dgram.proxy_sockname == ('1.1.1.1', 1111) dgram.send(b'some data') # XXX -- assert from fakesockssrv assert await dgram == (respdata, ('2.2.2.2', 53)) dgram.close() async def test_socks5_connect_success_anonymous(loop): pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) transport, protocol = await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert protocol.proxy_sockname == ('1.1.1.1', 1111) data = await protocol._stream_reader.read(4) assert data == b'test' transport.close() async def test_socks5_connect_success_usr_pwd(loop): pld = b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) transport, protocol = await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert protocol.proxy_sockname == ('1.1.1.1', 1111) data = await protocol._stream_reader.read(4) assert data == b'test' transport.close() async def test_socks5_auth_ver_err(loop): async with FakeSocksSrv(loop, b'\x04\x02') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'invalid version' in str(ct) async def test_socks5_auth_method_rejected(loop): async with FakeSocksSrv(loop, b'\x05\xFF') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'authentication methods were rejected' in str(ct) async def test_socks5_auth_status_invalid(loop): async with FakeSocksSrv(loop, b'\x05\xF0') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'invalid data' in str(ct) async def test_socks5_auth_status_invalid2(loop): async with FakeSocksSrv(loop, b'\x05\x02\x02\x00') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'invalid data' in str(ct) async def test_socks5_auth_failed(loop): async with FakeSocksSrv(loop, b'\x05\x02\x01\x01') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'authentication failed' in str(ct) async def test_socks5_cmd_ver_err(loop): async with FakeSocksSrv(loop, b'\x05\x02\x01\x00\x04\x00\x00') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'invalid version' in str(ct) async def test_socks5_cmd_not_granted(loop): async with FakeSocksSrv(loop, b'\x05\x02\x01\x00\x05\x01\x00') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'General SOCKS server failure' in str(ct) async def test_socks5_invalid_address_type(loop): async with FakeSocksSrv(loop, b'\x05\x02\x01\x00\x05\x00\x00\xFF') as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) with pytest.raises(aiosocks.SocksError) as ct: await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert 'invalid data' in str(ct) async def test_socks5_atype_ipv4(loop): pld = b'\x05\x02\x01\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) transport, protocol = await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert protocol.proxy_sockname == ('1.1.1.1', 1111) transport.close() async def test_socks5_atype_ipv6(loop): pld = b'\x05\x02\x01\x00\x05\x00\x00\x04\x00\x00\x00\x00' \ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x11\x04W' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) transport, protocol = await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert protocol.proxy_sockname == ('::111', 1111) or \ protocol.proxy_sockname == ('::0.0.1.17', 1111) transport.close() async def test_socks5_atype_domain(loop): pld = b'\x05\x02\x01\x00\x05\x00\x00\x03\x0apython.org\x04W' async with FakeSocksSrv(loop, pld) as srv: addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) auth = aiosocks.Socks5Auth('usr', 'pwd') dst = ('python.org', 80) transport, protocol = await aiosocks.create_connection( None, addr, auth, dst, loop=loop) assert protocol.proxy_sockname == (b'python.org', 1111) transport.close() async def test_http_connect(loop): async def handler(request): return web.Response(text='Test message') async with RawTestServer(handler, host='127.0.0.1', loop=loop) as ws: async with FakeSocks4Srv(loop) as srv: conn = ProxyConnector(loop=loop, remote_resolve=False) async with aiohttp.ClientSession( connector=conn, loop=loop, request_class=ProxyClientRequest) as ses: proxy = 'socks4://127.0.0.1:{}'.format(srv.port) async with ses.get(ws.make_url('/'), proxy=proxy) as resp: assert resp.status == 200 assert (await resp.text()) == 'Test message' async def test_https_connect(loop): async def handler(request): return web.Response(text='Test message') here = os.path.join(os.path.dirname(__file__), '..', 'tests') keyfile = os.path.join(here, 'sample.key') certfile = os.path.join(here, 'sample.crt') sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslcontext.load_cert_chain(certfile, keyfile) ws = RawTestServer(handler, scheme='https', host='127.0.0.1', loop=loop) await ws.start_server(loop=loop, ssl=sslcontext) v_fp = (b'0\x9a\xc9D\x83\xdc\x91\'\x88\x91\x11\xa1d\x97\xfd' b'\xcb~7U\x14D@L' b'\x11\xab\x99\xa8\xae\xb7\x14\xee\x8b') inv_fp = (b'0\x9d\xc9D\x83\xdc\x91\'\x88\x91\x11\xa1d\x97\xfd' b'\xcb~7U\x14D@L' b'\x11\xab\x99\xa8\xae\xb7\x14\xee\x9e') async with FakeSocks4Srv(loop) as srv: v_conn = ProxyConnector(loop=loop, remote_resolve=False, verify_ssl=False, fingerprint=v_fp) inv_conn = ProxyConnector(loop=loop, remote_resolve=False, verify_ssl=False, fingerprint=inv_fp) async with aiohttp.ClientSession( connector=v_conn, loop=loop, request_class=ProxyClientRequest) as ses: proxy = 'socks4://127.0.0.1:{}'.format(srv.port) async with ses.get(ws.make_url('/'), proxy=proxy) as resp: assert resp.status == 200 assert (await resp.text()) == 'Test message' async with aiohttp.ClientSession( connector=inv_conn, loop=loop, request_class=ProxyClientRequest) as ses: proxy = 'socks4://127.0.0.1:{}'.format(srv.port) with pytest.raises(aiohttp.ServerFingerprintMismatch): async with ses.get(ws.make_url('/'), proxy=proxy) as resp: assert resp.status == 200