#!/usr/bin/env python # # Copyright 2008-2010 John-Mark Gurney. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $Id: //depot/python/misc/hdhomerun.py#2 $ # # # Last tested with libhdhomerun_20100828. 
#

from ctypes import *
import struct
import sys

__all__ = [
    'BUFFERSIZE',
    'HDHOMERUN_DEVICE_ID_WILDCARD',
    'VIDEO_DATA_PACKET_SIZE',
    'HDHomeRun',
]

HDHOMERUN_DEVICE_ID_WILDCARD = 0xFFFFFFFF
VIDEO_DATA_PACKET_SIZE = 1316
BUFFERSIZE = VIDEO_DATA_PACKET_SIZE*118    # ~64ms per hdhomerun_device.h

# Shared library that every prototype below is bound against.
hdhr = cdll.LoadLibrary("libhdhomerun.so")

# The prototypes below use c_uint32 for the C uint32_t type.
assert sizeof(c_uint32) == 4
bool_t = c_int


def _struct_repr(s):
    """Return '<TypeName: field: value, ...>' for the ctypes structure *s*.

    The fields listed are always those of *s* itself, so the helper can be
    shared by every structure declared in this module.
    """
    return '<%s: %s>' % (type(s).__name__, ', '.join(
        '%s: %r' % (f[0], getattr(s, f[0])) for f in s._fields_))


# Opaque handles: the structure bodies are never declared here, only
# pointers to them are passed to and from the library.  Each name is
# rebound to the pointer type right after the class statement.
class hdhomerun_device_t(Structure):
    pass
hdhomerun_device_t = POINTER(hdhomerun_device_t)


class hdhomerun_channelscan_result_t(Structure):
    pass
hdhomerun_channelscan_result_t = POINTER(hdhomerun_channelscan_result_t)


class hdhomerun_debug_t(Structure):
    pass
hdhomerun_debug_t = POINTER(hdhomerun_debug_t)


class hdhomerun_video_stats_t(Structure):
    """Counters filled in by hdhomerun_device_get_video_stats."""

    _fields_ = [
        ('packet_count', c_uint32),
        ('network_error_count', c_uint32),
        ('transport_error_count', c_uint32),
        ('sequence_error_count', c_uint32),
        ('overflow_error_count', c_uint32),
    ]

    def __repr__(self):
        # Previously this walked hdhomerun_tuner_status_t._fields_ (fields
        # this structure does not have) and formatted into an empty
        # format string; both made repr() raise.
        return _struct_repr(self)


class hdhomerun_tuner_status_t(Structure):
    """Tuner status record filled in by the get_tuner_status and
    wait_for_lock calls."""

    _fields_ = [
        ("channel", c_char * 32),
        ("lock_str", c_char * 32),
        # NOTE(review): "presnet" looks like a typo for "present".  The
        # spelling is kept because the name is a public attribute and is
        # used as a key by code that walks _fields_.
        ("signal_presnet", bool_t),
        ("lock_supported", bool_t),
        ("lock_unsupported", bool_t),
        ("signal_strength", c_uint),
        ("signal_to_noise_quality", c_uint),
        ("symbol_error_quality", c_uint),
        ("raw_bits_per_second", c_uint32),
        ("packets_per_second", c_uint32),
    ]

    def __repr__(self):
        # The original formatted into an empty format string, which made
        # repr() raise TypeError.
        return _struct_repr(self)


# Function prototypes: device creation and destruction.
hdhr.hdhomerun_device_create.argtypes = [c_uint32, c_uint32, c_uint,
    hdhomerun_debug_t]
hdhr.hdhomerun_device_create.restype = hdhomerun_device_t
hdhr.hdhomerun_device_create_from_str.argtypes = [c_char_p,
    hdhomerun_debug_t]
hdhr.hdhomerun_device_create_from_str.restype = hdhomerun_device_t
hdhr.hdhomerun_device_destroy.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_destroy.restype = None
# Function prototypes: device identity and tuner selection.
hdhr.hdhomerun_device_get_device_id.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_get_device_id.restype = c_uint32
hdhr.hdhomerun_device_get_device_ip.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_get_device_ip.restype = c_uint32
hdhr.hdhomerun_device_get_device_id_requested.argtypes = [
    hdhomerun_device_t]
hdhr.hdhomerun_device_get_device_id_requested.restype = c_uint32
hdhr.hdhomerun_device_get_device_ip_requested.argtypes = [
    hdhomerun_device_t]
