Browse Source

add a wrapper around kqueue for VNODE that is asyncio friendly.

main
John-Mark Gurney 9 months ago
commit
bae7061560
8 changed files with 324 additions and 0 deletions
  1. +4
    -0
      .gitignore
  2. +23
    -0
      LICENSE.txt
  3. +14
    -0
      Makefile
  4. +30
    -0
      README.md
  5. +179
    -0
      aiokq/__init__.py
  6. +45
    -0
      misc/kqtest.py
  7. +4
    -0
      requirements.txt
  8. +25
    -0
      setup.py

+ 4
- 0
.gitignore View File

@@ -0,0 +1,4 @@
.coverage
p
*.pyc
aiokq.egg-info

+ 23
- 0
LICENSE.txt View File

@@ -0,0 +1,23 @@
# Copyright 2020 John-Mark Gurney.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.

+ 14
- 0
Makefile View File

@@ -0,0 +1,14 @@
PROJNAME=aiokq
VIRTUALENV ?= virtualenv-3.7
VRITUALENVARGS =

FILES=$(PROJNAME)/__init__.py

test:
(echo $(FILES) | entr sh -c 'python -m coverage run -m unittest $(PROJNAME) && coverage report --omit=p/\* -m -i')

test-noentr:
python -m coverage run -m unittest $(PROJNAME) && coverage report --omit=p/\* -m -i

env:
($(VIRTUALENV) $(VIRTUALENVARGS) p && . ./p/bin/activate && pip install -r requirements.txt)

+ 30
- 0
README.md View File

@@ -0,0 +1,30 @@
aiokq
=====

This is a module to make select.kqueue module compatible with programs
that use asyncio.

The core of kqueue is already implemented via the core asyncio, but
other parts of kqueue, like EVFILT_VNODE and EVFILT_PROC are not. This
module is currently limited to supporting basic EVFILT_VNODE
functionality.

Sample Usage
============

To watch a file for modification:
```
fp = open(fname)
async with aiokq.watch_file(fp) as wf:
while True:
data = fp.read()
# do some work on data

# wait for a modification
await wf()
```

The with symantics is required in order to address the race where a
write is issued between the registration and the time that you do the
read. There is the possibility that a wakeup happens and there are
no modifications due to this race.

+ 179
- 0
aiokq/__init__.py View File

@@ -0,0 +1,179 @@
# Copyright 2020 John-Mark Gurney.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import asyncio
import contextlib
import itertools
import os
import select
import shutil
import tempfile
import unittest

# XXX - this matches on both FreeBSD and Darwin
KQ_EV_RECEIPT = 0x40

class AsyncKqueue(object):
def __init__(self):
self._kq = select.kqueue()
self._idgen = itertools.count()
self._events = {}

loop = asyncio.get_event_loop()
loop.add_reader(self._kq, self.handleread)

def handleread(self):
r = self._kq.control([ ], 10, 0)
for i in r:
event, kevent = self._events[i.udata]

# unblock anyone waiting
event.set()
event.clear()

def isempty(self):
return len(self._events) == 0

def addevent(self, fno, filter, fflags=0):
eid = next(self._idgen)
event = asyncio.Event()
kevent = select.kevent(fno, filter, flags=select.KQ_EV_ADD|select.KQ_EV_ENABLE|select.KQ_EV_CLEAR|KQ_EV_RECEIPT, udata=eid, fflags=fflags)

r = self._kq.control([ kevent ], 1, 0)[0]
if r.flags != select.KQ_EV_ERROR or r.data != 0:
raise RuntimeError('unable to add event')

self._events[eid] = (event, kevent)

return eid, event.wait

def removeevent(self, id):
event, kevent = self._events[id]

kevent.flags=select.KQ_EV_DELETE|KQ_EV_RECEIPT

r = self._kq.control([ kevent ], 1, 0)[0]
if r.flags != select.KQ_EV_ERROR or r.data != 0:
raise RuntimeError('unable to remove event')

del self._events[id]

@contextlib.asynccontextmanager
async def watch_file(self, fp):
try:
id, waitfun = self.addevent(fp, select.KQ_FILTER_VNODE, select.KQ_NOTE_EXTEND|select.KQ_NOTE_WRITE)
yield waitfun
finally:
self.removeevent(id)

_globalkq = AsyncKqueue()

watch_file = _globalkq.watch_file

