Yet Another Denon Python Module
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

744 lines
17 KiB

  1. #!/usr/bin/env python
  2. __author__ = 'John-Mark Gurney'
  3. __copyright__ = 'Copyright 2017 John-Mark Gurney. All rights reserved.'
  4. __license__ = '2-clause BSD license'
  5. # Copyright 2017, John-Mark Gurney
  6. # All rights reserved.
  7. #
  8. # Redistribution and use in source and binary forms, with or without
  9. # modification, are permitted provided that the following conditions are met:
  10. #
  11. # 1. Redistributions of source code must retain the above copyright notice, this
  12. # list of conditions and the following disclaimer.
  13. # 2. Redistributions in binary form must reproduce the above copyright notice,
  14. # this list of conditions and the following disclaimer in the documentation
  15. # and/or other materials provided with the distribution.
  16. #
  17. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  18. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  19. # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  20. # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  21. # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  22. # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  23. # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  24. # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  26. # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  27. #
  28. # The views and conclusions contained in the software and documentation are those
  29. # of the authors and should not be interpreted as representing official policies,
  30. # either expressed or implied, of the Project.
  31. from twisted.internet import reactor
  32. from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
  33. from twisted.protocols import basic
  34. from twisted.test import proto_helpers
  35. from twisted.trial import unittest
  36. import mock
  37. import time
  38. import twisted.internet.serialport
  39. __all__ = [ 'DenonAVR' ]
  40. class DenonAVR(object,basic.LineReceiver):
  41. '''A Twisted Protocol Handler for Denon Receivers. This is not yet
  42. complete, but has basic functionally, and more will be added as
  43. needed.'''
  44. delimiter = '\r' # line delimiter is the CR
  45. timeOut = 1
  46. def __init__(self, serdev):
  47. '''Specify the serial device connected to the Denon AVR.'''
  48. # XXX - is this needed?
  49. self._ser = twisted.internet.serialport.SerialPort(self, serdev, reactor, baudrate=9600)
  50. self._cmdswaiting = {}
  51. self._power = None
  52. self._vol = None
  53. self._mute = None
  54. self._volmax = None
  55. self._speakera = None
  56. self._speakerb = None
  57. self._diginput = None
  58. self._source = None
  59. self._z2mute = None
  60. self._input = None
  61. self._zm = None
  62. self._ms = None
  63. self._eventsfuns = []
  64. def register(self, fun):
  65. '''Register a callback for when an attribute gets
  66. modified or changed.
  67. As this is async, this is useful to get notifications of
  68. when you change an attribute and the value of the amp does
  69. change.'''
  70. self._eventsfuns.append(fun)
  71. def unregister(self, fun):
  72. '''Unregister a function that was previously registered with
  73. register.'''
  74. self._eventsfuns.remove(fun)
  75. def _notify(self, attr):
  76. for i in self._eventsfuns:
  77. # XXX - supress exceptions?
  78. i(attr)
  79. def _magic(cmd, attrname, settrans, args, doc):
  80. def getter(self):
  81. return getattr(self, attrname)
  82. def setter(self, arg):
  83. arg = settrans(arg)
  84. if arg != getattr(self, attrname):
  85. try:
  86. self._sendcmd(cmd, args[arg])
  87. except KeyError:
  88. raise ValueError(arg)
  89. return property(getter, setter, doc=doc)
  90. @property
  91. def ms(self):
  92. 'Surround mode'
  93. return self._ms
  94. power = _magic('PW', '_power', bool, { True: 'ON', False: 'STANDBY' }, 'Power status, True if on')
  95. input = _magic('SI', '_input', str, { x:x for x in ('PHONO', 'TUNER', 'CD', 'V.AUX', 'DVD', 'TV', 'SAT/CBL', 'DVR', ) }, 'Audio Input Source')
  96. source = _magic('SD', '_source', str, { x:x for x in ('AUTO', 'HDMI', 'DIGITAL', 'ANALOG', ) }, 'Source type, can be one of AUTO, HDMI, DIGITAL, or ANALOG')
  97. diginput = _magic('DC', '_diginput', str, { x:x for x in ('AUTO', 'PCM', 'DTS', ) }, 'Digital input mode, can be one of AUTO, PCM, or DTS')
  98. mute = _magic('MU', '_mute', bool, { True: 'ON', False: 'OFF' }, 'Mute speakers, True speakers are muted (no sound)')
  99. zm = _magic('ZM', '_zm', bool, { True: 'ON', False: 'OFF' }, 'Main Zone On, True if on')
  100. z2mute = _magic('Z2MU', '_z2mute', bool, { True: 'ON', False: 'OFF' }, 'Mute Zone 2 speakers, True speakers are muted (no sound)')
  101. @staticmethod
  102. def _makevolarg(arg):
  103. arg = int(arg)
  104. if arg < 0 or arg > 99:
  105. raise ValueError('Volume out of range.')
  106. arg -= 1
  107. arg %= 100
  108. return '%02d' % arg
  109. @staticmethod
  110. def _parsevolarg(arg):
  111. arg = int(arg)
  112. if arg < 0 or arg > 99:
  113. raise ValueError('Volume out of range.')
  114. arg += 1
  115. arg %= 100
  116. return arg
  117. @property
  118. def vol(self):
  119. 'Volumn, range 0 through 99'
  120. return self._vol
  121. @vol.setter
  122. def vol(self, arg):
  123. if arg == self._vol:
  124. return
  125. if self._volmax is not None and arg > self._volmax:
  126. raise ValueError('volume %d, exceeds max: %d' % (arg,
  127. self._volmax))
  128. arg = self._makevolarg(arg)
  129. self._sendcmd('MV', arg)
  130. def vol_up(self):
  131. self._sendcmd('MV', 'UP')
  132. def vol_down(self):
  133. self._sendcmd('MV', 'DOWN')
  134. @property
  135. def volmax(self):
  136. 'Maximum volume supported.'
  137. return self._volmax
  138. def proc_PW(self, arg):
  139. if arg == 'STANDBY':
  140. self._power = False
  141. elif arg == 'ON':
  142. self._power = True
  143. else:
  144. raise RuntimeError('unknown PW arg: %s' % `arg`)
  145. self._notify('power')
  146. def proc_MU(self, arg):
  147. if arg == 'ON':
  148. self._mute = True
  149. elif arg == 'OFF':
  150. self._mute = False
  151. else:
  152. raise RuntimeError('unknown MU arg: %s' % `arg`)
  153. self._notify('mute')
  154. def proc_ZM(self, arg):
  155. if arg == 'ON':
  156. self._zm = True
  157. elif arg == 'OFF':
  158. self._zm = False
  159. else:
  160. raise RuntimeError('unknown ZM arg: %s' % `arg`)
  161. self._notify('zm')
  162. def proc_MV(self, arg):
  163. if arg[:4] == 'MAX ':
  164. self._volmax = self._parsevolarg(arg[4:])
  165. self._notify('volmax')
  166. else:
  167. self._vol = self._parsevolarg(arg)
  168. self._notify('vol')
  169. def proc_MS(self, arg):
  170. self._ms = arg
  171. def proc_SI(self, arg):
  172. self._input = arg
  173. self._notify('input')
  174. def proc_SD(self, arg):
  175. self._source = arg
  176. self._notify('source')
  177. def proc_DC(self, arg):
  178. self._diginput = arg
  179. def proc_CV(self, arg):
  180. pass
  181. def proc_PS(self, arg):
  182. if arg == 'FRONT A':
  183. self._speakera = True
  184. self._speakerb = False
  185. else:
  186. raise RuntimeError('unknown PS arg: %s' % `arg`)
  187. def proc_Z2(self, arg):
  188. if arg == 'MUOFF':
  189. self._z2mute = False
  190. else:
  191. raise RuntimeError('unknown Z2 arg: %s' % `arg`)
  192. def _sendcmd(self, cmd, args):
  193. cmd = '%s%s' % (cmd, args)
  194. #print 'sendcmd:', `cmd`
  195. self.sendLine(cmd)
  196. def lineReceived(self, event):
  197. '''Process a line from the AVR. This is internal and will
  198. be called by LineReceiver.'''
  199. #print 'lR:', `event`
  200. if len(event) >= 2:
  201. fun = getattr(self, 'proc_%s' % event[:2])
  202. fun(event[2:])
  203. for d in self._cmdswaiting.pop(event[:2], []):
  204. d.callback(event)
  205. def _waitfor(self, resp):
  206. d = Deferred()
  207. cmd = resp[:2]
  208. self._cmdswaiting.setdefault(cmd, []).append(d)
  209. if len(resp) > 2:
  210. @inlineCallbacks
  211. def extraresp(d=d):
  212. while True:
  213. r = yield d
  214. if r.startswith(resp):
  215. returnValue(r)
  216. d = self._waitfor(cmd)
  217. d = extraresp()
  218. return d
  219. @inlineCallbacks
  220. def update(self):
  221. '''Update the status of the AVR. This ensures that the
  222. state of the object matches the amp. Returns a Deferred.
  223. When the deferred fires, then all the internal state has
  224. been updated and can be examined.'''
  225. d = self._waitfor('PW')
  226. self._sendcmd('PW', '?')
  227. d = yield d
  228. d = self._waitfor('MVMAX')
  229. self._sendcmd('MV', '?')
  230. d = yield d
  231. d = self._waitfor('SI')
  232. self._sendcmd('SI', '?')
  233. d = yield d
  234. class TestDenon(unittest.TestCase):
  235. TEST_DEV = '/dev/tty.usbserial-FTC8DHBJ'
  236. def test_comms(self): # pragma: no cover
  237. # comment out to make it easy to restore skip
  238. self.skipTest('perf')
  239. avr = DenonAVR(self.TEST_DEV)
  240. self.assertIsNone(avr.power)
  241. avr.update()
  242. self.assertIsNotNone(avr.power)
  243. self.assertIsNotNone(avr.vol)
  244. avr.power = False
  245. time.sleep(1)
  246. avr.power = True
  247. self.assertTrue(avr.power)
  248. print 'foostart'
  249. time.sleep(1)
  250. avr.update()
  251. time.sleep(1)
  252. avr.vol = 0
  253. self.assertEqual(avr.vol, 0)
  254. time.sleep(1)
  255. avr.vol = 5
  256. avr.update()
  257. self.assertEqual(avr.vol, 5)
  258. avr.vol = 50
  259. avr.update()
  260. self.assertEqual(avr.vol, 50)
  261. avr.power = False
  262. self.assertFalse(avr.power)
  263. self.assertIsNotNone(avr.volmax)
  264. class TestStaticMethods(unittest.TestCase):
  265. def test_makevolarg(self):
  266. self.assertRaises(ValueError, DenonAVR._makevolarg, -1)
  267. self.assertRaises(ValueError, DenonAVR._makevolarg, 3874)
  268. self.assertRaises(ValueError, DenonAVR._makevolarg, 100)
  269. self.assertEqual(DenonAVR._makevolarg(0), '99')
  270. self.assertEqual(DenonAVR._makevolarg(1), '00')
  271. self.assertEqual(DenonAVR._makevolarg(99), '98')
  272. def test_parsevolarg(self):
  273. self.assertEqual(DenonAVR._parsevolarg('99'), 0)
  274. self.assertEqual(DenonAVR._parsevolarg('00'), 1)
  275. self.assertEqual(DenonAVR._parsevolarg('98'), 99)
  276. self.assertRaises(ValueError, DenonAVR._parsevolarg, '-1')
  277. class TestMethods(unittest.TestCase):
  278. @mock.patch('twisted.internet.serialport.SerialPort')
  279. def setUp(self, sfu):
  280. self.avr = DenonAVR('null')
  281. self.tr = proto_helpers.StringTransport()
  282. self.avr.makeConnection(self.tr)
  283. @staticmethod
  284. def getTimeout():
  285. return .3
  286. @inlineCallbacks
  287. def test_update(self):
  288. avr = self.avr
  289. dfr = avr.update()
  290. # get the first stage
  291. self.assertEqual(self.tr.value(), 'PW?\r')
  292. avr.dataReceived('PWSTANDBY\r')
  293. avr.dataReceived('MV51\rMVMAX 80\r')
  294. avr.dataReceived('SIPHONO\r')
  295. d = yield dfr
  296. # get the second stage
  297. self.assertEqual(self.tr.value(), 'PW?\rMV?\rSI?\r')
  298. self.assertEqual(avr.power, False)
  299. self.assertIsNone(d)
  300. d = yield dfr
  301. self.assertEqual(self.tr.value(), 'PW?\rMV?\rSI?\r')
  302. self.assertEqual(avr.input, 'PHONO')
  303. self.assertIsNone(d)
  304. self.tr.clear()
  305. d = avr.update()
  306. self.assertEqual(self.tr.value(), 'PW?\r')
  307. avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
  308. avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
  309. avr.dataReceived('SIDVD\r')
  310. d = yield d
  311. self.assertEqual(self.tr.value(), 'PW?\rMV?\rSI?\r')
  312. self.assertEqual(avr.power, True)
  313. self.assertIsNone(d)
  314. self.assertEqual(avr.input, 'DVD')
  315. def test_realsequences(self):
  316. avr = self.avr
  317. avr.dataReceived('PSFRONT A\rSITUNER\rMSSTEREO\rSDANALOG\rDCAUTO\rCVFL 50\r')
  318. avr.dataReceived('PSFRONT A\rSIPHONO\rMSSTEREO\rSDANALOG\rDCAUTO\r')
  319. @inlineCallbacks
  320. def test_waitfor(self):
  321. avr = self.avr
  322. avr.proc_AB = lambda arg: None
  323. d = avr._waitfor('AB123')
  324. # make sure that matching, but different response doesn't trigger
  325. avr.dataReceived('ABABC\r')
  326. self.assertFalse(d.called)
  327. # make sure that it triggers
  328. avr.dataReceived('AB123\r')
  329. self.assertTrue(d.called)
  330. d = yield d
  331. # and we get correct response
  332. self.assertEqual(d, 'AB123')
  333. def test_register(self):
  334. avr = self.avr
  335. efun = mock.MagicMock()
  336. avr.register(efun)
  337. avr.proc_MV('41')
  338. efun.assert_called_once_with('vol')
  339. efun.reset_mock()
  340. avr.proc_MV('MAX 80')
  341. efun.assert_called_once_with('volmax')
  342. efun.reset_mock()
  343. avr.proc_PW('ON')
  344. efun.assert_called_once_with('power')
  345. efun.reset_mock()
  346. avr.proc_MU('ON')
  347. efun.assert_called_once_with('mute')
  348. efun.reset_mock()
  349. avr.proc_ZM('ON')
  350. efun.assert_called_once_with('zm')
  351. efun.reset_mock()
  352. avr.proc_SI('TUNER')
  353. efun.assert_called_once_with('input')
  354. efun.reset_mock()
  355. avr.proc_SD('ANALOG')
  356. efun.assert_called_once_with('source')
  357. efun.reset_mock()
  358. avr.unregister(efun)
  359. avr.proc_PW('ON')
  360. self.assertEqual(efun.call_count, 0)
  361. @inlineCallbacks
  362. def test_vol(self):
  363. avr = self.avr
  364. d = avr.update()
  365. self.assertEqual(self.tr.value(), 'PW?\r')
  366. avr.dataReceived('PWON\rZMON\rMUOFF\rZ2MUOFF\rMUOFF\rPSFRONT A\r')
  367. avr.dataReceived('MSDIRECT\rMSDIRECT\rMSDIRECT\rMV51\rMVMAX 80\r')
  368. avr.dataReceived('SIPHOTO\r')
  369. d = yield d
  370. self.tr.clear()
  371. avr.vol = 20
  372. self.assertEqual(self.tr.value(), 'MV19\r')
  373. def test_proc_events(self):
  374. avr = self.avr
  375. avr.dataReceived('PWON\r')
  376. self.assertEqual(avr.power, True)
  377. avr.dataReceived('MUON\r' + 'PWON\r')
  378. self.assertEqual(avr.mute, True)
  379. self.assertEqual(avr.power, True)
  380. avr.dataReceived('PWSTANDBY\r')
  381. self.assertEqual(avr.power, False)
  382. @mock.patch('yadenon.DenonAVR.sendLine')
  383. def test_proc_PW(self, sendline):
  384. avr = self.avr
  385. avr.proc_PW('STANDBY')
  386. self.assertEqual(avr.power, False)
  387. avr.proc_PW('ON')
  388. self.assertEqual(avr.power, True)
  389. self.assertRaises(RuntimeError, avr.proc_PW, 'foobar')
  390. avr.power = False
  391. sendline.assert_any_call('PWSTANDBY')
  392. def test_proc_MU(self):
  393. avr = self.avr
  394. avr.proc_MU('ON')
  395. self.assertEqual(avr.mute, True)
  396. avr.proc_MU('OFF')
  397. self.assertEqual(avr.mute, False)
  398. self.assertRaises(RuntimeError, avr.proc_MU, 'foobar')
  399. @mock.patch('yadenon.DenonAVR.sendLine')
  400. def test_mute(self, sendline):
  401. avr = self.avr
  402. avr.mute = True
  403. sendline.assert_any_call('MUON')
  404. # Verify the transition doesn't happen
  405. self.assertFalse(avr.mute)
  406. # till we get notification
  407. avr.proc_MU('ON')
  408. self.assertTrue(avr.mute)
  409. avr.mute = False
  410. sendline.assert_any_call('MUOFF')
  411. def test_proc_PS(self):
  412. avr = self.avr
  413. avr.proc_PS('FRONT A')
  414. self.assertEqual(avr._speakera, True)
  415. self.assertEqual(avr._speakerb, False)
  416. self.assertRaises(RuntimeError, avr.proc_PS, 'foobar')
  417. def test_proc_Z2(self):
  418. avr = self.avr
  419. avr.proc_Z2('MUOFF')
  420. self.assertEqual(avr.z2mute, False)
  421. self.assertRaises(RuntimeError, avr.proc_Z2, 'foobar')
  422. def test_proc_MS(self):
  423. avr = self.avr
  424. avr.proc_MS('STEREO')
  425. self.assertEqual(avr.ms, 'STEREO')
  426. def test_proc_ZM(self):
  427. avr = self.avr
  428. avr.proc_ZM('ON')
  429. self.assertEqual(avr._zm, True)
  430. avr.proc_ZM('OFF')
  431. self.assertEqual(avr._zm, False)
  432. self.assertRaises(RuntimeError, avr.proc_ZM, 'foobar')
  433. @mock.patch('yadenon.DenonAVR.sendLine')
  434. def test_zm(self, sendline):
  435. avr = self.avr
  436. avr.zm = True
  437. sendline.assert_any_call('ZMON')
  438. # Verify the transition doesn't happen
  439. self.assertFalse(avr.zm)
  440. # till we get notification
  441. avr.proc_ZM('ON')
  442. self.assertTrue(avr.zm)
  443. avr.zm = False
  444. sendline.assert_any_call('ZMOFF')
  445. def test_proc_MV(self):
  446. avr = self.avr
  447. avr.proc_MV('MAX 80')
  448. self.assertEqual(avr.volmax, 81)
  449. avr.proc_MV('99')
  450. self.assertEqual(avr.vol, 0)
  451. avr.vol = 0
  452. self.assertRaises(ValueError, setattr, avr, 'vol', 82)
  453. def test_proc_SI(self):
  454. avr = self.avr
  455. avr.proc_SI('PHONO')
  456. self.assertEqual(avr.input, 'PHONO')
  457. avr.proc_SI('TUNER')
  458. self.assertEqual(avr.input, 'TUNER')
  459. @mock.patch('yadenon.DenonAVR.sendLine')
  460. def test_input(self, sendline):
  461. avr = self.avr
  462. avr.input = 'PHONO'
  463. sendline.assert_any_call('SIPHONO')
  464. # Verify the transition doesn't happen
  465. self.assertIsNone(avr.input)
  466. # till we get notification
  467. avr.proc_SI('PHONO')
  468. self.assertEqual(avr.input, 'PHONO')
  469. avr.input = 'TUNER'
  470. sendline.assert_any_call('SITUNER')
  471. avr.input = 'CD'
  472. avr.input = 'V.AUX'
  473. avr.input = 'DVD'
  474. avr.input = 'TV'
  475. avr.input = 'SAT/CBL'
  476. avr.input = 'DVR'
  477. self.assertRaises(ValueError, setattr, avr, 'input', 'bogus')
  478. self.assertRaises(ValueError, setattr, avr, 'input', True)
  479. self.assertRaises(ValueError, setattr, avr, 'input', 34)
  480. @mock.patch('yadenon.DenonAVR.sendLine')
  481. def test_source(self, sendline):
  482. avr = self.avr
  483. avr.source = 'AUTO'
  484. sendline.assert_any_call('SDAUTO')
  485. # Verify the transition doesn't happen
  486. self.assertIsNone(avr.source)
  487. # till we get notification
  488. avr.proc_SD('AUTO')
  489. self.assertEqual(avr.source, 'AUTO')
  490. avr.source = 'HDMI'
  491. sendline.assert_any_call('SDHDMI')
  492. avr.source = 'DIGITAL'
  493. avr.source = 'ANALOG'
  494. self.assertRaises(ValueError, setattr, avr, 'source', 'bogus')
  495. self.assertRaises(ValueError, setattr, avr, 'source', True)
  496. self.assertRaises(ValueError, setattr, avr, 'source', 34)
  497. @mock.patch('yadenon.DenonAVR.sendLine')
  498. def test_diginput(self, sendline):
  499. avr = self.avr
  500. avr.diginput = 'AUTO'
  501. sendline.assert_any_call('DCAUTO')
  502. # Verify the transition doesn't happen
  503. self.assertIsNone(avr.diginput)
  504. # till we get notification
  505. avr.proc_DC('AUTO')
  506. self.assertEqual(avr.diginput, 'AUTO')
  507. avr.diginput = 'PCM'
  508. sendline.assert_any_call('DCPCM')
  509. avr.diginput = 'DTS'
  510. self.assertRaises(ValueError, setattr, avr, 'diginput', 'bogus')
  511. self.assertRaises(ValueError, setattr, avr, 'diginput', True)
  512. self.assertRaises(ValueError, setattr, avr, 'diginput', 34)
  513. @mock.patch('yadenon.DenonAVR.sendLine')
  514. def test_volupdown(self, sendline):
  515. avr = self.avr
  516. avr.vol_up()
  517. sendline.assert_any_call('MVUP')
  518. avr.vol_down()
  519. sendline.assert_any_call('MVDOWN')