Browse Source

Merged old 2to3 branch in with a few other tweaks.

pyserial_fix
Scott Petersen 7 years ago
parent
commit
c25a70ab85
10 changed files with 354 additions and 177 deletions
  1. +14
    -8
      alarmdecoder/decoder.py
  2. +39
    -21
      alarmdecoder/devices.py
  3. +5
    -3
      alarmdecoder/messages.py
  4. +1
    -0
      alarmdecoder/util.py
  5. +8
    -9
      bin/ad2-firmwareupload
  6. +1
    -2
      examples/alarm_email.py
  7. +1
    -0
      requirements.txt
  8. +7
    -2
      setup.py
  9. +81
    -79
      test/test_ad2.py
  10. +197
    -53
      test/test_devices.py

+ 14
- 8
alarmdecoder/decoder.py View File

@@ -9,6 +9,8 @@ Provides the main AlarmDecoder class.
import time
import re

from builtins import chr

from .event import event
from .util import InvalidMessageError
from .messages import Message, ExpanderMessage, RFMessage, LRRMessage
@@ -74,9 +76,9 @@ class AlarmDecoder(object):
"""The configuration bits set on the device."""
address_mask = 0xFFFFFFFF
"""The address mask configured on the device."""
emulate_zone = [False for _ in range(5)]
emulate_zone = [False for _ in list(range(5))]
"""List containing the devices zone emulation status."""
emulate_relay = [False for _ in range(4)]
emulate_relay = [False for _ in list(range(4))]
"""List containing the devices relay emulation status."""
emulate_lrr = False
"""The status of the devices LRR emulation."""
@@ -111,8 +113,8 @@ class AlarmDecoder(object):
self.address = 18
self.configbits = 0xFF00
self.address_mask = 0xFFFFFFFF
self.emulate_zone = [False for x in range(5)]
self.emulate_relay = [False for x in range(4)]
self.emulate_zone = [False for x in list(range(5))]
self.emulate_relay = [False for x in list(range(4))]
self.emulate_lrr = False
self.deduplicate = False
self.mode = ADEMCO
@@ -231,7 +233,7 @@ class AlarmDecoder(object):
"""

if self._device:
self._device.write(str(data))
self._device.write(str.encode(data))

def get_config(self):
"""
@@ -258,7 +260,9 @@ class AlarmDecoder(object):
''.join(['Y' if r else 'N' for r in self.emulate_relay])))
config_entries.append(('LRR', 'Y' if self.emulate_lrr else 'N'))
config_entries.append(('DEDUPLICATE', 'Y' if self.deduplicate else 'N'))
config_entries.append(('MODE', list(PANEL_TYPES.keys())[list(PANEL_TYPES.values()).index(self.mode)]))
config_entries.append(('MODE', list(PANEL_TYPES)[list(PANEL_TYPES.values()).index(self.mode)]))

config_string = '&'.join(['='.join(t) for t in config_entries])

return '&'.join(['='.join(t) for t in config_entries])

@@ -321,6 +325,8 @@ class AlarmDecoder(object):
:returns: :py:class:`~alarmdecoder.messages.Message`
"""

data = data.decode('utf-8')

if data is not None:
data = data.lstrip('\0')

@@ -444,9 +450,9 @@ class AlarmDecoder(object):
elif key == 'MASK':
self.address_mask = int(val, 16)
elif key == 'EXP':
self.emulate_zone = [val[z] == 'Y' for z in range(5)]
self.emulate_zone = [val[z] == 'Y' for z in list(range(5))]
elif key == 'REL':
self.emulate_relay = [val[r] == 'Y' for r in range(4)]
self.emulate_relay = [val[r] == 'Y' for r in list(range(4))]
elif key == 'LRR':
self.emulate_lrr = (val == 'Y')
elif key == 'DEDUPLICATE':


+ 39
- 21
alarmdecoder/devices.py View File

@@ -21,10 +21,12 @@ import serial
import serial.tools.list_ports
import socket
import select
from builtins import bytes

from .util import CommError, TimeoutError, NoDeviceError, InvalidMessageError
from .event import event

have_pyftdi = False
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
import usb.core
@@ -33,7 +35,15 @@ try:
have_pyftdi = True

except ImportError:
have_pyftdi = False
try:
from pyftdi.ftdi import Ftdi, FtdiError
import usb.core
import usb.util

have_pyftdi = True

except ImportError:
have_pyftdi = False

try:
from OpenSSL import SSL, crypto
@@ -41,8 +51,16 @@ try:
have_openssl = True

except ImportError:
from collections import namedtuple
SSL = namedtuple('SSL', ['Error', 'WantReadError', 'SysCallError'])
class SSL:
class Error(BaseException):
pass

class WantReadError(BaseException):
pass

class SysCallError(BaseException):
pass

have_openssl = False


@@ -62,7 +80,7 @@ class Device(object):
Constructor
"""
self._id = ''
self._buffer = ''
self._buffer = b''
self._device = None
self._running = False
self._read_thread = None
@@ -519,7 +537,7 @@ class USBDevice(Device):
timeout_event.reading = True