# https://stackoverflow.com/questions/23033939/how-to-test-python-3-4-asyncio-code
# Slightly modified to timeout and to print trace back when canceled.
# This makes it easier to figure out what "froze".
def async_test(f):
def wrapper(*args, **kwargs):
async def tbcapture():
try:
return await f(*args, **kwargs)
except asyncio.CancelledError as e: # pragma: no cover
# if we are going to be cancelled, print out a tb
import traceback
traceback.print_exc()
raise

loop = asyncio.get_event_loop()

# timeout after 4 seconds
loop.run_until_complete(asyncio.wait_for(tbcapture(), 4))

return wrapper

class Tests(unittest.TestCase):
def setUp(self):
# setup temporary directory
d = os.path.realpath(tempfile.mkdtemp())
self.basetempdir = d
self.tempdir = os.path.join(d, 'subdir')
os.mkdir(self.tempdir)

os.chdir(self.tempdir)

def tearDown(self):
#print('td:', time.time())
shutil.rmtree(self.basetempdir)
self.tempdir = None

@async_test
async def test_filemod(self):
loop = asyncio.get_event_loop()

with open('samplefile.txt', 'w+') as fp, open('samplefile.txt', 'r') as rfp:
def fpwrflush(data):
fp.write(data)
fp.flush()

# schedule some writes for later
loop.call_later(.01, fpwrflush, 'something')
loop.call_later(.03, fpwrflush, 'end')

res = []
async with watch_file(rfp) as wf:
while True:
# read some data and record it
data = rfp.read()
res.append(data)

# exit if we're at the end
if data == 'end':
break

# wait for a modification
await wf()

# make sure we got the writes batched properly
self.assertEqual(res, [ '', 'something', 'end' ])

# make sure the event got removed
self.assertTrue(_globalkq.isempty())

def test_errors(self):
eid = 100
kevent = select.kevent(100, select.KQ_FILTER_VNODE, flags=select.KQ_EV_ADD|select.KQ_EV_ENABLE|select.KQ_EV_CLEAR|KQ_EV_RECEIPT, udata=eid)
_globalkq._events[eid] = (None, kevent)

try:
# make sure that when removing an invalid event, we get an error
self.assertRaises(RuntimeError, _globalkq.removeevent, eid)
finally:
del _globalkq._events[eid]

# that an invalid event raises an exception
self.assertRaises(RuntimeError, _globalkq.addevent, 100, select.KQ_FILTER_VNODE, select.KQ_NOTE_EXTEND|select.KQ_NOTE_WRITE)

# make sure their are no pending events
self.assertTrue(_globalkq.isempty())

+ 45
- 0
misc/kqtest.py View File

@@ -0,0 +1,45 @@
#
# Test to figure out how to trigger the event.
#
# Key points:
# CLEAR is needed if you want to reset things
# fp.flush is needed otherwise kernel does not see the write
#

import select

kq = select.kqueue()

print('begining')

with open('somefile.txt', 'w+') as fp, open('somefile.txt', 'r') as rfp:
print('fno:', fp.fileno())
print('rfno:', rfp.fileno())

kevent = select.kevent(rfp, select.KQ_FILTER_VNODE, flags=select.KQ_EV_ADD|select.KQ_EV_ENABLE|select.KQ_EV_CLEAR|0x40, fflags=select.KQ_NOTE_EXTEND)

r = kq.control([ kevent ], 1, 0)
print(repr(r))

fp.write('foo!')
fp.flush()

r = kq.control([], 1, 0)

print('r:', repr(r))

print('rfp.tell:', rfp.tell())
print('rfp.read:', repr(rfp.read()))

r = kq.control([], 1, 0)

print(repr(r))

fp.write('foo!')
fp.flush()

r = kq.control([], 1, 0)

print(repr(r))

print('fp.tell:', fp.tell())

+ 4
- 0
requirements.txt View File

@@ -0,0 +1,4 @@
# use setup.py for dependancy info
-e .

-e .[dev]

+ 25
- 0
setup.py View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python

from setuptools import setup

setup(name='aiokq',
version='0.1.0',
description='Asyncio wrapper for kqueue.',
author='John-Mark Gurney',
author_email='jmg@funkthat.com',
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: BSD License',
],
url='https://www.funkthat.com/gitea/jmg/aiokq',
packages=[ 'aiokq', ],
install_requires=[
],
extras_require = {
'dev': [ 'coverage' ],
},
entry_points={
'console_scripts': [
]
}
)

Loading…
Cancel
Save