Browse Source

minor change to the test to make sure that things can cancel properly...

This introduces a helper function to do so..
main
John-Mark Gurney 3 years ago
parent
commit
18f53e26f9
1 changed files with 52 additions and 16 deletions
  1. +52
    -16
      aiokq/__init__.py

+ 52
- 16
aiokq/__init__.py View File

@@ -37,6 +37,25 @@ KQ_EV_RECEIPT = 0x40

__all__ = [ 'watch_file', 'run_on_modify', ]

@contextlib.asynccontextmanager
async def _with_task(coro):
'''Make sure that the provided coro is complete before returning.
If the task is not yet done, it will be cancelled, and then
waited till it's completed. If the coro supresses the
cancelation, the coro must finish via some other mechanism.
'''

try:
task = asyncio.create_task(coro)
yield task
finally:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

class AsyncKqueue(object):
def __init__(self):
self._kq = select.kqueue()
@@ -50,6 +69,8 @@ class AsyncKqueue(object):
r = self._kq.control([ ], 10, 0)
for i in r:
event, kevent = self._events[i.udata]
#if i.flags == select.KQ_EV_ERROR and i.data != 0:
# print('warning: got error: %s' % repr(i))

# unblock anyone waiting
event.set()
@@ -60,6 +81,8 @@ class AsyncKqueue(object):

def addevent(self, fno, filter, fflags=0):
eid = next(self._idgen)
#print('adding %s, %s' % (repr(eid), repr(fno)))
#sys.stdout.flush()
event = asyncio.Event()
kevent = select.kevent(fno, filter, flags=select.KQ_EV_ADD|select.KQ_EV_ENABLE|select.KQ_EV_CLEAR|KQ_EV_RECEIPT, udata=eid, fflags=fflags)

@@ -74,11 +97,15 @@ class AsyncKqueue(object):
def removeevent(self, id):
event, kevent = self._events[id]

#print('removing %s, %s' % (repr(id), repr(fno)))
#sys.stdout.flush()
kevent.flags=select.KQ_EV_DELETE|KQ_EV_RECEIPT

r = self._kq.control([ kevent ], 1, 0)[0]
if r.flags != select.KQ_EV_ERROR or r.data != 0:
raise RuntimeError('unable to remove event')
if r.flags == select.KQ_EV_ERROR and r.data != 0:
raise RuntimeError(
'unable to remove event(%s), got: %s' %
(repr(kevent), repr(r)))

del self._events[id]

@@ -109,6 +136,17 @@ class AsyncKqueue(object):

Currently, it is just fp, but in the future if fp is a file
name, it is necessary to provide fun a different fp.

Note that this depends upon the passing in fp being valid
the entire lifetime of this function. If you use
create_task from within a with block that opened the fp,
make sure that you cancel the task before leaving the
with block, ala:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
'''

async with self.watch_file(fp) as wf:
@@ -172,25 +210,23 @@ class Tests(unittest.TestCase):
sem.release()

with open('samplefile.txt', 'w+') as fp, open('samplefile.txt', 'r') as rfp:
loop.create_task(run_on_modify(rfp, myfun, 1, foo='bar'))

def fpwrflush(data):
fp.write(data)
fp.flush()

await sem.acquire()
self.assertEqual(res.pop(), (rfp, '', (1,), { 'foo': 'bar' }))
async with _with_task(run_on_modify(rfp, myfun, 1, foo='bar')):
def fpwrflush(data):
fp.write(data)
fp.flush()

fpwrflush('foo')
await sem.acquire()
self.assertEqual(res.pop(), (rfp, '', (1,), { 'foo': 'bar' }))

await sem.acquire()
self.assertEqual(res.pop(), (rfp, 'foo', (1,), { 'foo': 'bar' }))
fpwrflush('foo')

fpwrflush('foobar')
await sem.acquire()
self.assertEqual(res.pop(), (rfp, 'foo', (1,), { 'foo': 'bar' }))

await sem.acquire()
self.assertEqual(res.pop(), (rfp, 'foofoobar', (1,), { 'foo': 'bar' }))
fpwrflush('foobar')

await sem.acquire()
self.assertEqual(res.pop(), (rfp, 'foofoobar', (1,), { 'foo': 'bar' }))

@async_test
async def test_filemod(self):


Loading…
Cancel
Save