if purge_buffer:
self._buffer = ''
self._buffer = b''

got_line, ret = False, None

@@ -531,11 +549,11 @@ class USBDevice(Device):
while timeout_event.reading:
buf = self._device.read_data(1)

if buf != '':
if buf != b'':
self._buffer += buf

if buf == "\n":
self._buffer = self._buffer.rstrip("\r\n")
if buf == b"\n":
self._buffer = self._buffer.rstrip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
@@ -548,7 +566,7 @@ class USBDevice(Device):

else:
if got_line:
ret, self._buffer = self._buffer, ''
ret, self._buffer = self._buffer, b''

self.on_read(data=ret)

@@ -814,7 +832,7 @@ class SerialDevice(Device):
timeout_event.reading = True

if purge_buffer:
self._buffer = ''
self._buffer = b''

got_line, ret = False, None

@@ -824,14 +842,14 @@ class SerialDevice(Device):

try:
while timeout_event.reading:
buf = self._device.read(1)
buf = bytes(self._device.read(1), 'utf-8')

# NOTE: AD2SERIAL apparently sends down \xFF on boot.
if buf != '' and buf != "\xff":
if buf != b'' and buf != b"\xff":
self._buffer += buf

if buf == "\n":
self._buffer = self._buffer.rstrip("\r\n")
if buf == b"\n":
self._buffer = self._buffer.rstrip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
@@ -844,7 +862,7 @@ class SerialDevice(Device):

else:
if got_line:
ret, self._buffer = self._buffer, ''
ret, self._buffer = self._buffer, b''

self.on_read(data=ret)

@@ -1112,7 +1130,7 @@ class SocketDevice(Device):
timeout_event.reading = True

if purge_buffer:
self._buffer = ''
self._buffer = b''

got_line, ret = False, None

@@ -1128,13 +1146,13 @@ class SocketDevice(Device):
time.sleep(0.01)
continue

buf = self._device.recv(1)
buf = bytes(self._device.recv(1), 'utf-8')

if buf != '':
if buf != b'':
self._buffer += buf

if buf == "\n":
self._buffer = self._buffer.rstrip("\r\n")
if buf == b"\n":
self._buffer = self._buffer.rstrip(b"\r\n")

if len(self._buffer) > 0:
got_line = True
@@ -1155,7 +1173,7 @@ class SocketDevice(Device):

else:
if got_line:
ret, self._buffer = self._buffer, ''
ret, self._buffer = self._buffer, b''

self.on_read(data=ret)



+ 5
- 3
alarmdecoder/messages.py View File

@@ -15,6 +15,8 @@ devices.
import re
import datetime

from reprlib import repr

from .util import InvalidMessageError
from .panels import PANEL_TYPES, ADEMCO, DSC

@@ -136,7 +138,7 @@ class Message(BaseMessage):

