From f4fe9be40b3d162273026c53e561598fcfba8fcf Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Mon, 23 Mar 2015 11:55:16 -0700 Subject: [PATCH 1/5] Initial pass. --- alarmdecoder/decoder.py | 36 +++++++++++-------- alarmdecoder/devices.py | 70 ++++++++++++++++++------------------ alarmdecoder/messages.py | 8 +++-- alarmdecoder/util.py | 1 + alarmdecoder/zonetracking.py | 2 +- 5 files changed, 63 insertions(+), 54 deletions(-) diff --git a/alarmdecoder/decoder.py b/alarmdecoder/decoder.py index 5d12ba8..66ad156 100644 --- a/alarmdecoder/decoder.py +++ b/alarmdecoder/decoder.py @@ -9,6 +9,8 @@ Provides the main AlarmDecoder class. import time import re +from builtins import chr + from .event import event from .util import InvalidMessageError from .messages import Message, ExpanderMessage, RFMessage, LRRMessage @@ -51,15 +53,15 @@ class AlarmDecoder(object): on_write = event.Event("This event is called when data has been written to the device.\n\n**Callback definition:** *def callback(device, data)*") # Constants - KEY_F1 = unichr(1) + unichr(1) + unichr(1) + KEY_F1 = chr(1) + chr(1) + chr(1) """Represents panel function key #1""" - KEY_F2 = unichr(2) + unichr(2) + unichr(2) + KEY_F2 = chr(2) + chr(2) + chr(2) """Represents panel function key #2""" - KEY_F3 = unichr(3) + unichr(3) + unichr(3) + KEY_F3 = chr(3) + chr(3) + chr(3) """Represents panel function key #3""" - KEY_F4 = unichr(4) + unichr(4) + unichr(4) + KEY_F4 = chr(4) + chr(4) + chr(4) """Represents panel function key #4""" - KEY_PANIC = unichr(5) + unichr(5) + unichr(5) + KEY_PANIC = chr(5) + chr(5) + chr(5) """Represents a panic keypress""" BATTERY_TIMEOUT = 30 @@ -74,9 +76,9 @@ class AlarmDecoder(object): """The configuration bits set on the device.""" address_mask = 0xFFFFFFFF """The address mask configured on the device.""" - emulate_zone = [False for _ in range(5)] + emulate_zone = [False for _ in list(range(5))] """List containing the devices zone emulation status.""" - emulate_relay = [False for _ in range(4)] + emulate_relay = [False for _ in list(range(4))] """List containing the devices relay emulation status.""" emulate_lrr = False """The status of the devices LRR emulation.""" @@ -87,7 +89,7 @@ class AlarmDecoder(object): def __init__(self, device): """ - Constructor + Constructora :param device: The low-level device used for this `AlarmDecoder`_ interface. @@ -110,8 +112,8 @@ class AlarmDecoder(object): self.address = 18 self.configbits = 0xFF00 self.address_mask = 0x00000000 - self.emulate_zone = [False for x in range(5)] - self.emulate_relay = [False for x in range(4)] + self.emulate_zone = [False for x in list(range(5))] + self.emulate_relay = [False for x in list(range(4))] self.emulate_lrr = False self.deduplicate = False self.mode = ADEMCO @@ -211,13 +213,13 @@ class AlarmDecoder(object): """ if self._device: - self._device.write(str(data)) + self._device.write(data) def get_config(self): """ Retrieves the configuration from the device. Called automatically by :py:meth:`_on_open`. """ - self.send("C\r") + self.send(b"C\r") def save_config(self): """ @@ -236,7 +238,7 @@ class AlarmDecoder(object): ''.join(['Y' if r else 'N' for r in self.emulate_relay]))) config_entries.append(('LRR', 'Y' if self.emulate_lrr else 'N')) config_entries.append(('DEDUPLICATE', 'Y' if self.deduplicate else 'N')) - config_entries.append(('MODE', PANEL_TYPES.keys()[PANEL_TYPES.values().index(self.mode)])) + config_entries.append(('MODE', list(PANEL_TYPES)[list(PANEL_TYPES.values()).index(self.mode)])) config_string = '&'.join(['='.join(t) for t in config_entries]) @@ -301,6 +303,8 @@ class AlarmDecoder(object): :returns: :py:class:`~alarmdecoder.messages.Message` """ + data = str(data) + if data is not None: data = data.lstrip('\0') @@ -310,6 +314,8 @@ class AlarmDecoder(object): msg = None header = data[0:4] + #print('header', header, type(header), type(header[0])) + if header[0] != '!' or header == '!KPM': msg = self._handle_keypad_message(data) @@ -424,9 +430,9 @@ class AlarmDecoder(object): elif key == 'MASK': self.address_mask = int(val, 16) elif key == 'EXP': - self.emulate_zone = [val[z] == 'Y' for z in range(5)] + self.emulate_zone = [val[z] == 'Y' for z in list(range(5))] elif key == 'REL': - self.emulate_relay = [val[r] == 'Y' for r in range(4)] + self.emulate_relay = [val[r] == 'Y' for r in list(range(4))] elif key == 'LRR': self.emulate_lrr = (val == 'Y') elif key == 'DEDUPLICATE': diff --git a/alarmdecoder/devices.py b/alarmdecoder/devices.py index e3acd8b..3c10af8 100644 --- a/alarmdecoder/devices.py +++ b/alarmdecoder/devices.py @@ -61,7 +61,7 @@ class Device(object): Constructor """ self._id = '' - self._buffer = '' + self._buffer = b'' self._device = None self._running = False self._read_thread = None @@ -171,10 +171,10 @@ class Device(object): except SSL.WantReadError: pass - except CommError, err: + except CommError: self._device.close() - except Exception, err: + except Exception: self._device.close() self._running = False raise @@ -226,7 +226,7 @@ class USBDevice(Device): try: cls.__devices = Ftdi.find_all(query, nocache=True) - except (usb.core.USBError, FtdiError), err: + except (usb.core.USBError, FtdiError) as err: raise CommError('Error enumerating AD2USB devices: {0}'.format(str(err)), err) return cls.__devices @@ -433,10 +433,10 @@ class USBDevice(Device): self._id = self._serial_number - except (usb.core.USBError, FtdiError), err: + except (usb.core.USBError, FtdiError) as err: raise NoDeviceError('Error opening device: {0}'.format(str(err)), err) - except KeyError, err: + except KeyError as err: raise NoDeviceError('Unsupported device. ({0:04x}:{1:04x}) You probably need a newer version of pyftdi.'.format(err[0][0], err[0][1])) else: @@ -478,7 +478,7 @@ class USBDevice(Device): self.on_write(data=data) - except FtdiError, err: + except FtdiError as err: raise CommError('Error writing to device: {0}'.format(str(err)), err) def read(self): @@ -493,7 +493,7 @@ class USBDevice(Device): try: ret = self._device.read_data(1) - except (usb.core.USBError, FtdiError), err: + except (usb.core.USBError, FtdiError) as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) return ret @@ -518,7 +518,7 @@ class USBDevice(Device): timeout_event.reading = True if purge_buffer: - self._buffer = '' + self._buffer = b'' got_line, ret = False, None @@ -530,11 +530,11 @@ class USBDevice(Device): while timeout_event.reading: buf = self._device.read_data(1) - if buf != '': + if buf != b'': self._buffer += buf - if buf == "\n": - self._buffer = self._buffer.rstrip("\r\n") + if buf == b"\n": + self._buffer = self._buffer.rstrip(b"\r\n") if len(self._buffer) > 0: got_line = True @@ -542,12 +542,12 @@ class USBDevice(Device): else: time.sleep(0.01) - except (usb.core.USBError, FtdiError), err: + except (usb.core.USBError, FtdiError) as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) else: if got_line: - ret, self._buffer = self._buffer, '' + ret, self._buffer = self._buffer, b'' self.on_read(data=ret) @@ -653,7 +653,7 @@ class SerialDevice(Device): else: devices = serial.tools.list_ports.comports() - except serial.SerialException, err: + except serial.SerialException as err: raise CommError('Error enumerating serial devices: {0}'.format(str(err)), err) return devices @@ -724,7 +724,7 @@ class SerialDevice(Device): # all issues with it. self._device.baudrate = baudrate - except (serial.SerialException, ValueError, OSError), err: + except (serial.SerialException, ValueError, OSError) as err: raise NoDeviceError('Error opening device on {0}.'.format(self._port), err) else: @@ -764,7 +764,7 @@ class SerialDevice(Device): except serial.SerialTimeoutException: pass - except serial.SerialException, err: + except serial.SerialException as err: raise CommError('Error writing to device.', err) else: @@ -782,7 +782,7 @@ class SerialDevice(Device): try: ret = self._device.read(1) - except serial.SerialException, err: + except serial.SerialException as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) return ret @@ -807,7 +807,7 @@ class SerialDevice(Device): timeout_event.reading = True if purge_buffer: - self._buffer = '' + self._buffer = b'' got_line, ret = False, None @@ -820,11 +820,11 @@ class SerialDevice(Device): buf = self._device.read(1) # NOTE: AD2SERIAL apparently sends down \xFF on boot. - if buf != '' and buf != "\xff": + if buf != b'' and buf != b"\xff": self._buffer += buf - if buf == "\n": - self._buffer = self._buffer.rstrip("\r\n") + if buf == b"\n": + self._buffer = self._buffer.rstrip(b"\r\n") if len(self._buffer) > 0: got_line = True @@ -832,12 +832,12 @@ class SerialDevice(Device): else: time.sleep(0.01) - except (OSError, serial.SerialException), err: + except (OSError, serial.SerialException) as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) else: if got_line: - ret, self._buffer = self._buffer, '' + ret, self._buffer = self._buffer, b'' self.on_read(data=ret) @@ -1001,7 +1001,7 @@ class SocketDevice(Device): self._id = '{0}:{1}'.format(self._host, self._port) - except socket.error, err: + except socket.error as err: raise NoDeviceError('Error opening device at {0}:{1}'.format(self._host, self._port), err) else: @@ -1054,7 +1054,7 @@ class SocketDevice(Device): self.on_write(data=data) - except (SSL.Error, socket.error), err: + except (SSL.Error, socket.error) as err: raise CommError('Error writing to device.', err) return data_sent @@ -1071,7 +1071,7 @@ class SocketDevice(Device): try: data = self._device.recv(1) - except socket.error, err: + except socket.error as err: raise CommError('Error while reading from device: {0}'.format(str(err)), err) return data @@ -1096,7 +1096,7 @@ class SocketDevice(Device): timeout_event.reading = True if purge_buffer: - self._buffer = '' + self._buffer = b'' got_line, ret = False, None @@ -1108,11 +1108,11 @@ class SocketDevice(Device): while timeout_event.reading: buf = self._device.recv(1) - if buf != '': + if buf != b'': self._buffer += buf - if buf == "\n": - self._buffer = self._buffer.rstrip("\r\n") + if buf == b"\n": + self._buffer = self._buffer.rstrip(b"\r\n") if len(self._buffer) > 0: got_line = True @@ -1120,10 +1120,10 @@ class SocketDevice(Device): else: time.sleep(0.01) - except socket.error, err: + except socket.error as err: raise CommError('Error reading from device: {0}'.format(str(err)), err) - except SSL.SysCallError, err: + except SSL.SysCallError as err: errno, msg = err raise CommError('SSL error while reading from device: {0} ({1})'.format(msg, errno)) @@ -1132,7 +1132,7 @@ class SocketDevice(Device): else: if got_line: - ret, self._buffer = self._buffer, '' + ret, self._buffer = self._buffer, b'' self.on_read(data=ret) @@ -1177,7 +1177,7 @@ class SocketDevice(Device): self._device = SSL.Connection(ctx, self._device) - except SSL.Error, err: + except SSL.Error as err: raise CommError('Error setting up SSL connection.', err) def _verify_ssl_callback(self, connection, x509, errnum, errdepth, ok): diff --git a/alarmdecoder/messages.py b/alarmdecoder/messages.py index 6ed5d1f..25327ed 100644 --- a/alarmdecoder/messages.py +++ b/alarmdecoder/messages.py @@ -15,6 +15,8 @@ devices. import re import datetime +from reprlib import repr + from .util import InvalidMessageError from .panels import PANEL_TYPES, ADEMCO, DSC @@ -136,7 +138,7 @@ class Message(BaseMessage): :raises: :py:class:`~alarmdecoder.util.InvalidMessageError` """ - match = self._regex.match(data) + match = self._regex.match(str(data)) if match is None: raise InvalidMessageError('Received invalid message: {0}'.format(data)) @@ -163,7 +165,7 @@ class Message(BaseMessage): self.check_zone = is_bit_set(15) self.perimeter_only = is_bit_set(16) self.system_fault = is_bit_set(17) - if self.bitfield[18] in PANEL_TYPES.keys(): + if self.bitfield[18] in list(PANEL_TYPES): self.panel_type = PANEL_TYPES[self.bitfield[18]] # pos 20-21 - Unused. self.text = alpha.strip('"') @@ -292,7 +294,7 @@ class RFMessage(BaseMessage): """Low battery indication""" supervision = False """Supervision required indication""" - loop = [False for _ in range(4)] + loop = [False for _ in list(range(4))] """Loop indicators""" def __init__(self, data=None): diff --git a/alarmdecoder/util.py b/alarmdecoder/util.py index 23abf4a..cee0963 100644 --- a/alarmdecoder/util.py +++ b/alarmdecoder/util.py @@ -8,6 +8,7 @@ Provides utility classes for the `AlarmDecoder`_ (AD2) devices. import time import threading +from io import open class NoDeviceError(Exception): diff --git a/alarmdecoder/zonetracking.py b/alarmdecoder/zonetracking.py index 36334b0..e3adac6 100644 --- a/alarmdecoder/zonetracking.py +++ b/alarmdecoder/zonetracking.py @@ -294,7 +294,7 @@ class Zonetracker(object): """ zones = [] - for z in self._zones.keys(): + for z in list(self._zones): zones += [z] for z in zones: From 0d3b6fe780c922ae92421289d77fbbcae70d6666 Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Fri, 27 Mar 2015 11:13:47 -0700 Subject: [PATCH 2/5] Python3-friendly fix for the optional SSL stuff in the thread. --- alarmdecoder/devices.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/alarmdecoder/devices.py b/alarmdecoder/devices.py index 3c10af8..9012397 100644 --- a/alarmdecoder/devices.py +++ b/alarmdecoder/devices.py @@ -40,8 +40,16 @@ try: have_openssl = True except ImportError: - from collections import namedtuple - SSL = namedtuple('SSL', ['Error', 'WantReadError', 'SysCallError']) + class SSL: + class Error(BaseException): + pass + + class WantReadError(BaseException): + pass + + class SysCallError(BaseException): + pass + have_openssl = False From b22e7c3d28e0dadf07421b1bc752871e79172878 Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Fri, 27 Mar 2015 12:33:25 -0700 Subject: [PATCH 3/5] Correctly decoding byte string. Added future requirement. --- alarmdecoder/decoder.py | 4 +--- requirements.txt | 1 + setup.py | 3 ++- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/alarmdecoder/decoder.py b/alarmdecoder/decoder.py index 66ad156..3365224 100644 --- a/alarmdecoder/decoder.py +++ b/alarmdecoder/decoder.py @@ -303,7 +303,7 @@ class AlarmDecoder(object): :returns: :py:class:`~alarmdecoder.messages.Message` """ - data = str(data) + data = data.decode('utf-8') if data is not None: data = data.lstrip('\0') @@ -314,8 +314,6 @@ class AlarmDecoder(object): msg = None header = data[0:4] - #print('header', header, type(header), type(header[0])) - if header[0] != '!' or header == '!KPM': msg = self._handle_keypad_message(data) diff --git a/requirements.txt b/requirements.txt index 8313187..9bbdf93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ +future==0.14.3 pyserial==2.7 diff --git a/setup.py b/setup.py index b5e733c..8a436c9 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,8 @@ setup(name='alarmdecoder', license='MIT', packages=['alarmdecoder', 'alarmdecoder.event'], install_requires=[ - 'pyserial>=2.7', + 'future==0.14.3' + 'pyserial==2.7', ], test_suite='nose.collector', tests_require=['nose', 'mock'], From d8958eaad9df5d1acd5a6fc1c236770e9499319b Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Fri, 27 Mar 2015 14:47:22 -0700 Subject: [PATCH 4/5] Fixed string encoding used with send. --- alarmdecoder/decoder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alarmdecoder/decoder.py b/alarmdecoder/decoder.py index 3365224..5dfae87 100644 --- a/alarmdecoder/decoder.py +++ b/alarmdecoder/decoder.py @@ -213,13 +213,13 @@ class AlarmDecoder(object): """ if self._device: - self._device.write(data) + self._device.write(str.encode(data)) def get_config(self): """ Retrieves the configuration from the device. Called automatically by :py:meth:`_on_open`. """ - self.send(b"C\r") + self.send("C\r") def save_config(self): """ From b992ae1500f785f2eac85a20c0af1ccf5668239b Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Thu, 13 Aug 2015 13:04:37 -0700 Subject: [PATCH 5/5] More conversion. --- alarmdecoder/devices.py | 5 +- alarmdecoder/zonetracking.py | 6 +- bin/ad2-firmwareupload | 12 +- examples/alarm_email.py | 9 +- examples/lrr_example.py | 6 +- examples/rf_device.py | 6 +- examples/serialport.py | 6 +- examples/socket_example.py | 6 +- examples/ssl_socket.py | 6 +- examples/usb_detection.py | 8 +- examples/usb_device.py | 6 +- examples/virtual_zone_expander.py | 11 +- setup.py | 8 +- test/test_ad2.py | 92 ++++---- test/test_devices.py | 362 ++++++++++++++++-------------- 15 files changed, 293 insertions(+), 256 deletions(-) diff --git a/alarmdecoder/devices.py b/alarmdecoder/devices.py index 9012397..c310eb7 100644 --- a/alarmdecoder/devices.py +++ b/alarmdecoder/devices.py @@ -20,6 +20,7 @@ import threading import serial import serial.tools.list_ports import socket +from builtins import bytes from .util import CommError, TimeoutError, NoDeviceError, InvalidMessageError from .event import event @@ -825,7 +826,7 @@ class SerialDevice(Device): try: while timeout_event.reading: - buf = self._device.read(1) + buf = bytes(self._device.read(1), 'utf-8') # NOTE: AD2SERIAL apparently sends down \xFF on boot. if buf != b'' and buf != b"\xff": @@ -1114,7 +1115,7 @@ class SocketDevice(Device): try: while timeout_event.reading: - buf = self._device.recv(1) + buf = bytes(self._device.recv(1), 'utf-8') if buf != b'': self._buffer += buf diff --git a/alarmdecoder/zonetracking.py b/alarmdecoder/zonetracking.py index e3adac6..1a7fa1b 100644 --- a/alarmdecoder/zonetracking.py +++ b/alarmdecoder/zonetracking.py @@ -242,7 +242,7 @@ class Zonetracker(object): it = iter(self._zones_faulted) try: while not found_last_faulted: - z = it.next() + z = next(it) if z == self._last_zone_fault: found_last_faulted = True @@ -255,7 +255,7 @@ class Zonetracker(object): # between to our clear list. try: while not at_end and not found_current: - z = it.next() + z = next(it) if z == zone: found_current = True @@ -273,7 +273,7 @@ class Zonetracker(object): try: while not found_current: - z = it.next() + z = next(it) if z == zone: found_current = True diff --git a/bin/ad2-firmwareupload b/bin/ad2-firmwareupload index 8223f91..54f96d2 100755 --- a/bin/ad2-firmwareupload +++ b/bin/ad2-firmwareupload @@ -15,10 +15,10 @@ def handle_firmware(stage): sys.stdout.write('.') sys.stdout.flush() elif stage == alarmdecoder.util.Firmware.STAGE_BOOT: - if handle_firmware.wait_tick > 0: print "" - print "Rebooting device.." + if handle_firmware.wait_tick > 0: print("") + print("Rebooting device..") elif stage == alarmdecoder.util.Firmware.STAGE_LOAD: - print 'Waiting for boot loader..' + print('Waiting for boot loader..') elif stage == alarmdecoder.util.Firmware.STAGE_UPLOADING: if handle_firmware.upload_tick == 0: sys.stdout.write('Uploading firmware.') @@ -29,7 +29,7 @@ def handle_firmware(stage): sys.stdout.write('.') sys.stdout.flush() elif stage == alarmdecoder.util.Firmware.STAGE_DONE: - print "\r\nDone!" + print("\r\nDone!") def main(): device = '/dev/ttyUSB0' @@ -37,7 +37,7 @@ def main(): baudrate = 115200 if len(sys.argv) < 2: - print "Syntax: {0} [device path or hostname:port] [baudrate]".format(sys.argv[0]) + print("Syntax: {0} [device path or hostname:port] [baudrate]".format(sys.argv[0])) sys.exit(1) firmware = sys.argv[1] @@ -47,7 +47,7 @@ def main(): if len(sys.argv) > 3: baudrate = sys.argv[3] - print "Flashing device: {0} - {2} baud\r\nFirmware: {1}".format(device, firmware, baudrate) + print("Flashing device: {0} - {2} baud\r\nFirmware: {1}".format(device, firmware, baudrate)) if ':' in device: hostname, port = device.split(':') diff --git a/examples/alarm_email.py b/examples/alarm_email.py index 999c073..abc5193 100644 --- a/examples/alarm_email.py +++ b/examples/alarm_email.py @@ -31,16 +31,15 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_alarm(sender, **kwargs): """ Handles alarm events from the AlarmDecoder. """ - status = kwargs.pop('status', None) zone = kwargs.pop('zone', None) - text = "Alarm status: {0} - Zone {1}".format(status, zone) + text = "Alarm: Zone {0}".format(zone) # Build the email message msg = MIMEText(text) @@ -58,7 +57,7 @@ def handle_alarm(sender, **kwargs): s.sendmail(FROM_ADDRESS, TO_ADDRESS, msg.as_string()) s.quit() - print 'sent alarm email:', text + print('sent alarm email:', text) if __name__ == '__main__': main() diff --git a/examples/lrr_example.py b/examples/lrr_example.py index 6a2f424..d518373 100644 --- a/examples/lrr_example.py +++ b/examples/lrr_example.py @@ -19,14 +19,14 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_lrr_message(sender, message): """ Handles message events from the AlarmDecoder. """ - print sender, message.partition, message.event_type, message.event_data + print(sender, message.partition, message.event_type, message.event_data) if __name__ == '__main__': main() diff --git a/examples/rf_device.py b/examples/rf_device.py index 6e2f8fb..8662ffe 100644 --- a/examples/rf_device.py +++ b/examples/rf_device.py @@ -29,8 +29,8 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_rfx(sender, message): """ @@ -38,7 +38,7 @@ def handle_rfx(sender, message): """ # Check for our target serial number and loop if message.serial_number == RF_DEVICE_SERIAL_NUMBER and message.loop[0] == True: - print message.serial_number, 'triggered loop #1' + print(message.serial_number, 'triggered loop #1') if __name__ == '__main__': main() diff --git a/examples/serialport.py b/examples/serialport.py index b504d59..c83c059 100644 --- a/examples/serialport.py +++ b/examples/serialport.py @@ -23,14 +23,14 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_message(sender, message): """ Handles message events from the AlarmDecoder. """ - print sender, message.raw + print(sender, message.raw) if __name__ == '__main__': main() diff --git a/examples/socket_example.py b/examples/socket_example.py index 9e4bce6..291f9eb 100644 --- a/examples/socket_example.py +++ b/examples/socket_example.py @@ -21,14 +21,14 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_message(sender, message): """ Handles message events from the AlarmDecoder. """ - print sender, message.raw + print(sender, message.raw) if __name__ == '__main__': main() diff --git a/examples/ssl_socket.py b/examples/ssl_socket.py index 50289c5..cdae3ae 100644 --- a/examples/ssl_socket.py +++ b/examples/ssl_socket.py @@ -35,14 +35,14 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_message(sender, message): """ Handles message events from the AlarmDecoder. """ - print sender, message.raw + print(sender, message.raw) if __name__ == '__main__': main() diff --git a/examples/usb_detection.py b/examples/usb_detection.py index df8f0cd..8155369 100644 --- a/examples/usb_detection.py +++ b/examples/usb_detection.py @@ -21,7 +21,7 @@ def main(): while True: time.sleep(1) - except Exception, ex: + except Exception as ex: print 'Exception:', ex finally: @@ -48,7 +48,7 @@ def handle_message(sender, message): """ Handles message events from the AlarmDecoder. """ - print sender, message.raw + print(sender, message.raw) def handle_attached(sender, device): """ @@ -58,7 +58,7 @@ def handle_attached(sender, device): dev = create_device(device) __devices[dev.id] = dev - print 'attached', dev.id + print('attached', dev.id) def handle_detached(sender, device): """ @@ -72,7 +72,7 @@ def handle_detached(sender, device): del __devices[sernum] - print 'detached', sernum + print('detached', sernum) if __name__ == '__main__': main() diff --git a/examples/usb_device.py b/examples/usb_device.py index 2714057..fd935a7 100644 --- a/examples/usb_device.py +++ b/examples/usb_device.py @@ -16,14 +16,14 @@ def main(): while True: time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + print('Exception:', ex) def handle_message(sender, message): """ Handles message events from the AlarmDecoder. """ - print sender, message.raw + print(sender, message.raw) if __name__ == '__main__': main() diff --git a/examples/virtual_zone_expander.py b/examples/virtual_zone_expander.py index 42ac1c1..591d029 100644 --- a/examples/virtual_zone_expander.py +++ b/examples/virtual_zone_expander.py @@ -47,14 +47,17 @@ def main(): time.sleep(1) - except Exception, ex: - print 'Exception:', ex + except Exception as ex: + import sys + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_exc(exc_traceback) + print('Exception:', ex) def handle_zone_fault(sender, zone): """ Handles zone fault messages. """ - print 'zone faulted', zone + print('zone faulted', zone) # Restore the zone sender.clear_zone(zone) @@ -63,7 +66,7 @@ def handle_zone_restore(sender, zone): """ Handles zone restore messages. """ - print 'zone cleared', zone + print('zone cleared', zone) if __name__ == '__main__': main() diff --git a/setup.py b/setup.py index 8a436c9..d1627f5 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ """Setup script""" +import sys from setuptools import setup def readme(): @@ -8,6 +9,10 @@ def readme(): with open('README.rst') as readme_file: return readme_file.read() +extra_requirements = [] +if sys.version_info < (3,): + extra_requirements.append('future==0.14.3') + setup(name='alarmdecoder', version='0.9.1', description='Python interface for the AlarmDecoder (AD2) family ' @@ -30,9 +35,8 @@ setup(name='alarmdecoder', license='MIT', packages=['alarmdecoder', 'alarmdecoder.event'], install_requires=[ - 'future==0.14.3' 'pyserial==2.7', - ], + ] + extra_requirements, test_suite='nose.collector', tests_require=['nose', 'mock'], scripts=['bin/ad2-sslterm', 'bin/ad2-firmwareupload'], diff --git a/test/test_ad2.py b/test/test_ad2.py index a4771f5..8586ba5 100644 --- a/test/test_ad2.py +++ b/test/test_ad2.py @@ -1,5 +1,7 @@ import time +from builtins import bytes + from unittest import TestCase from mock import Mock, MagicMock, patch @@ -133,11 +135,11 @@ class TestAlarmDecoder(TestCase): def test_send(self): self._decoder.send('test') - self._device.write.assert_called_with('test') + self._device.write.assert_called_with(b'test') def test_get_config(self): self._decoder.get_config() - self._device.write.assert_called_with("C\r") + self._device.write.assert_called_with(b"C\r") def test_save_config(self): self._decoder.save_config() @@ -145,66 +147,66 @@ class TestAlarmDecoder(TestCase): def test_reboot(self): self._decoder.reboot() - self._device.write.assert_called_with('=') + self._device.write.assert_called_with(b'=') def test_fault(self): self._decoder.fault_zone(1) - self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1)) + self._device.write.assert_called_with(bytes("L{0:02}{1}\r".format(1, 1), 'utf-8')) def test_fault_wireproblem(self): self._decoder.fault_zone(1, simulate_wire_problem=True) - self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2)) + self._device.write.assert_called_with(bytes("L{0:02}{1}\r".format(1, 2), 'utf-8')) def test_clear_zone(self): self._decoder.clear_zone(1) - self._device.write.assert_called_with("L{0:02}0\r".format(1)) + self._device.write.assert_called_with(bytes("L{0:02}0\r".format(1), 'utf-8')) def test_message(self): - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertIsInstance(msg, Message) - self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self._decoder._on_read(self, data=b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertTrue(self._message_received) def test_message_kpm(self): - msg = self._decoder._handle_message('!KPM:[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'!KPM:[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertIsInstance(msg, Message) - self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "') + self._decoder._on_read(self, data=b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertTrue(self._message_received) def test_expander_message(self): - msg = self._decoder._handle_message('!EXP:07,01,01') + msg = self._decoder._handle_message(b'!EXP:07,01,01') self.assertIsInstance(msg, ExpanderMessage) - self._decoder._on_read(self, data='!EXP:07,01,01') + self._decoder._on_read(self, data=b'!EXP:07,01,01') self.assertTrue(self._expander_message_received) def test_relay_message(self): self._decoder.open() - msg = self._decoder._handle_message('!REL:12,01,01') + msg = self._decoder._handle_message(b'!REL:12,01,01') self.assertIsInstance(msg, ExpanderMessage) self.assertEquals(self._relay_changed, True) def test_rfx_message(self): - msg = self._decoder._handle_message('!RFX:0180036,80') + msg = self._decoder._handle_message(b'!RFX:0180036,80') self.assertIsInstance(msg, RFMessage) self.assertTrue(self._rfx_message_received) def test_panic(self): self._decoder.open() - msg = self._decoder._handle_message('!LRR:012,1,ALARM_PANIC') + msg = self._decoder._handle_message(b'!LRR:012,1,ALARM_PANIC') self.assertEquals(self._panicked, True) - msg = self._decoder._handle_message('!LRR:012,1,CANCEL') + msg = self._decoder._handle_message(b'!LRR:012,1,CANCEL') self.assertEquals(self._panicked, False) self.assertIsInstance(msg, LRRMessage) def test_config_message(self): self._decoder.open() - msg = self._decoder._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N') + msg = self._decoder._handle_message(b'!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N') self.assertEquals(self._decoder.address, 18) self.assertEquals(self._decoder.configbits, int('ff00', 16)) self.assertEquals(self._decoder.address_mask, int('ffffffff', 16)) @@ -216,100 +218,100 @@ class TestAlarmDecoder(TestCase): self.assertEquals(self._got_config, True) def test_power_changed_event(self): - msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._power_changed, False) # Not set first time we hit it. - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._power_changed, False) - msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._power_changed, True) def test_alarm_event(self): - msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._alarmed, False) # Not set first time we hit it. - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._alarmed, False) self.assertEquals(self._alarm_restored, True) - msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._alarmed, True) def test_zone_bypassed_event(self): - msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._bypassed, False) # Not set first time we hit it. - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._bypassed, False) - msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._bypassed, True) def test_armed_away_event(self): - msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._armed, False) # Not set first time we hit it. - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._armed, False) - msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._armed, True) self._armed = False - msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._armed, False) # Not set first time we hit it. - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._armed, False) - msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._armed, True) def test_battery_low_event(self): - msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000010000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._battery, True) # force the timeout to expire. with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35): - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._battery, False) def test_fire_alarm_event(self): - msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000100----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._fire, True) # force the timeout to expire. with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35): - msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "') + msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "') self.assertEquals(self._fire, False) def test_hit_for_faults(self): - self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "') + self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "') - self._decoder._device.write.assert_called_with('*') + self._decoder._device.write.assert_called_with(b'*') def test_sending_received(self): - self._decoder._on_read(self, data='!Sending.done') + self._decoder._on_read(self, data=b'!Sending.done') self.assertTrue(self._sending_received_status) - self._decoder._on_read(self, data='!Sending.....done') + self._decoder._on_read(self, data=b'!Sending.....done') self.assertFalse(self._sending_received_status) def test_boot(self): - self._decoder._on_read(self, data='!Ready') + self._decoder._on_read(self, data=b'!Ready') self.assertTrue(self._on_boot_received) def test_zone_fault_and_restore(self): - self._decoder._on_read(self, data='[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "') + self._decoder._on_read(self, data=b'[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "') self.assertEquals(self._zone_faulted, 3) - self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "') + self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "') self.assertEquals(self._zone_faulted, 4) - self._decoder._on_read(self, data='[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "') + self._decoder._on_read(self, data=b'[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "') self.assertEquals(self._zone_faulted, 5) - self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "') + self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "') self.assertEquals(self._zone_restored, 3) diff --git a/test/test_devices.py b/test/test_devices.py index 628927a..e2c768d 100644 --- a/test/test_devices.py +++ b/test/test_devices.py @@ -1,168 +1,32 @@ -from unittest import TestCase -from mock import Mock, MagicMock, patch -from serial import Serial, SerialException -from pyftdi.pyftdi.ftdi import Ftdi, FtdiError -from usb.core import USBError, Device as USBCoreDevice import socket import time import tempfile import os -from OpenSSL import SSL, crypto -from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice -from alarmdecoder.util import NoDeviceError, CommError, TimeoutError - - -class TestUSBDevice(TestCase): - def setUp(self): - self._device = USBDevice() - self._device._device = Mock(spec=Ftdi) - self._device._device.usb_dev = Mock(spec=USBCoreDevice) - self._device._device.usb_dev.bus = 0 - self._device._device.usb_dev.address = 0 - - self._attached = False - self._detached = False - - def tearDown(self): - self._device.close() - - def attached_event(self, sender, *args, **kwargs): - self._attached = True - - def detached_event(self, sender, *args, **kwargs): - self._detached = True - - def test_find_default_param(self): - with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): - device = USBDevice.find() - - self.assertEquals(device.interface, 'AD2') - - def test_find_with_param(self): - with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): - device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2')) - self.assertEquals(device.interface, 'AD2-1') - - device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2')) - self.assertEquals(device.interface, 'AD2-2') - - def test_events(self): - self.assertEquals(self._attached, False) - self.assertEquals(self._detached, False) - - # this is ugly, but it works. - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): - USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event) - - with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]): - USBDevice.find_all() - time.sleep(1) - USBDevice.stop_detection() - - self.assertEquals(self._attached, True) - self.assertEquals(self._detached, True) - - def test_find_all(self): - with patch.object(USBDevice, 'find_all', return_value=[]) as mock: - devices = USBDevice.find_all() - - self.assertEquals(devices, []) - - def test_find_all_exception(self): - with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock: - with self.assertRaises(CommError): - devices = USBDevice.find_all() - - with self.assertRaises(CommError): - devices = USBDevice.find_all() - - def test_interface_serial_number(self): - self._device.interface = 'AD2USB' - - self.assertEquals(self._device.interface, 'AD2USB') - self.assertEquals(self._device.serial_number, 'AD2USB') - self.assertEquals(self._device._device_number, 0) - - def test_interface_index(self): - self._device.interface = 1 - - self.assertEquals(self._device.interface, 1) - self.assertEquals(self._device.serial_number, None) - self.assertEquals(self._device._device_number, 1) - - def test_open(self): - self._device.interface = 'AD2USB' - - with patch.object(self._device._device, 'open') as mock: - self._device.open(no_reader_thread=True) - - mock.assert_any_calls() - - def test_open_failed(self): - self._device.interface = 'AD2USB' - - with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]): - with self.assertRaises(NoDeviceError): - self._device.open(no_reader_thread=True) - - with self.assertRaises(NoDeviceError): - self._device.open(no_reader_thread=True) - - def test_write(self): - self._device.interface = 'AD2USB' - self._device.open(no_reader_thread=True) - - with patch.object(self._device._device, 'write_data') as mock: - self._device.write('test') - - mock.assert_called_with('test') - def test_write_exception(self): - with patch.object(self._device._device, 'write_data', side_effect=FtdiError): - with self.assertRaises(CommError): - self._device.write('test') - - def test_read(self): - self._device.interface = 'AD2USB' - self._device.open(no_reader_thread=True) - - with patch.object(self._device._device, 'read_data') as mock: - self._device.read() - - mock.assert_called_with(1) - - def test_read_exception(self): - with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]): - with self.assertRaises(CommError): - self._device.read() - - with self.assertRaises(CommError): - self._device.read() +from unittest import TestCase +from mock import Mock, MagicMock, patch +from serial import Serial, SerialException - def test_read_line(self): - with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")): - ret = None - try: - ret = self._device.read_line() - except StopIteration: - pass +from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice +from alarmdecoder.util import NoDeviceError, CommError, TimeoutError - self.assertEquals(ret, "testing") +# Optional FTDI tests +try: + from pyftdi.pyftdi.ftdi import Ftdi, FtdiError + from usb.core import USBError, Device as USBCoreDevice - def test_read_line_timeout(self): - with patch.object(self._device._device, 'read_data', return_value='a') as mock: - with self.assertRaises(TimeoutError): - self._device.read_line(timeout=0.1) + have_pyftdi = True - self.assertIn('a', self._device._buffer) +except ImportError: + have_pyftdi = False - def test_read_line_exception(self): - with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]): - with self.assertRaises(CommError): - self._device.read_line() +# Optional SSL tests +try: + from OpenSSL import SSL, crypto - with self.assertRaises(CommError): - self._device.read_line() + have_openssl = True +except ImportError: + have_openssl = False class TestSerialDevice(TestCase): @@ -203,14 +67,14 @@ class TestSerialDevice(TestCase): self._device.open(no_reader_thread=True) with patch.object(self._device._device, 'write') as mock: - self._device.write('test') + self._device.write(b'test') - mock.assert_called_with('test') + mock.assert_called_with(b'test') def test_write_exception(self): with patch.object(self._device._device, 'write', side_effect=SerialException): with self.assertRaises(CommError): - self._device.write('test') + self._device.write(b'test') def test_read(self): self._device.interface = '/dev/ttyS0' @@ -234,14 +98,14 @@ class TestSerialDevice(TestCase): except StopIteration: pass - self.assertEquals(ret, "testing") + self.assertEquals(ret, b"testing") def test_read_line_timeout(self): with patch.object(self._device._device, 'read', return_value='a') as mock: with self.assertRaises(TimeoutError): self._device.read_line(timeout=0.1) - self.assertIn('a', self._device._buffer) + self.assertIn('a', self._device._buffer.decode('utf-8')) def test_read_line_exception(self): with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]): @@ -278,14 +142,18 @@ class TestSocketDevice(TestCase): self._device.open(no_reader_thread=True) with patch.object(socket.socket, 'send') as mock: - self._device.write('test') + self._device.write(b'test') - mock.assert_called_with('test') + mock.assert_called_with(b'test') def test_write_exception(self): - with patch.object(self._device._device, 'send', side_effect=[SSL.Error, socket.error]): + side_effects = [socket.error] + if (have_openssl): + side_effects.append(SSL.Error) + + with patch.object(self._device._device, 'send', side_effect=side_effects): with self.assertRaises(CommError): - self._device.write('test') + self._device.write(b'test') def test_read(self): with patch.object(socket.socket, '__init__', return_value=None): @@ -310,14 +178,14 @@ class TestSocketDevice(TestCase): except StopIteration: pass - self.assertEquals(ret, "testing") + self.assertEquals(ret, b"testing") def test_read_line_timeout(self): with patch.object(self._device._device, 'recv', return_value='a') as mock: with self.assertRaises(TimeoutError): self._device.read_line(timeout=0.1) - self.assertIn('a', self._device._buffer) + self.assertIn('a', self._device._buffer.decode('utf-8')) def test_read_line_exception(self): with patch.object(self._device._device, 'recv', side_effect=socket.error): @@ -328,6 +196,9 @@ class TestSocketDevice(TestCase): self._device.read_line() def test_ssl(self): + if not have_openssl: + return + ssl_key = crypto.PKey() ssl_key.generate_key(crypto.TYPE_RSA, 2048) ssl_cert = crypto.X509() @@ -351,7 +222,7 @@ class TestSocketDevice(TestCase): with patch.object(socket.socket, 'fileno', return_value=fileno): try: self._device.open(no_reader_thread=True) - except SSL.SysCallError, ex: + except SSL.SysCallError as ex: pass os.close(fileno) @@ -361,6 +232,9 @@ class TestSocketDevice(TestCase): self.assertIsInstance(self._device._device, SSL.Connection) def test_ssl_exception(self): + if not have_openssl: + return + self._device.ssl = True self._device.ssl_key = 'None' self._device.ssl_certificate = 'None' @@ -376,8 +250,162 @@ class TestSocketDevice(TestCase): with self.assertRaises(CommError): try: self._device.open(no_reader_thread=True) - except SSL.SysCallError, ex: + except SSL.SysCallError as ex: pass os.close(fileno) os.unlink(path) + + +if have_pyftdi: + class TestUSBDevice(TestCase): + def setUp(self): + self._device = USBDevice() + self._device._device = Mock(spec=Ftdi) + self._device._device.usb_dev = Mock(spec=USBCoreDevice) + self._device._device.usb_dev.bus = 0 + self._device._device.usb_dev.address = 0 + + self._attached = False + self._detached = False + + def tearDown(self): + self._device.close() + + def attached_event(self, sender, *args, **kwargs): + self._attached = True + + def detached_event(self, sender, *args, **kwargs): + self._detached = True + + def test_find_default_param(self): + with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]): + device = USBDevice.find() + + self.assertEquals(device.interface, 'AD2') + + def test_find_with_param(self): + with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): + device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2')) + self.assertEquals(device.interface, 'AD2-1') + + device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2')) + self.assertEquals(device.interface, 'AD2-2') + + def test_events(self): + self.assertEquals(self._attached, False) + self.assertEquals(self._detached, False) + + # this is ugly, but it works. + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]): + USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event) + + with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]): + USBDevice.find_all() + time.sleep(1) + USBDevice.stop_detection() + + self.assertEquals(self._attached, True) + self.assertEquals(self._detached, True) + + def test_find_all(self): + with patch.object(USBDevice, 'find_all', return_value=[]) as mock: + devices = USBDevice.find_all() + + self.assertEquals(devices, []) + + def test_find_all_exception(self): + with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock: + with self.assertRaises(CommError): + devices = USBDevice.find_all() + + with self.assertRaises(CommError): + devices = USBDevice.find_all() + + def test_interface_serial_number(self): + self._device.interface = 'AD2USB' + + self.assertEquals(self._device.interface, 'AD2USB') + self.assertEquals(self._device.serial_number, 'AD2USB') + self.assertEquals(self._device._device_number, 0) + + def test_interface_index(self): + self._device.interface = 1 + + self.assertEquals(self._device.interface, 1) + self.assertEquals(self._device.serial_number, None) + self.assertEquals(self._device._device_number, 1) + + def test_open(self): + self._device.interface = 'AD2USB' + + with patch.object(self._device._device, 'open') as mock: + self._device.open(no_reader_thread=True) + + mock.assert_any_calls() + + def test_open_failed(self): + self._device.interface = 'AD2USB' + + with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]): + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + with self.assertRaises(NoDeviceError): + self._device.open(no_reader_thread=True) + + def test_write(self): + self._device.interface = 'AD2USB' + self._device.open(no_reader_thread=True) + + with patch.object(self._device._device, 'write_data') as mock: + self._device.write(b'test') + + mock.assert_called_with(b'test') + + def test_write_exception(self): + with patch.object(self._device._device, 'write_data', side_effect=FtdiError): + with self.assertRaises(CommError): + self._device.write(b'test') + + def test_read(self): + self._device.interface = 'AD2USB' + self._device.open(no_reader_thread=True) + + with patch.object(self._device._device, 'read_data') as mock: + self._device.read() + + mock.assert_called_with(1) + + def test_read_exception(self): + with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]): + with self.assertRaises(CommError): + self._device.read() + + with self.assertRaises(CommError): + self._device.read() + + def test_read_line(self): + with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")): + ret = None + try: + ret = self._device.read_line() + except StopIteration: + pass + + self.assertEquals(ret, b"testing") + + def test_read_line_timeout(self): + with patch.object(self._device._device, 'read_data', return_value='a') as mock: + with self.assertRaises(TimeoutError): + self._device.read_line(timeout=0.1) + + self.assertIn('a', self._device._buffer) + + def test_read_line_exception(self): + with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]): + with self.assertRaises(CommError): + self._device.read_line() + + with self.assertRaises(CommError): + self._device.read_line() \ No newline at end of file