diff --git a/aiosocks/__init__.py b/aiosocks/__init__.py index 67b7866..e9b7472 100644 --- a/aiosocks/__init__.py +++ b/aiosocks/__init__.py @@ -6,7 +6,9 @@ from .errors import ( from .helpers import ( SocksAddr, Socks4Addr, Socks5Addr, Socks4Auth, Socks5Auth ) -from .protocols import Socks4Protocol, Socks5Protocol, DEFAULT_LIMIT +from .protocols import ( + Socks4Protocol, Socks5Protocol, Socks5DGramProtocol, DEFAULT_LIMIT +) __version__ = '0.2.6' @@ -17,6 +19,108 @@ __all__ = ('Socks4Protocol', 'Socks5Protocol', 'Socks4Auth', 'InvalidServerReply', 'create_connection', 'open_connection') +# https://stackoverflow.com/a/53789029 +def chain__await__(f): + return lambda *args, **kwargs: f(*args, **kwargs).__await__() + +class DGram(object): + '''An object that represents a datagram object. + + Use the send method to send data to the remote host. + + To receive data, simply await on the instance, and the next available + datagram will be returned when available. + + When done, call the close method to shut everything down.''' + + def __init__(self, socksproto, hdr): + self._sockproto = socksproto + self._hdr = hdr + self._q = asyncio.Queue() + + def connection_made(self, transport): + self._dgtrans = transport + + def datagram_received(self, data, addr): + '''Process relay UDP packets from the SOCKS server.''' + + frag, addr, payload = self._sockproto.parse_udp(data) + if frag != 0: + return + + self._q.put_nowait((payload, addr)) + + @property + def proxy_sockname(self): + return self._sockproto.proxy_sockname + + def send(self, data): + '''Send datagram to the SOCKS server. + + This will wrap the datagram as needed before sending it on. + + This currently does not fragment UDP packets.''' + + self._dgtrans.sendto(self._hdr + data, None) + + def close(self): + pass + + @chain__await__ + async def __await__(self): + '''Receive a datagram.''' + + return await self._q.get() + +async def open_datagram(proxy, proxy_auth, dst, *, + remote_resolve=True, loop=None, family=0, + proto=0, flags=0, sock=None, local_addr=None, + server_hostname=None, reader_limit=DEFAULT_LIMIT): + '''Create a transport object used to receive and send UDP packets + to dst, via the SOCKS v5 proxy specified by proxy. + + The returned value is an instance of DGram.''' + + loop = loop or asyncio.get_event_loop() + waiter = asyncio.Future(loop=loop) + + def sockdgram_factory(): + if not isinstance(proxy, Socks5Addr): + raise ValueError('only SOCKS v5 supports UDP') + + return Socks5DGramProtocol(proxy=proxy, proxy_auth=proxy_auth, dst=dst, + app_protocol_factory=None, + waiter=waiter, remote_resolve=remote_resolve, + loop=loop, server_hostname=server_hostname, + reader_limit=reader_limit) + + try: + # connect to socks proxy + transport, protocol = await loop.create_connection( + sockdgram_factory, proxy.host, proxy.port, family=family, + proto=proto, flags=flags, sock=sock, local_addr=local_addr) + except OSError as exc: + raise SocksConnectionError( + '[Errno %s] Can not connect to proxy %s:%d [%s]' % + (exc.errno, proxy.host, proxy.port, exc.strerror)) from exc + + try: + await waiter + except Exception: # noqa + transport.close() + raise + + # Build the header that the SOCKS UDP relay expects + # https://tools.ietf.org/html/rfc1928#section-7 + hdr = (await protocol.build_dst_address(*dst))[0] + hdr = protocol.flatten_req([ 0, 0, 0, ] + hdr) + + # connect to the UDP relay the socks server told us to + dgtrans, dgproto = await loop.create_datagram_endpoint( + lambda: DGram(protocol, hdr), remote_addr=protocol.proxy_sockname) + + return dgproto + async def create_connection(protocol_factory, proxy, proxy_auth, dst, *, remote_resolve=True, loop=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, diff --git a/aiosocks/protocols.py b/aiosocks/protocols.py index 0a43b3d..d96f045 100644 --- a/aiosocks/protocols.py +++ b/aiosocks/protocols.py @@ -17,6 +17,8 @@ DEFAULT_LIMIT = getattr(asyncio.streams, '_DEFAULT_LIMIT', 2**16) class BaseSocksProtocol(asyncio.StreamReaderProtocol): + cmd = c.SOCKS_CMD_CONNECT + def __init__(self, proxy, proxy_auth, dst, app_protocol_factory, waiter, *, remote_resolve=True, loop=None, ssl=False, server_hostname=None, negotiate_done_cb=None, @@ -133,7 +135,8 @@ class BaseSocksProtocol(asyncio.StreamReaderProtocol): async def socks_request(self, cmd): raise NotImplementedError - def write_request(self, request): + @staticmethod + def flatten_req(request): bdata = bytearray() for item in request: @@ -143,6 +146,11 @@ class BaseSocksProtocol(asyncio.StreamReaderProtocol): bdata += item else: raise ValueError('Unsupported item') + + return bdata + + def write_request(self, request): + bdata = self.flatten_req(request) self._stream_writer.write(bdata) async def read_response(self, n): @@ -389,3 +397,37 @@ class Socks5Protocol(BaseSocksProtocol): port = struct.unpack('>H', port)[0] return addr, port + + async def build_udp(self, frag, addr, payload=b''): + req, _ = await self.build_dst_address(*addr) + return self.flatten_req([ 0, 0, frag ] + req + [ payload ]) + + @staticmethod + def parse_udp(payload): + resv, frag, atype = struct.unpack('>HBB', payload[:4]) + + if resv != 0: + raise InvalidServerReply('SOCKS5 proxy server sent invalid data') + + pos = 4 + if atype == c.SOCKS5_ATYP_IPv4: + last = pos + 4 + addr = socket.inet_ntoa(payload[pos:last]) + elif atype == c.SOCKS5_ATYP_DOMAIN: + length = payload[pos] + pos += 1 + last = pos + length + addr = payload[pos:pos + length] + addr = addr.decode('idna') + elif atype == c.SOCKS5_ATYP_IPv6: + last = pos + 16 + addr = socket.inet_ntop(socket.AF_INET6, payload[pos:last]) + else: + raise InvalidServerReply('SOCKS5 proxy server sent invalid data') + + port = int.from_bytes(payload[last:last + 2], 'big') + last += 2 + return frag, (addr, port), payload[last:] + +class Socks5DGramProtocol(Socks5Protocol): + cmd = c.SOCKS_CMD_UDP_ASSOCIATE diff --git a/tests/test_functional.py b/tests/test_functional.py index 6c912bb..03649c4 100644 --- a/tests/test_functional.py +++ b/tests/test_functional.py @@ -1,12 +1,18 @@ import pytest import aiosocks import aiohttp +import asyncio import os import ssl +import struct from aiohttp import web from aiohttp.test_utils import RawTestServer +from aiohttp.test_utils import make_mocked_coro from aiosocks.test_utils import FakeSocksSrv, FakeSocks4Srv from aiosocks.connector import ProxyConnector, ProxyClientRequest +from aiosocks.errors import SocksConnectionError +from async_timeout import timeout +from unittest import mock async def test_socks4_connect_success(loop): @@ -56,6 +62,109 @@ async def test_socks4_srv_error(loop): assert '0x5b' in str(ct) +# https://stackoverflow.com/a/55693498 +def with_timeout(t): + def wrapper(corofunc): + async def run(*args, **kwargs): + with timeout(t): + return await corofunc(*args, **kwargs) + return run + return wrapper + +async def test_socks4_datagram_failure(): + loop = asyncio.get_event_loop() + + async with FakeSocksSrv(loop, b'') as srv: + addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) + with pytest.raises(ValueError): + await aiosocks.open_datagram(addr, None, None, loop=loop) + +async def test_socks4_datagram_connect_failure(): + loop = asyncio.get_event_loop() + + async def raiseconnerr(*args, **kwargs): + raise OSError(1) + + async with FakeSocksSrv(loop, b'') as srv: + addr = aiosocks.Socks4Addr('127.0.0.1', srv.port) + with mock.patch.object(loop, 'create_connection', + raiseconnerr), pytest.raises(SocksConnectionError): + await aiosocks.open_datagram(addr, None, None, loop=loop) + +@with_timeout(2) +async def test_socks5_datagram_success_anonymous(): + # + # This code is testing aiosocks.open_datagram. + # + # The server it is interacting with is srv (FakeSocksSrv). + # + # We mock the UDP Protocol to the SOCKS server w/ + # sockservdgram (FakeDGramTransport) + # + # UDP packet flow: + # dgram (DGram) -> sockservdgram (FakeDGramTransport) + # which reflects it back for delivery + # + loop = asyncio.get_event_loop() + pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04W' + + respdata = b'response data' + + async with FakeSocksSrv(loop, pld) as srv: + addr = aiosocks.Socks5Addr('127.0.0.1', srv.port) + auth = aiosocks.Socks5Auth('usr', 'pwd') + dname = 'python.org' + portnum = 53 + dst = (dname, portnum) + + class FakeDGramTransport(asyncio.DatagramTransport): + def sendto(self, data, addr=None): + # Verify correct packet was receieved + frag, addr, payload = aiosocks.protocols.Socks5Protocol.parse_udp(data) + assert frag == 0 + assert addr == ('python.org', 53) + assert payload == b'some data' + + # Send frag reply, make sure it's ignored + ba = bytearray() + ba.extend([ 0, 0, 1, 1, 2, 2, 2, 2, ]) + ba += (53).to_bytes(2, 'big') + ba += respdata + dgram.datagram_received(ba, ('3.3.3.3', 0)) + + # Send reply + # wish I could use build_udp here, but it's async + ba = bytearray() + ba.extend([ 0, 0, 0, 1, 2, 2, 2, 2, ]) + ba += (53).to_bytes(2, 'big') + ba += respdata + dgram.datagram_received(ba, ('3.3.3.3', 0)) + + sockservdgram = FakeDGramTransport() + + async def fake_cde(factory, remote_addr): + assert remote_addr == ('1.1.1.1', 1111) + + proto = factory() + + proto.connection_made(sockservdgram) + + return sockservdgram, proto + + with mock.patch.object(loop, 'create_datagram_endpoint', + fake_cde) as m: + dgram = await aiosocks.open_datagram(addr, None, dst, loop=loop) + + assert dgram.proxy_sockname == ('1.1.1.1', 1111) + + dgram.send(b'some data') + # XXX -- assert from fakesockssrv + + assert await dgram == (respdata, ('2.2.2.2', 53)) + + dgram.close() + + async def test_socks5_connect_success_anonymous(loop): pld = b'\x05\x00\x05\x00\x00\x01\x01\x01\x01\x01\x04Wtest' diff --git a/tests/test_protocols.py b/tests/test_protocols.py index 384b986..f144fcc 100644 --- a/tests/test_protocols.py +++ b/tests/test_protocols.py @@ -8,6 +8,7 @@ from asyncio import coroutine as coro, sslproto from aiohttp.test_utils import make_mocked_coro import aiosocks.constants as c from aiosocks.protocols import BaseSocksProtocol +from aiosocks.errors import InvalidServerReply def make_base(loop, *, dst=None, waiter=None, ap_factory=None, ssl=None): @@ -604,6 +605,52 @@ async def test_socks5_rd_addr_domain(loop): assert r == (b'python.org', 80) +async def test_socks5_build_udp_ipv4(loop): + proto = make_socks5(loop) + + assert (await proto.build_udp(5, ('1.2.3.4', 16)) == + b'\x00\x00\x05\x01\x01\x02\x03\x04\x00\x10') + +async def test_socks5_parse_udp_ipv4(loop): + proto = make_socks5(loop) + + frag, addr, data = proto.parse_udp(b'\x00\x00\x07\x01\x01\x02\x09\x04\x00\x20foobar') + + assert frag == 7 + assert addr == ('1.2.9.4', 32) + assert data == b'foobar' + +async def test_socks5_parse_udp_domain(loop): + proto = make_socks5(loop) + + frag, addr, data = proto.parse_udp(b'\x00\x00\x07\x03\x06domain\x00\x20foobar') + + assert frag == 7 + assert addr == ('domain', 32) + assert data == b'foobar' + +async def test_socks5_parse_udp_ipv6(loop): + proto = make_socks5(loop) + + frag, addr, data = proto.parse_udp(b'\x00\x00\x07\x04' + b' \x01\r\xb8\x11\xa3\t\xd7\x1f4\x8a.\x07\xa0v]' + b'\x00\x20foobar') + + assert frag == 7 + assert addr == ('2001:db8:11a3:9d7:1f34:8a2e:7a0:765d', 32) + assert data == b'foobar' + +async def test_socks5_parse_udp_invalid(loop): + proto = make_socks5(loop) + + for i in [ + b'\x01\x00\x07\x01\x01\x02\x09\x04\x00\x20foobar', + b'\x00\x01\x07\x01\x01\x02\x09\x04\x00\x20foobar', + b'\x00\x00\x07\x09\x01\x02\x09\x04\x00\x20foobar', + ]: + with pytest.raises(InvalidServerReply): + proto.parse_udp(i) + async def test_socks5_socks_req_inv_ver(loop): proto = make_socks5(loop, r=[b'\x05\x00', b'\x04\x00\x00'])