:raises: :py:class:`~alarmdecoder.util.InvalidMessageError`
"""
match = self._regex.match(data)
match = self._regex.match(str(data))

if match is None:
raise InvalidMessageError('Received invalid message: {0}'.format(data))
@@ -163,7 +165,7 @@ class Message(BaseMessage):
self.check_zone = is_bit_set(15)
self.perimeter_only = is_bit_set(16)
self.system_fault = is_bit_set(17)
if self.bitfield[18] in list(PANEL_TYPES.keys()):
if self.bitfield[18] in list(PANEL_TYPES):
self.panel_type = PANEL_TYPES[self.bitfield[18]]
# pos 20-21 - Unused.
self.text = alpha.strip('"')
@@ -291,7 +293,7 @@ class RFMessage(BaseMessage):
"""Low battery indication"""
supervision = False
"""Supervision required indication"""
loop = [False for _ in range(4)]
loop = [False for _ in list(range(4))]
"""Loop indicators"""

def __init__(self, data=None):


+ 1
- 0
alarmdecoder/util.py View File

@@ -8,6 +8,7 @@ Provides utility classes for the `AlarmDecoder`_ (AD2) devices.

import time
import threading
from io import open


class NoDeviceError(Exception):


+ 8
- 9
bin/ad2-firmwareupload View File

@@ -16,10 +16,10 @@ def handle_firmware(stage, **kwargs):
sys.stdout.write('.')
sys.stdout.flush()
elif stage == alarmdecoder.util.Firmware.STAGE_BOOT:
if handle_firmware.wait_tick > 0: print ""
print "Rebooting device.."
if handle_firmware.wait_tick > 0: print("")
print("Rebooting device..")
elif stage == alarmdecoder.util.Firmware.STAGE_LOAD:
print 'Waiting for boot loader..'
print('Waiting for boot loader..')
elif stage == alarmdecoder.util.Firmware.STAGE_UPLOADING:
if handle_firmware.upload_tick == 0:
sys.stdout.write('Uploading firmware.')
@@ -30,11 +30,11 @@ def handle_firmware(stage, **kwargs):
sys.stdout.write('.')
sys.stdout.flush()
elif stage == alarmdecoder.util.Firmware.STAGE_DONE:
print "\r\nDone!"
print("\r\nDone!")
elif stage == alarmdecoder.util.Firmware.STAGE_ERROR:
print "\r\nError: {0}".format(kwargs.get("error", ""))
print("\r\nError: {0}".format(kwargs.get("error", "")))
elif stage == alarmdecoder.util.Firmware.STAGE_DEBUG:
print "\r\nDBG: {0}".format(kwargs.get("data", ""))
print("\r\nDBG: {0}".format(kwargs.get("data", "")))

def main():
device = '/dev/ttyUSB0'
@@ -42,7 +42,7 @@ def main():
baudrate = 115200

if len(sys.argv) < 2:
print "Syntax: {0} <firmware> [device path or hostname:port] [baudrate]".format(sys.argv[0])
print("Syntax: {0} <firmware> [device path or hostname:port] [baudrate]".format(sys.argv[0]))
sys.exit(1)

firmware = sys.argv[1]
@@ -53,8 +53,7 @@ def main():
baudrate = sys.argv[3]

debug = os.environ.get("ALARMDECODER_DEBUG") is not None

print "Flashing device: {0} - {2} baud\r\nFirmware: {1}".format(device, firmware, baudrate)
print("Flashing device: {0} - {2} baud\r\nFirmware: {1}".format(device, firmware, baudrate))

dev = None
try:


+ 1
- 2
examples/alarm_email.py View File

@@ -38,9 +38,8 @@ def handle_alarm(sender, **kwargs):
"""
Handles alarm events from the AlarmDecoder.
"""
status = kwargs.pop('status', None)
zone = kwargs.pop('zone', None)
text = "Alarm status: {0} - Zone {1}".format(status, zone)
text = "Alarm: Zone {0}".format(zone)

# Build the email message
msg = MIMEText(text)


+ 1
- 0
requirements.txt View File

@@ -1 +1,2 @@
future==0.14.3
pyserial==2.7

+ 7
- 2
setup.py View File

@@ -1,5 +1,6 @@
"""Setup script"""

import sys
from setuptools import setup

def readme():
@@ -8,6 +9,10 @@ def readme():
with open('README.rst') as readme_file:
return readme_file.read()

extra_requirements = []
if sys.version_info < (3,):
extra_requirements.append('future==0.14.3')

setup(name='alarmdecoder',
version='0.10.3',
description='Python interface for the AlarmDecoder (AD2) family '
@@ -30,8 +35,8 @@ setup(name='alarmdecoder',
license='MIT',
packages=['alarmdecoder', 'alarmdecoder.event'],
install_requires=[
'pyserial>=2.7',
],
'pyserial==2.7',
] + extra_requirements,
test_suite='nose.collector',
tests_require=['nose', 'mock'],
scripts=['bin/ad2-sslterm', 'bin/ad2-firmwareupload'],


+ 81
- 79
test/test_ad2.py View File

@@ -1,5 +1,7 @@
import time

from builtins import bytes

from unittest import TestCase
from mock import Mock, MagicMock, patch

@@ -133,11 +135,11 @@ class TestAlarmDecoder(TestCase):

def test_send(self):
self._decoder.send('test')
self._device.write.assert_called_with('test')
self._device.write.assert_called_with(b'test')

def test_get_config(self):
self._decoder.get_config()
self._device.write.assert_called_with("C\r")
self._device.write.assert_called_with(b"C\r")

def test_save_config(self):
self._decoder.save_config()
@@ -145,171 +147,171 @@ class TestAlarmDecoder(TestCase):

def test_reboot(self):
self._decoder.reboot()
self._device.write.assert_called_with('=')
self._device.write.assert_called_with(b'=')

def test_fault(self):
self._decoder.fault_zone(1)
self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 1))
self._device.write.assert_called_with(bytes("L{0:02}{1}\r".format(1, 1), 'utf-8'))

def test_fault_wireproblem(self):
self._decoder.fault_zone(1, simulate_wire_problem=True)
self._device.write.assert_called_with("L{0:02}{1}\r".format(1, 2))
self._device.write.assert_called_with(bytes("L{0:02}{1}\r".format(1, 2), 'utf-8'))

def test_clear_zone(self):
self._decoder.clear_zone(1)
self._device.write.assert_called_with("L{0:02}0\r".format(1))
self._device.write.assert_called_with(bytes("L{0:02}0\r".format(1), 'utf-8'))

def test_message(self):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertIsInstance(msg, Message)

self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self._decoder._on_read(self, data=b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._message_received)

def test_message_kpm(self):
msg = self._decoder._handle_message('!KPM:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
msg = self._decoder._handle_message(b'!KPM:[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertIsInstance(msg, Message)

self._decoder._on_read(self, data='[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self._decoder._on_read(self, data=b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertTrue(self._message_received)

def test_expander_message(self):
msg = self._decoder._handle_message('!EXP:07,01,01')
msg = self._decoder._handle_message(b'!EXP:07,01,01')
self.assertIsInstance(msg, ExpanderMessage)

self._decoder._on_read(self, data='!EXP:07,01,01')
self._decoder._on_read(self, data=b'!EXP:07,01,01')
self.assertTrue(self._expander_message_received)

def test_relay_message(self):
self._decoder.open()
msg = self._decoder._handle_message('!REL:12,01,01')
msg = self._decoder._handle_message(b'!REL:12,01,01')
self.assertIsInstance(msg, ExpanderMessage)
self.assertEqual(self._relay_changed, True)

def test_rfx_message(self):
msg = self._decoder._handle_message('!RFX:0180036,80')
msg = self._decoder._handle_message(b'!RFX:0180036,80')
self.assertIsInstance(msg, RFMessage)
self.assertTrue(self._rfx_message_received)

def test_panic(self):
self._decoder.open()

msg = self._decoder._handle_message('!LRR:012,1,ALARM_PANIC')
self.assertEqual(self._panicked, True)
msg = self._decoder._handle_message(b'!LRR:012,1,ALARM_PANIC')
self.assertEquals(self._panicked, True)

msg = self._decoder._handle_message('!LRR:012,1,CANCEL')
self.assertEqual(self._panicked, False)
msg = self._decoder._handle_message(b'!LRR:012,1,CANCEL')
self.assertEquals(self._panicked, False)
self.assertIsInstance(msg, LRRMessage)

def test_config_message(self):
self._decoder.open()

msg = self._decoder._handle_message('!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
self.assertEqual(self._decoder.address, 18)
self.assertEqual(self._decoder.configbits, int('ff00', 16))
self.assertEqual(self._decoder.address_mask, int('ffffffff', 16))
self.assertEqual(self._decoder.emulate_zone, [False for x in range(5)])
self.assertEqual(self._decoder.emulate_relay, [False for x in range(4)])
self.assertEqual(self._decoder.emulate_lrr, False)
self.assertEqual(self._decoder.deduplicate, False)

msg = self._decoder._handle_message(b'!CONFIG>ADDRESS=18&CONFIGBITS=ff00&LRR=N&EXP=NNNNN&REL=NNNN&MASK=ffffffff&DEDUPLICATE=N')
self.assertEquals(self._decoder.address, 18)
self.assertEquals(self._decoder.configbits, int('ff00', 16))
self.assertEquals(self._decoder.address_mask, int('ffffffff', 16))
self.assertEquals(self._decoder.emulate_zone, [False for x in range(5)])
self.assertEquals(self._decoder.emulate_relay, [False for x in range(4)])
self.assertEquals(self._decoder.emulate_lrr, False)
self.assertEquals(self._decoder.deduplicate, False)
self.assertEqual(self._got_config, True)

def test_power_changed_event(self):
msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._power_changed, False) # Not set first time we hit it.
msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._power_changed, False)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, False)

msg = self._decoder._handle_message('[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._power_changed, True)
msg = self._decoder._handle_message(b'[0000000100000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._power_changed, True)

def test_alarm_event(self):
msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._alarmed, False) # Not set first time we hit it.
msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._alarmed, False)
self.assertEqual(self._alarm_restored, True)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, False)
self.assertEquals(self._alarm_restored, True)

msg = self._decoder._handle_message('[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._alarmed, True)
msg = self._decoder._handle_message(b'[0000000000100000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._alarmed, True)

def test_zone_bypassed_event(self):
msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._bypassed, False) # Not set first time we hit it.
msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._bypassed, False)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, False)

msg = self._decoder._handle_message('[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._bypassed, True)
msg = self._decoder._handle_message(b'[0000001000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._bypassed, True)

def test_armed_away_event(self):
msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._armed, False) # Not set first time we hit it.
msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._armed, False)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._decoder._handle_message('[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._armed, True)
msg = self._decoder._handle_message(b'[0100000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)

self._armed = False

msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._armed, False) # Not set first time we hit it.
msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False) # Not set first time we hit it.

msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._armed, False)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, False)

msg = self._decoder._handle_message('[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._armed, True)
msg = self._decoder._handle_message(b'[0010000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._armed, True)

def test_battery_low_event(self):
msg = self._decoder._handle_message('[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._battery, True)
msg = self._decoder._handle_message(b'[0000000000010000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._battery, False)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._battery, False)

def test_fire_alarm_event(self):
msg = self._decoder._handle_message('[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._fire, True)
msg = self._decoder._handle_message(b'[0000000000000100----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, True)

# force the timeout to expire.
with patch.object(time, 'time', return_value=self._decoder._battery_status[1] + 35):
msg = self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEqual(self._fire, False)
msg = self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000]," "')
self.assertEquals(self._fire, False)

def test_hit_for_faults(self):
self._decoder._handle_message('[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')
self._decoder._handle_message(b'[0000000000000000----],000,[f707000600e5800c0c020000],"Hit * for faults "')

self._decoder._device.write.assert_called_with('*')
self._decoder._device.write.assert_called_with(b'*')

def test_sending_received(self):
self._decoder._on_read(self, data='!Sending.done')
self._decoder._on_read(self, data=b'!Sending.done')
self.assertTrue(self._sending_received_status)

self._decoder._on_read(self, data='!Sending.....done')
self._decoder._on_read(self, data=b'!Sending.....done')
self.assertFalse(self._sending_received_status)

def test_boot(self):
self._decoder._on_read(self, data='!Ready')
self._decoder._on_read(self, data=b'!Ready')
self.assertTrue(self._on_boot_received)

def test_zone_fault_and_restore(self):
self._decoder._on_read(self, data='[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "')
self.assertEqual(self._zone_faulted, 3)
self._decoder._on_read(self, data=b'[00010001000000000A--],003,[f70000051003000008020000000000],"FAULT 03 "')
self.assertEquals(self._zone_faulted, 3)

self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_faulted, 4)

self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEqual(self._zone_faulted, 4)
self._decoder._on_read(self, data=b'[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "')
self.assertEquals(self._zone_faulted, 5)

self._decoder._on_read(self, data='[00010001000000000A--],005,[f70000051003000008020000000000],"FAULT 05 "')
self.assertEqual(self._zone_faulted, 5)
self._decoder._on_read(self, data=b'[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEquals(self._zone_restored, 3)

self._decoder._on_read(self, data='[00010001000000000A--],004,[f70000051003000008020000000000],"FAULT 04 "')
self.assertEqual(self._zone_restored, 3)

+ 197
- 53
test/test_devices.py View File

@@ -1,14 +1,16 @@
from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException
from pyftdi.ftdi import Ftdi, FtdiError
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
except:
from pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice
import socket
import time
import tempfile
import os
import select
from OpenSSL import SSL, crypto
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError

@@ -118,52 +120,30 @@ class TestUSBDevice(TestCase):

mock.assert_called_with('test')

def test_write_exception(self):
with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
with self.assertRaises(CommError):
self._device.write('test')

def test_read(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'read_data') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read()

with self.assertRaises(CommError):
self._device.read()
from unittest import TestCase
from mock import Mock, MagicMock, patch
from serial import Serial, SerialException

def test_read_line(self):
with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass
from alarmdecoder.devices import USBDevice, SerialDevice, SocketDevice
from alarmdecoder.util import NoDeviceError, CommError, TimeoutError

self.assertEqual(ret, "testing")
# Optional FTDI tests
try:
from pyftdi.pyftdi.ftdi import Ftdi, FtdiError
from usb.core import USBError, Device as USBCoreDevice

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read_data', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)
have_pyftdi = True

self.assertIn('a', self._device._buffer)
except ImportError:
have_pyftdi = False

def test_read_line_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read_line()
# Optional SSL tests
try:
from OpenSSL import SSL, crypto

with self.assertRaises(CommError):
self._device.read_line()
have_openssl = True
except ImportError:
have_openssl = False


class TestSerialDevice(TestCase):
@@ -204,14 +184,14 @@ class TestSerialDevice(TestCase):
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write') as mock:
self._device.write('test')
self._device.write(b'test')

mock.assert_called_with('test')
mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'write', side_effect=SerialException):
with self.assertRaises(CommError):
self._device.write('test')
self._device.write(b'test')

def test_read(self):
self._device.interface = '/dev/ttyS0'
@@ -235,14 +215,14 @@ class TestSerialDevice(TestCase):
except StopIteration:
pass

self.assertEqual(ret, "testing")
self.assertEquals(ret, b"testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)
self.assertIn('a', self._device._buffer.decode('utf-8'))

def test_read_line_exception(self):
with patch.object(self._device._device, 'read', side_effect=[OSError, SerialException]):
@@ -279,14 +259,18 @@ class TestSocketDevice(TestCase):
self._device.open(no_reader_thread=True)

with patch.object(socket.socket, 'send') as mock:
self._device.write('test')
self._device.write(b'test')

mock.assert_called_with('test')
mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'send', side_effect=[SSL.Error, socket.error]):
side_effects = [socket.error]
if (have_openssl):
side_effects.append(SSL.Error)

with patch.object(self._device._device, 'send', side_effect=side_effects):
with self.assertRaises(CommError):
self._device.write('test')
self._device.write(b'test')

def test_read(self):
with patch.object(socket.socket, '__init__', return_value=None):
@@ -317,7 +301,7 @@ class TestSocketDevice(TestCase):
except StopIteration:
pass

self.assertEqual(ret, "testing")
self.assertEquals(ret, b"testing")

def test_read_line_timeout(self):
with patch('socket.socket.fileno', return_value=1):
@@ -326,7 +310,7 @@ class TestSocketDevice(TestCase):
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)
self.assertIn('a', self._device._buffer.decode('utf-8'))

def test_read_line_exception(self):
with patch('socket.socket.fileno', return_value=1):
@@ -339,6 +323,9 @@ class TestSocketDevice(TestCase):
self._device.read_line()

def test_ssl(self):
if not have_openssl:
return

ssl_key = crypto.PKey()
ssl_key.generate_key(crypto.TYPE_RSA, 2048)
ssl_cert = crypto.X509()
@@ -372,6 +359,9 @@ class TestSocketDevice(TestCase):
self.assertIsInstance(self._device._device, SSL.Connection)

def test_ssl_exception(self):
if not have_openssl:
return

self._device.ssl = True
self._device.ssl_key = 'None'
self._device.ssl_certificate = 'None'
@@ -392,3 +382,157 @@ class TestSocketDevice(TestCase):

os.close(fileno)
os.unlink(path)


if have_pyftdi:
class TestUSBDevice(TestCase):
def setUp(self):
self._device = USBDevice()
self._device._device = Mock(spec=Ftdi)
self._device._device.usb_dev = Mock(spec=USBCoreDevice)
self._device._device.usb_dev.bus = 0
self._device._device.usb_dev.address = 0

self._attached = False
self._detached = False

def tearDown(self):
self._device.close()

def attached_event(self, sender, *args, **kwargs):
self._attached = True

def detached_event(self, sender, *args, **kwargs):
self._detached = True

def test_find_default_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2', 1, 'AD2')]):
device = USBDevice.find()

self.assertEquals(device.interface, 'AD2')

def test_find_with_param(self):
with patch.object(Ftdi, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
device = USBDevice.find((0, 0, 'AD2-1', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-1')

device = USBDevice.find((0, 0, 'AD2-2', 1, 'AD2'))
self.assertEquals(device.interface, 'AD2-2')

def test_events(self):
self.assertEquals(self._attached, False)
self.assertEquals(self._detached, False)

# this is ugly, but it works.
with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-1', 1, 'AD2'), (0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.start_detection(on_attached=self.attached_event, on_detached=self.detached_event)

with patch.object(USBDevice, 'find_all', return_value=[(0, 0, 'AD2-2', 1, 'AD2')]):
USBDevice.find_all()
time.sleep(1)
USBDevice.stop_detection()

self.assertEquals(self._attached, True)
self.assertEquals(self._detached, True)

def test_find_all(self):
with patch.object(USBDevice, 'find_all', return_value=[]) as mock:
devices = USBDevice.find_all()

self.assertEquals(devices, [])

def test_find_all_exception(self):
with patch.object(Ftdi, 'find_all', side_effect=[USBError('testing'), FtdiError]) as mock:
with self.assertRaises(CommError):
devices = USBDevice.find_all()

with self.assertRaises(CommError):
devices = USBDevice.find_all()

def test_interface_serial_number(self):
self._device.interface = 'AD2USB'

self.assertEquals(self._device.interface, 'AD2USB')
self.assertEquals(self._device.serial_number, 'AD2USB')
self.assertEquals(self._device._device_number, 0)

def test_interface_index(self):
self._device.interface = 1

self.assertEquals(self._device.interface, 1)
self.assertEquals(self._device.serial_number, None)
self.assertEquals(self._device._device_number, 1)

def test_open(self):
self._device.interface = 'AD2USB'

with patch.object(self._device._device, 'open') as mock:
self._device.open(no_reader_thread=True)

mock.assert_any_calls()

def test_open_failed(self):
self._device.interface = 'AD2USB'

with patch.object(self._device._device, 'open', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

with self.assertRaises(NoDeviceError):
self._device.open(no_reader_thread=True)

def test_write(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'write_data') as mock:
self._device.write(b'test')

mock.assert_called_with(b'test')

def test_write_exception(self):
with patch.object(self._device._device, 'write_data', side_effect=FtdiError):
with self.assertRaises(CommError):
self._device.write(b'test')

def test_read(self):
self._device.interface = 'AD2USB'
self._device.open(no_reader_thread=True)

with patch.object(self._device._device, 'read_data') as mock:
self._device.read()

mock.assert_called_with(1)

def test_read_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read()

with self.assertRaises(CommError):
self._device.read()

def test_read_line(self):
with patch.object(self._device._device, 'read_data', side_effect=list("testing\r\n")):
ret = None
try:
ret = self._device.read_line()
except StopIteration:
pass

self.assertEquals(ret, b"testing")

def test_read_line_timeout(self):
with patch.object(self._device._device, 'read_data', return_value='a') as mock:
with self.assertRaises(TimeoutError):
self._device.read_line(timeout=0.1)

self.assertIn('a', self._device._buffer)

def test_read_line_exception(self):
with patch.object(self._device._device, 'read_data', side_effect=[USBError('testing'), FtdiError]):
with self.assertRaises(CommError):
self._device.read_line()

with self.assertRaises(CommError):
self._device.read_line()

Loading…
Cancel
Save