From ab62967151efa5e1f3dd6af405999f1c4be3f120 Mon Sep 17 00:00:00 2001 From: Scott Petersen Date: Thu, 5 Dec 2013 15:42:45 -0800 Subject: [PATCH] Added examples. --- examples/alarm_email.py | 68 +++++++++++++++++++++++++ examples/basics.py | 37 ++++++++++++++ examples/detection.py | 83 +++++++++++++++++++++++++++++++ examples/rf_device.py | 49 ++++++++++++++++++ examples/serialport.py | 37 ++++++++++++++ examples/socket_example.py | 38 ++++++++++++++ examples/ssl_socket.py | 51 +++++++++++++++++++ examples/virtual_zone_expander.py | 73 +++++++++++++++++++++++++++ 8 files changed, 436 insertions(+) create mode 100644 examples/alarm_email.py create mode 100644 examples/basics.py create mode 100644 examples/detection.py create mode 100644 examples/rf_device.py create mode 100644 examples/serialport.py create mode 100644 examples/socket_example.py create mode 100644 examples/ssl_socket.py create mode 100644 examples/virtual_zone_expander.py diff --git a/examples/alarm_email.py b/examples/alarm_email.py new file mode 100644 index 0000000..87ee83a --- /dev/null +++ b/examples/alarm_email.py @@ -0,0 +1,68 @@ +import time +import smtplib +from email.mime.text import MIMEText +from pyad2 import AD2 +from pyad2.devices import USBDevice + +# Configuration values +SUBJECT = "Alarm Decoder - ALARM" +FROM_ADDRESS = "root@localhost" +TO_ADDRESS = "root@localhost" # NOTE: Sending an SMS is as easy as looking + # up the email address format for your provider. +SMTP_SERVER = "localhost" +SMTP_USERNAME = None +SMTP_PASSWORD = None + +def main(): + """ + Example application that sends an email when an alarm event is + detected. + """ + try: + # Retrieve the first USB device + device = AD2(USBDevice.find()) + + # Set up an event handler and open the device + device.on_alarm += handle_alarm + device.open() + + # Retrieve the device configuration + device.get_config() + + # Wait for events + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_alarm(sender, *args, **kwargs): + """ + Handles alarm events from the AD2. + """ + status = kwargs['status'] + text = "Alarm status: {0}".format(status) + + # Build the email message + msg = MIMEText(text) + msg['Subject'] = SUBJECT + msg['From'] = FROM_ADDRESS + msg['To'] = TO_ADDRESS + + s = smtplib.SMTP(SMTP_SERVER) + + # Authenticate if needed + if SMTP_USERNAME is not None: + s.login(SMTP_USERNAME, SMTP_PASSWORD) + + # Send the email + s.sendmail(FROM_ADDRESS, TO_ADDRESS, msg.as_string()) + s.quit() + + print 'sent alarm email:', text + +if __name__ == '__main__': + main() diff --git a/examples/basics.py b/examples/basics.py new file mode 100644 index 0000000..3ce9073 --- /dev/null +++ b/examples/basics.py @@ -0,0 +1,37 @@ +import time +from pyad2 import AD2 +from pyad2.devices import USBDevice + +def main(): + """ + Example application that prints messages from the panel to the terminal. + """ + try: + # Retrieve the first USB device + device = AD2(USBDevice.find()) + + # Set up an event handler and open the device + device.on_message += handle_message + device.open() + device.get_config() + + # Wait for events. + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_message(sender, *args, **kwargs): + """ + Handles message events from the AD2. + """ + msg = kwargs['message'] + + print sender, msg.raw + +if __name__ == '__main__': + main() diff --git a/examples/detection.py b/examples/detection.py new file mode 100644 index 0000000..1805dd7 --- /dev/null +++ b/examples/detection.py @@ -0,0 +1,83 @@ +import time +from pyad2 import AD2 +from pyad2.devices import USBDevice + +__devices = {} + +def main(): + """ + Example application that shows how to handle attach/detach events generated + by the USB devices. + + In this case we open the device and listen for messages when it is attached. + And when it is detached we remove it from our list of monitored devices. + """ + try: + # Start up the detection thread such that handle_attached and handle_detached will + # be called when devices are attached and detached, respectively. + USBDevice.start_detection(on_attached=handle_attached, on_detached=handle_detached) + + # Wait for events. + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + # Close all devices and stop detection. + for sn, device in __devices.iteritems(): + device.close() + + USBDevice.stop_detection() + +def create_device(device_args): + """ + Creates an AD2 from the specified USB device arguments. + + :param device_args: Tuple containing information on the USB device to open. + :type device_args: Tuple (vid, pid, serialnumber, interface_count, description) + """ + device = AD2(USBDevice.find(device_args)) + device.on_message += handle_message + device.open() + + return device + +def handle_message(sender, *args, **kwargs): + """ + Handles message events from the AD2. + """ + msg = kwargs['message'] + + print sender, msg.raw + +def handle_attached(sender, *args, **kwargs): + """ + Handles attached events from USBDevice.start_detection(). + """ + device_args = kwargs['device'] + + # Create the device from the specified device arguments. + device = create_device(device_args) + __devices[device.id] = device + + print 'attached', device.id + +def handle_detached(sender, *args, **kwargs): + """ + Handles detached events from USBDevice.start_detection(). + """ + device = kwargs['device'] + vendor, product, sernum, ifcount, description = device + + # Close and remove the device from our list. + if sernum in __devices.keys(): + __devices[sernum].close() + + del __devices[sernum] + + print 'detached', sernum + +if __name__ == '__main__': + main() diff --git a/examples/rf_device.py b/examples/rf_device.py new file mode 100644 index 0000000..5984c24 --- /dev/null +++ b/examples/rf_device.py @@ -0,0 +1,49 @@ +import time +from pyad2 import AD2 +from pyad2.devices import USBDevice + +RF_DEVICE_SERIAL_NUMBER = '0252254' + +def main(): + """ + Example application that watches for an event from a specific RF device. + + This feature that allows you to watch for events from RF devices if you + have an RF receiver. This is useful in the case of internal sensors, + which don't emit a FAULT if the sensor is tripped and the panel is armed + STAY. It also will monitor sensors that aren't configured. + + NOTE: You must have an RF receiver installed and enabled in your panel + for RFX messages to be seen. + """ + try: + # Retrieve the first USB device + device = AD2(USBDevice.find()) + + # Set up an event handler and open the device + device.on_rfx_message += handle_rfx + device.open() + device.get_config() + + # Wait for events. + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_rfx(sender, *args, **kwargs): + """ + Handles RF message events from the AD2. + """ + msg = kwargs['message'] + + # Check for our target serial number and loop + if msg.serial_number == RF_DEVICE_SERIAL_NUMBER and msg.loop[0] == True: + print msg.serial_number, 'triggered loop #1' + +if __name__ == '__main__': + main() diff --git a/examples/serialport.py b/examples/serialport.py new file mode 100644 index 0000000..cdc71eb --- /dev/null +++ b/examples/serialport.py @@ -0,0 +1,37 @@ +import time +from pyad2 import AD2 +from pyad2.devices import SerialDevice + +def main(): + """ + Example application that opens a serial device and prints messages to the terminal. + """ + try: + # Retrieve the specified serial device. + device = AD2(SerialDevice(interface='/dev/ttyUSB0')) + + # Set up an event handler and open the device + device.on_message += handle_message + device.open(baudrate=115200) # Override the default SerialDevice baudrate. + device.get_config() + + # Wait for events. + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_message(sender, *args, **kwargs): + """ + Handles message events from the AD2. + """ + msg = kwargs['message'] + + print sender, msg.raw + +if __name__ == '__main__': + main() diff --git a/examples/socket_example.py b/examples/socket_example.py new file mode 100644 index 0000000..88509ef --- /dev/null +++ b/examples/socket_example.py @@ -0,0 +1,38 @@ +import time +from pyad2 import AD2 +from pyad2.devices import SocketDevice + +def main(): + """ + Example application that opens a device that has been exposed to the network + with ser2sock or similar serial->ip software. + """ + try: + # Retrieve an AD2 device that has been exposed with ser2sock on localhost:10000. + device = AD2(SocketDevice(interface=('localhost', 10000))) + + # Set up an event handler and open the device + device.on_message += handle_message + device.open() + device.get_config() + + # Wait for events. + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_message(sender, *args, **kwargs): + """ + Handles message events from the AD2. + """ + msg = kwargs['message'] + + print sender, msg.raw + +if __name__ == '__main__': + main() diff --git a/examples/ssl_socket.py b/examples/ssl_socket.py new file mode 100644 index 0000000..26625a2 --- /dev/null +++ b/examples/ssl_socket.py @@ -0,0 +1,51 @@ +import time +from pyad2 import AD2 +from pyad2.devices import SocketDevice + +def main(): + """ + Example application that opens a device that has been exposed to the network + with ser2sock and SSL encryption and authentication. + """ + try: + # Retrieve an AD2 device that has been exposed with ser2sock on localhost:10000. + ssl_device = SocketDevice(interface=('localhost', 10000)) + + # Enable SSL and set the certificates to be used. + # + # The key/cert attributes can either be a filesystem path or an X509/PKey + # object from pyopenssl. + ssl_device.ssl = True + ssl_device.ssl_key = 'cert.key' # Client private key + ssl_device.ssl_certificate = 'cert.pem' # Client certificate + ssl_device.ssl_ca = 'ca.pem' # CA certificate + + device = AD2(ssl_device) + + # Set up an event handler and open the device + device.on_message += handle_message + device.open() + + time.sleep(1) # Allow time for SSL handshake to complete. + device.get_config() + + # Wait for events. + while True: + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_message(sender, *args, **kwargs): + """ + Handles message events from the AD2. + """ + msg = kwargs['message'] + + print sender, msg.raw + +if __name__ == '__main__': + main() diff --git a/examples/virtual_zone_expander.py b/examples/virtual_zone_expander.py new file mode 100644 index 0000000..304e3b9 --- /dev/null +++ b/examples/virtual_zone_expander.py @@ -0,0 +1,73 @@ +import time +from pyad2 import AD2 +from pyad2.devices import USBDevice + +def main(): + """ + Example application that periodically faults a virtual zone and then + restores it. + + This is an advanced feature that allows you to emulate a virtual zone. When + the AD2 is configured to emulate a relay expander we can fault and restore + those zones programmatically at will. These events can also be seen by others, + such as home automation platforms which allows you to connect other devices or + services and monitor them as you would any pyhysical zone. + + For example, you could connect a ZigBee device and receiver and fault or + restore it's zone(s) based on the data received. + + In order for this to happen you need to perform a couple configuration steps: + + 1. Enable zone expander emulation on your AD2 device by hitting '!' in a + terminal and going through the prompts. + 2. Enable the zone expander in your panel programming. + """ + try: + # Retrieve the first USB device + device = AD2(USBDevice.find()) + + # Set up an event handlers and open the device + device.on_zone_fault += handle_zone_fault + device.on_zone_restore += handle_zone_restore + + device.open() + device.get_config() + + # Wait for events. + last_update = time.time() + while True: + if time.time() - last_update > 10: + # Fault zone 41 every 10 seconds. + device.fault_zone(41) + + last_update = time.time() + + time.sleep(1) + + except Exception, ex: + print 'Exception:', ex + + finally: + device.close() + +def handle_zone_fault(sender, *args, **kwargs): + """ + Handles zone fault messages. + """ + zone = kwargs['zone'] + + print 'zone faulted', zone + + # Restore the zone + sender.clear_zone(zone) + +def handle_zone_restore(sender, *args, **kwargs): + """ + Handles zone restore messages. + """ + zone = kwargs['zone'] + + print 'zone cleared', zone + +if __name__ == '__main__': + main()