hdhr.hdhomerun_device_get_device_ip_requested.restype = c_uint32
hdhr.hdhomerun_device_get_tuner.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_get_tuner.restype = c_uint

# The device id and ip are unsigned 32-bit values everywhere else in this
# binding (hdhomerun_device_create and the getters above), so they are
# declared as c_uint32 here as well instead of the former c_int32.
hdhr.hdhomerun_device_set_device.argtypes = [hdhomerun_device_t, c_uint32,
    c_uint32]
hdhr.hdhomerun_device_set_device.restype = None
hdhr.hdhomerun_device_set_tuner.argtypes = [hdhomerun_device_t, c_uint]
hdhr.hdhomerun_device_set_tuner.restype = None
hdhr.hdhomerun_device_set_tuner_from_str.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_tuner_from_str.restype = c_int

hdhr.hdhomerun_device_get_local_machine_addr.argtypes = [
    hdhomerun_device_t]
hdhr.hdhomerun_device_get_local_machine_addr.restype = c_uint32

# Function prototypes: tuner getters.  The POINTER(c_char_p) arguments are
# out-parameters that receive a C string pointer.
hdhr.hdhomerun_device_get_tuner_status.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p), POINTER(hdhomerun_tuner_status_t)]
hdhr.hdhomerun_device_get_tuner_status.restype = c_int
hdhr.hdhomerun_device_get_tuner_streaminfo.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_tuner_streaminfo.restype = c_int
hdhr.hdhomerun_device_get_tuner_channel.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_tuner_channel.restype = c_int
hdhr.hdhomerun_device_get_tuner_channelmap.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_tuner_channelmap.restype = c_int
hdhr.hdhomerun_device_get_tuner_filter.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_tuner_filter.restype = c_int
# Function prototypes: remaining getters.  The POINTER(c_char_p) arguments
# are out-parameters that receive a C string pointer.
hdhr.hdhomerun_device_get_tuner_program.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_tuner_program.restype = c_int
hdhr.hdhomerun_device_get_tuner_target.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_tuner_target.restype = c_int
# XXX - hdhomerun_device_get_tuner_plotsample
hdhr.hdhomerun_device_get_ir_target.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_ir_target.restype = c_int
hdhr.hdhomerun_device_get_lineup_location.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p)]
hdhr.hdhomerun_device_get_lineup_location.restype = c_int

# The C function has a third out-parameter (uint32_t *pversion_num) next to
# the version string.  Declaring only two arguments left the callee reading
# an argument that was never passed.  Callers must now supply it; pass None
# when the numeric version is not wanted.
# NOTE(review): confirm against the hdhomerun_device.h of the library
# version in use.
hdhr.hdhomerun_device_get_version.argtypes = [hdhomerun_device_t,
    POINTER(c_char_p), POINTER(c_uint32)]
hdhr.hdhomerun_device_get_version.restype = c_int

# Colour helpers take only a status record, not a device handle.
hdhr.hdhomerun_device_get_tuner_status_ss_color.argtypes = [
    POINTER(hdhomerun_tuner_status_t)]
hdhr.hdhomerun_device_get_tuner_status_ss_color.restype = c_uint32
hdhr.hdhomerun_device_get_tuner_status_snq_color.argtypes = [
    POINTER(hdhomerun_tuner_status_t)]
hdhr.hdhomerun_device_get_tuner_status_snq_color.restype = c_uint32
hdhr.hdhomerun_device_get_tuner_status_seq_color.argtypes = [
    POINTER(hdhomerun_tuner_status_t)]
hdhr.hdhomerun_device_get_tuner_status_seq_color.restype = c_uint32

hdhr.hdhomerun_device_get_model_str.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_get_model_str.restype = c_char_p

# Function prototypes: tuner setters.
hdhr.hdhomerun_device_set_tuner_channel.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_tuner_channel.restype = c_int
hdhr.hdhomerun_device_set_tuner_channelmap.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_tuner_channelmap.restype = c_int
hdhr.hdhomerun_device_set_tuner_filter.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_tuner_filter.restype = c_int
# Takes a fixed-size array of 0x2000 unsigned bytes.
hdhr.hdhomerun_device_set_tuner_filter_by_array.argtypes = [
    hdhomerun_device_t, c_ubyte * 0x2000]
