| @@ -7,6 +7,8 @@ from unittest.mock import Mock, AsyncMock | |||
| PIPE = object() | |||
| __all__ = [ 'WSFWDServer', 'WSFWDClient', 'WFProcess', ] | |||
| def timeout(timeout): | |||
| def timeout_wrapper(fun): | |||
| @functools.wraps(fun) | |||
| @@ -184,7 +186,23 @@ class WFStreamWriter: | |||
| await self._closed_event.wait() | |||
| class WSFWDCommon: | |||
| ''' | |||
| The base class for common routines for WSFWD client and server. | |||
| The instance should be used in an async with block to properly | |||
| cancel the task for dispatching messages. | |||
| ''' | |||
| def __init__(self, reader, writer): | |||
| ''' | |||
| The arguments reader and writer are both coroutines. The | |||
| writer takes a single argument which is the byte array to | |||
| send. | |||
| If reader needs to be an async iterator, pass the __anext__ | |||
| method for reader. | |||
| ''' | |||
| self._reader = reader | |||
| self._writer = writer | |||
| self._task = asyncio.create_task(self._process_msgs()) | |||
| @@ -196,10 +214,21 @@ class WSFWDCommon: | |||
| self._procmsgs = dict() | |||
| def add_stream_handler(self, stream, hndlr): | |||
| ''' | |||
| Add hndlr, which is a coroutine, that will be called | |||
| when a payload on stream is received. | |||
| ''' | |||
| # XXX - make sure we don't overwrite an existing one | |||
| self._procmsgs[stream] = hndlr | |||
| async def _process_msgs(self): | |||
| ''' | |||
| Internal routine to dispatch messages received via | |||
| the reader to any handlers registered with | |||
| add_stream_handler. | |||
| ''' | |||
| while True: | |||
| msg = await self._reader() | |||
| #print('got:', repr(msg)) | |||
| @@ -207,12 +236,24 @@ class WSFWDCommon: | |||
| await self._procmsgs[stream](msg[1:]) | |||
| def sendstream(self, stream, *data): | |||
| ''' | |||
| Queue data to be sent to stream. This does NOT start | |||
| sending the data, the drain coroutine must by awaited | |||
| to send the data queued by this function. | |||
| ''' | |||
| if not all(isinstance(x, bytes) for x in data): | |||
| raise ValueError('write data must be bytes') | |||
| self._streams.setdefault(stream, []).extend(data) | |||
| async def drain(self, stream): | |||
| ''' | |||
| Actually send any data queued by calls to sendstream. | |||
| No data is sent until this coroutine is awaited. | |||
| ''' | |||
| datalist = self._streams[stream] | |||
| data = datalist.copy() | |||
| @@ -231,6 +272,11 @@ class WSFWDCommon: | |||
| self._task.cancel() | |||
| async def _sendcmd(self, cmd): | |||
| ''' | |||
| Internal function for sending the dict in cmd. This | |||
| can be used for either commands or response to commands. | |||
| ''' | |||
| await self._writer(b'\x00' + json.dumps(cmd).encode('utf-8')) | |||
| class WSFWDServer(WSFWDCommon): | |||
| @@ -293,6 +339,14 @@ class WSFWDClient(WSFWDCommon): | |||
| await self._cmdq.put(msg) | |||
| async def auth(self, auth): | |||
| ''' | |||
| Send the auth object in an authentication message to | |||
| the server. Some servers require this before allowing | |||
| other commands to be executed. | |||
| If the authentication fails, a RuntimeError is raised. | |||
| ''' | |||
| await self._sendcmd(dict(cmd='auth', auth=auth)) | |||
| rsp = await self._fetchpend[0]() | |||
| @@ -305,10 +359,9 @@ class WSFWDClient(WSFWDCommon): | |||
| writer.feed_data(data) | |||
| async def exec(self, args, stdin=PIPE, stdout=PIPE): | |||
| '''Returns a tuple of (StreamWriter, StreamReader). The | |||
| StreamWriter object is similar, but not exactly like | |||
| asyncio.StreamWriter in that it does not have the transport | |||
| property. | |||
| ''' | |||
| Returns a WFProcess instance. WFProcess is very similar | |||
| to asyncio.Process. | |||
| ''' | |||
| # get the stdin/stdout setup | |||