hdhr.hdhomerun_device_set_tuner_filter_by_array.restype = c_int
# Function prototypes: remaining setters.
hdhr.hdhomerun_device_set_tuner_program.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_tuner_program.restype = c_int
hdhr.hdhomerun_device_set_tuner_target.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_tuner_target.restype = c_int
#hdhr.hdhomerun_device_set_tuner_target_to_local_protocol.argtypes = [
#    hdhomerun_device_t, c_char_p ]
#hdhr.hdhomerun_device_set_tuner_target_to_local_protocol.restype = c_int
#hdhr.hdhomerun_device_set_tuner_target_to_local.argtypes = [
#    hdhomerun_device_t ]
#hdhr.hdhomerun_device_set_tuner_target_to_local.restype = c_int
hdhr.hdhomerun_device_set_ir_target.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_ir_target.restype = c_int
hdhr.hdhomerun_device_set_lineup_location.argtypes = [hdhomerun_device_t,
    c_char_p]
hdhr.hdhomerun_device_set_lineup_location.restype = c_int

# Generic variable access: (handle, name, [value,] out value, out error).
hdhr.hdhomerun_device_get_var.argtypes = [hdhomerun_device_t, c_char_p,
    POINTER(c_char_p), POINTER(c_char_p)]
hdhr.hdhomerun_device_get_var.restype = c_int
# The value to store is an input string (const char * in C), not an
# out-parameter, so it is a plain c_char_p; it was previously declared as
# POINTER(c_char_p) like the two out-parameters that follow it.
hdhr.hdhomerun_device_set_var.argtypes = [hdhomerun_device_t, c_char_p,
    c_char_p, POINTER(c_char_p), POINTER(c_char_p)]
hdhr.hdhomerun_device_set_var.restype = c_int

hdhr.hdhomerun_device_wait_for_lock.argtypes = [hdhomerun_device_t,
    POINTER(hdhomerun_tuner_status_t)]
hdhr.hdhomerun_device_wait_for_lock.restype = c_int

# Function prototypes: streaming.
hdhr.hdhomerun_device_stream_start.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_stream_start.restype = c_int
#hdhr.hdhomerun_device_stream_refresh_target.argtypes = [ hdhomerun_device_t ]
#hdhr.hdhomerun_device_stream_refresh_target.restype = c_int
# Returns a pointer to the received bytes; the count actually available is
# written through the trailing size_t out-parameter.
hdhr.hdhomerun_device_stream_recv.argtypes = [hdhomerun_device_t, c_size_t,
    POINTER(c_size_t)]
hdhr.hdhomerun_device_stream_recv.restype = POINTER(c_char)
hdhr.hdhomerun_device_stream_flush.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_stream_flush.restype = None
hdhr.hdhomerun_device_stream_stop.argtypes = [hdhomerun_device_t]
hdhr.hdhomerun_device_stream_stop.restype = None

# Function prototypes: channel scanning.
hdhr.hdhomerun_device_channelscan_init.argtypes = [hdhomerun_device_t,
    c_uint32, c_uint32]
hdhr.hdhomerun_device_channelscan_init.restype = c_int
hdhr.hdhomerun_device_channelscan_advance.argtypes = [hdhomerun_device_t,
    hdhomerun_channelscan_result_t]
hdhr.hdhomerun_device_channelscan_advance.restype = c_int
hdhr.hdhomerun_device_channelscan_detect.argtypes = [hdhomerun_device_t,
    hdhomerun_channelscan_result_t]
hdhr.hdhomerun_device_channelscan_detect.restype = c_int
hdhr.hdhomerun_device_channelscan_get_progress.argtypes = [
    hdhomerun_device_t]
hdhr.hdhomerun_device_channelscan_get_progress.restype = c_ubyte

#hdhr.hdhomerun_device_firmware_version_check.argtypes = [ hdhomerun_device_t,
#    c_uint32 ]
#hdhr.hdhomerun_device_firmware_version_check.restype = c_int
# XXX - should be able to pass in a file object
#hdhr.hdhomerun_device_upgrade.argtypes = [ hdhomerun_device_t,
#    c_file_p ]
#hdhr.hdhomerun_device_upgrade.restype = c_int

hdhr.hdhomerun_device_get_video_stats.argtypes = [hdhomerun_device_t,
    POINTER(hdhomerun_video_stats_t)]
hdhr.hdhomerun_device_get_video_stats.restype = None

# String types accepted where the API takes "a string or a number".
# basestring only exists on Python 2.
try:
    _string_types = basestring
except NameError:
    _string_types = (str, bytes)


def _cstr(value):
    """Return *value* as a byte string usable for a c_char_p argument.

    Byte strings pass through unchanged; text is encoded as ASCII.
    """
    if isinstance(value, bytes):
        return value
    return value.encode('ascii')


def _check_result(r):
    """Raise RuntimeError unless the library return code *r* is 1.

    0 is reported as 'operation rejected', -1 as 'communication error';
    any other value is reported as unexpected.
    """
    if r == 1:
        return
    if r == 0:
        raise RuntimeError('operation rejected')
    if r == -1:
        raise RuntimeError('communication error')
    # Was "assert False", which disappears under python -O.
    raise RuntimeError('unexpected return value: %r' % (r,))


def printip(i):
    """Format the 32-bit integer *i* as a dotted quad, high byte first."""
    return '%d.%d.%d.%d' % ((i >> 24) & 0xff, (i >> 16) & 0xff,
        (i >> 8) & 0xff, i & 0xff)


def strfetch(fun, obj):
    """Call fun(obj, char **) and return the string it hands back.

    *fun* is one of the library getters whose second argument is a
    POINTER(c_char_p) out-parameter.

    Raises RuntimeError when the call does not return 1.
    """
    s = c_char_p()
    _check_result(fun(obj, pointer(s)))
    # .value builds a new Python string from the C buffer, so the result
    # does not alias memory owned by the library.
    return s.value


def ctypeStructtodict(s):
    """Return a dict mapping each field name of the ctypes structure *s*
    to its current value."""
    return dict((f[0], getattr(s, f[0])) for f in s._fields_)


class HDHomeRun(object):
    """Object wrapper around a libhdhomerun device handle.

    Derives from object so that the property setter for ``tuner`` takes
    effect; on a Python 2 old-style class the assignment would only create
    an instance attribute.
    """

    def __init__(self, dev=HDHOMERUN_DEVICE_ID_WILDCARD, ip=0, tuner=0):
        """Create the device handle.

        dev   -- a device string handed to hdhomerun_device_create_from_str
                 (in which case ip and tuner are not used), or a numeric
                 device id.
        ip    -- numeric device ip, used with a numeric dev.
        tuner -- tuner number, used with a numeric dev.

        Raises ValueError when no handle is returned or the device id
        reads back as 0.
        """
        if isinstance(dev, _string_types):
            self.obj = hdhr.hdhomerun_device_create_from_str(_cstr(dev),
                None)
        else:
            self.obj = hdhr.hdhomerun_device_create(dev, ip, tuner, None)

        # A NULL handle must not be passed back into the library.
        if not self.obj:
            raise ValueError('unable to create device: %s' % (dev,))
        if self.id == 0:
            raise ValueError('unable to find device: %s' % (dev,))

        self.buffer = []

    def __del__(self):
        # obj is missing when __init__ failed before assigning it, and is
        # a NULL (false) pointer when creation failed.
        obj = getattr(self, 'obj', None)
        if not obj:
            return
        try:
            hdhr.hdhomerun_device_destroy(obj)
        except Exception:
            # Best effort: module globals may already be torn down at
            # interpreter exit.
            pass

    def set_filter(self, filt):
        """Set the tuner filter string; returns the library's result code."""
        return hdhr.hdhomerun_device_set_tuner_filter(self.obj, _cstr(filt))

    def set_tuner(self, t):
        """Select the tuner, given either a tuner string or a number."""
        if isinstance(t, _string_types):
            hdhr.hdhomerun_device_set_tuner_from_str(self.obj, _cstr(t))
        else:
            hdhr.hdhomerun_device_set_tuner(self.obj, t)

    # Read-only views of the handle; tuner is also assignable.
    id = property(lambda s: hdhr.hdhomerun_device_get_device_id(s.obj))
    ip = property(lambda s: hdhr.hdhomerun_device_get_device_ip(s.obj))
    tuner = property(lambda s: hdhr.hdhomerun_device_get_tuner(s.obj),
        set_tuner)
    local_addr = property(
        lambda s: hdhr.hdhomerun_device_get_local_machine_addr(s.obj))

    def setchannel(self, ch):
        """Tune to channel *ch*; non-string values are converted with str()."""
        if not isinstance(ch, _string_types):
            ch = str(ch)
        hdhr.hdhomerun_device_set_tuner_channel(self.obj, _cstr(ch))

    def getstatus(self):
        """Return (status string, hdhomerun_tuner_status_t) for the tuner.

        Raises RuntimeError when the library call does not return 1;
        previously the return code was ignored and a failed call surfaced
        as a TypeError while slicing None.
        """
        status = hdhomerun_tuner_status_t()
        s = c_char_p()
        _check_result(hdhr.hdhomerun_device_get_tuner_status(self.obj,
            pointer(s), pointer(status)))
        return (s.value, status)

    def getvideostats(self):
        """Return a freshly filled hdhomerun_video_stats_t."""
        vstats = hdhomerun_video_stats_t()
        hdhr.hdhomerun_device_get_video_stats(self.obj, pointer(vstats))
        return vstats

    def istuned(self):
        """Return True when the current status has lock_supported set."""
        return bool(self.getstatus()[1].lock_supported)

    def waitfortuned(self):
        """Call hdhomerun_device_wait_for_lock and return True when the
        resulting status has lock_supported set.

        The library's return code is not inspected.
        """
        status = hdhomerun_tuner_status_t()
        hdhr.hdhomerun_device_wait_for_lock(self.obj, pointer(status))
        return bool(status.lock_supported)

    def start(self):
        """Start streaming."""
        hdhr.hdhomerun_device_stream_start(self.obj)

    def stop(self):
        """Stop streaming."""
        hdhr.hdhomerun_device_stream_stop(self.obj)

    def getstats(self):
        """Return one dict with every tuner status field, every video
        stats field, and the three *_color values derived from the status.
        """
        sp = self.getstatus()[1]
        d = ctypeStructtodict(sp)
        d.update(ctypeStructtodict(self.getvideostats()))
        d['signal_strength_color'] = \
            hdhr.hdhomerun_device_get_tuner_status_ss_color(sp)
        d['signal_to_noise_quality_color'] = \
            hdhr.hdhomerun_device_get_tuner_status_snq_color(sp)
        d['symbol_error_quality_color'] = \
            hdhr.hdhomerun_device_get_tuner_status_seq_color(sp)
        return d

    def getdata(self, bufsize=BUFFERSIZE):
        """Return up to *bufsize* bytes of received stream data.

        Returns an empty byte string when the library hands back a NULL
        pointer (presumably when nothing is buffered -- confirm against
        the library documentation).
        """
        actual = c_size_t()
        data = hdhr.hdhomerun_device_stream_recv(self.obj, bufsize,
            pointer(actual))
        if not data:
            return b''
        return data[:actual.value]

    def __repr__(self, v=(('id', hex), ('ip', printip),
            ('tuner', lambda x: x))):
        # v pairs each attribute name with the function used to render it.
        # The format string was empty before, which made repr() raise
        # TypeError.
        return '<HDHomeRun: %s>' % ', '.join(
            '%s: %s' % (name, fmt(getattr(self, name))) for name, fmt in v)


def _selftest(raw=False):
    """Tune the first tuner of the wildcard device to channel 12 and print
    what comes back.

    raw -- drive the library functions directly instead of going through
           the HDHomeRun methods.  This path was disabled ("if False:") in
           the original script and is kept for debugging.
    """
    import time

    dev = HDHomeRun('ffffffff-1')
    if raw:
        print(repr(dev))
        hdhr.hdhomerun_device_set_tuner_channelmap(dev.obj, b'us-bcast')
        hdhr.hdhomerun_device_set_tuner_channel(dev.obj, b'12')
        status = hdhomerun_tuner_status_t()
        r = hdhr.hdhomerun_device_wait_for_lock(dev.obj, pointer(status))
        print(repr(r))
        print(repr(status))
        time.sleep(.5)
        print('get_tuner_streaminfo: %r' % (
            strfetch(hdhr.hdhomerun_device_get_tuner_streaminfo, dev.obj),))
        # No prototype is declared for this function (see the
        # commented-out declaration above).
        print(hdhr.hdhomerun_device_firmware_version_check(dev.obj, 0))
        dev.start()
        time.sleep(.1)
        data = dev.getdata()
        print(len(data))
    else:
        dev.setchannel('12')
        dev.waitfortuned()
        print(dev.istuned())
        if dev.istuned():
            dev.start()
            time.sleep(.1)
            data = dev.getdata()
            print(len(data))
        print(repr(dev.getstats()))


if __name__ == '__main__':
    _selftest()