A Python UPnP Media Server
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

442 lines
12 KiB

  1. #!/usr/bin/env python
  2. # Copyright 2006 John-Mark Gurney <jmg@funkthat.com>
  3. '''Shoutcast Radio Feed'''
  4. __version__ = '$Change$'
  5. # $Id$
  6. # The handling of defers and state in this module is not very good. It
  7. # needs some work to ensure that error cases are properly handled. What
  8. # do we do if we get no URLs for a PLS? Do we delete ourselves just to be
  9. # re-added (when we get PLS refreshing working)? Do we set a content-length
  10. # to zero till we get one?
  11. import ConfigParser
  12. import cStringIO as StringIO
  13. import os.path
  14. import random
  15. import time
  16. import traceback
  17. import xml.dom.minidom
  18. from py_shoutcast import *
  19. from DIDLLite import Container, MusicGenre, Item, AudioItem, Resource
  20. from FSStorage import registerklassfun
  21. from twisted.protocols import shoutcast
  22. from twisted.python import log, threadable, failure
  23. from twisted.internet import defer, protocol, reactor
  24. from twisted.web import error, http, resource, server
  25. from twisted.web.client import getPage, _parse
  # Name of the playlist-file section that parsePLS reads its entries from.
  26. PLSsection = 'playlist'
  27. def cmpStation(a, b, keys = ( 'MimeType', 'Name', 'PLS_URL', 'Bitrate' )):
  28. if filter(lambda k, x = a, y = b: x[k] != y[k], keys):
  29. return False
  30. return True
  31. def stationwbitratecmp(x, y):
  32. x, y = map(lambda a: a.title.split('-', 1)[1], (x, y))
  33. return cmp(x, y)
  34. class GenreFeedAsync(feeds.GenreFeed):
  35. genre_url = 'http://www.shoutcast.com/sbin/newxml.phtml'
  36. def __init__(self, *args, **kwargs):
  37. self.havegenre = False
  38. self.fetchinggenre = None
  39. feeds.GenreFeed.__init__(self, *args, **kwargs)
  40. def gotGenre(self, page):
  41. self.genre = page
  42. self.havegenre = True
  43. # Wake everyone up
  44. self.fetchinggenre.callback(1)
  45. def errGenre(self, failure):
  46. raise NotImplementedError, failure
  47. def fetch_genres(self):
  48. if self.havegenre:
  49. return self.genre
  50. if not self.fetchinggenre:
  51. # Need to start fetching
  52. getPage(self.genre_url.encode('ascii')) \
  53. .addCallbacks(self.gotGenre, self.errGenre)
  54. self.fetchinggenre = defer.Deferred()
  55. # Always raise this if we are waiting.
  56. raise self.fetchinggenre
  57. synchronized = ['fetch_genres', 'gotGenre', ]
  58. threadable.synchronize(GenreFeedAsync)
  59. class ShoutcastFeedAsync(feeds.ShoutcastFeed):
  60. def __init__(self, *args, **kwargs):
  61. feeds.ShoutcastFeed.__init__(self, *args, **kwargs)
  62. self.shout_url = \
  63. 'http://www.shoutcast.com/sbin/newxml.phtml?genre=' + \
  64. self.genre
  65. self.havestations = False
  66. self.fetchingstations = None
  67. def gotStations(self, page):
  68. self.stations = page
  69. self.havestations = True
  70. # Wake everyone up
  71. self.fetchingstations.callback(1)
  72. def errStations(self, failure):
  73. raise NotImplementedError, failure
  74. def fetch_stations(self):
  75. if self.havestations:
  76. return self.stations
  77. if not self.fetchingstations:
  78. # Need to start fetching
  79. getPage(self.shout_url.encode('ascii')) \
  80. .addCallbacks(self.gotStations, self.errStations)
  81. self.fetchingstations = defer.Deferred()
  82. # Always raise this if we are waiting.
  83. raise self.fetchingstations
  84. synchronized = ['fetch_stations', 'gotStations', ]
  85. threadable.synchronize(ShoutcastFeedAsync)
  86. class ShoutTransfer(shoutcast.ShoutcastClient):
  87. userAgent = 'because you block user-agents'
  88. def __init__(self, request, passback):
  89. shoutcast.ShoutcastClient.__init__(self)
  90. self.request = request
  91. self.passback = passback
  92. request.registerProducer(self, 1)
  93. def connectionLost(self, reason):
  94. #traceback.print_stack()
  95. log.msg('connectionLost:', `self.request`, `self.passback`)
  96. shoutcast.ShoutcastClient.connectionLost(self, reason)
  97. if self.request:
  98. self.request.unregisterProducer()
  99. if self.passback:
  100. self.passback(self.request)
  101. self.passback = None
  102. self.request = None
  103. def handleResponse(self, response):
  104. #Drop the data, the parts get the important data, if we got
  105. #here, the connection closed and we are going to die anyways.
  106. pass
  107. def stopProducing(self):
  108. if self.transport is not None:
  109. shoutcast.ShoutcastClient.stopProducing(self)
  110. self.request = None
  111. self.passback = None
  112. def gotMP3Data(self, data):
  113. if data[:6] == '<HTML>':
  114. print 'gd:', `data`
  115. if self.request is not None:
  116. self.request.write(data)
  117. def gotMetaData(self, data):
  118. log.msg("meta:", `data`)
  119. pass
  120. # Remotely relay producer interface.
  121. def view_resumeProducing(self, issuer):
  122. self.resumeProducing()
  123. def view_pauseProducing(self, issuer):
  124. self.pauseProducing()
  125. def view_stopProducing(self, issuer):
  126. self.stopProducing()
  127. synchronized = ['resumeProducing', 'stopProducing']
  128. threadable.synchronize(ShoutTransfer)
  129. class ShoutProxy(resource.Resource):
  130. # We should probably expire the PLS after a while.
  131. # setResponseCode(self, code, message=None)
  132. # setHeader(self, k, v)
  133. # write(self, data)
  134. # finish(self)
  135. isLeaf = True
  136. def __init__(self, url, mt):
  137. resource.Resource.__init__(self)
  138. self.shoutpls = url
  139. self.mt = mt
  140. self.urls = None
  141. self.fetchingurls = False
  142. def dump_exc(self, failure, request):
  143. exc = StringIO.StringIO()
  144. failure.printBriefTraceback(file=exc)
  145. failure.printTraceback()
  146. exc.seek(0)
  147. request.setHeader('content-type', 'text/html')
  148. request.write(error.ErrorPage(http.INTERNAL_SERVER_ERROR,
  149. http.RESPONSES[http.INTERNAL_SERVER_ERROR],
  150. '<pre>%s</pre>' % exc.read()).render(request))
  151. request.finish()
  152. def startNextConnection(self, request):
  153. url = self.urls[self.urlpos]
  154. self.urlpos = (self.urlpos + 1) % len(self.urls)
  155. scheme, host, port, path = _parse(url)
  156. #print `url`
  157. protocol.ClientCreator(reactor, ShoutTransfer, request,
  158. self.startNextConnection).connectTCP(host, port)
  159. def triggerdefered(self, fun):
  160. map(fun, self.afterurls)
  161. self.afterurls = None
  162. def gotURL(self, page):
  163. self.fetchingurls = False
  164. try:
  165. urls = self.parsePLS(page)
  166. except ConfigParser.MissingSectionHeaderError:
  167. urls = self.parseWAX(page)
  168. #log.msg('pls urls:', self.urls)
  169. self.urls = urls
  170. self.urlpos = random.randrange(len(self.urls))
  171. self.triggerdefered(lambda x: x.callback(True))
  172. def parseWAX(self, page):
  173. print 'trying WAX'
  174. dom = xml.dom.minidom.parseString(page)
  175. rootel = dom.documentElement
  176. if rootel.nodeName != 'asx':
  177. raise ValueError('Only asx allowed, got %s' %
  178. `rootel.nodeName`)
  179. urls = []
  180. for i in rootel.getElementsByTagName('entry'):
  181. urls.extend(str(x.getAttribute('href')) for x in
  182. i.getElementsByTagName('ref'))
  183. print 'returning:', `urls`
  184. return urls
  185. def parsePLS(self, page):
  186. pls = ConfigParser.SafeConfigParser()
  187. pls.readfp(StringIO.StringIO(page))
  188. # KCSM 91.1 doesn't provide a version
  189. #assert pls.getint(PLSsection, 'Version') == 2
  190. assert pls.has_option(PLSsection, 'numberofentries')
  191. cnt = pls.getint(PLSsection, 'numberofentries')
  192. urls = []
  193. for i in range(cnt):
  194. i += 1 # stupid one based arrays
  195. urls.append(pls.get(PLSsection,
  196. 'File%d' % i))
  197. return urls
  198. def errURL(self, failure):
  199. self.fetchingurls = False
  200. # XXX - retry?
  201. self.triggerdefered(lambda x: x.errback(failure))
  202. def processRequest(self, ign, request):
  203. self.startNextConnection(request)
  204. def errRequest(self, failure, request):
  205. self.dump_exc(failure, request)
  206. def render(self, request):
  207. request.setHeader('content-type', self.mt)
  208. # XXX - PS3 doesn't support streaming, this makes it think
  209. # that is has data, but it needs to d/l the entire thing.
  210. #request.setHeader('content-length', 1*1024*1024)
  211. if request.method == 'HEAD':
  212. return ''
  213. # need to start the state machine
  214. # a) fetch the playlist
  215. # b) choose a random starting point
  216. # c) connect to the server
  217. # d) select next server and goto c
  218. # return data
  219. if self.urls is None:
  220. if not self.fetchingurls:
  221. # Get the page
  222. self.fetchingurls = True
  223. # Not really sure if ascii is the correct one,
  224. # shouldn't getPage do proper escaping for me?
  225. self.afterurls = [ defer.Deferred() ]
  226. d = getPage(self.shoutpls.encode('ascii'))
  227. d.addCallback(self.gotURL)
  228. d.addErrback(self.errURL)
  229. else:
  230. self.afterurls.append(defer.Deferred())
  231. # Always add the callback if we don't have urls
  232. self.afterurls[-1].addCallbacks(self.processRequest,
  233. errback=self.errRequest, callbackArgs=(request, ),
  234. errbackArgs=(request, ))
  235. else:
  236. self.startNextConnection(request)
  237. # and make sure the connection doesn't get closed
  238. return server.NOT_DONE_YET
  239. synchronized = [ 'gotURL', 'render', 'startNextConnection',
  240. 'triggerdefered', ]
  241. threadable.synchronize(ShoutProxy)
  242. class ShoutURL(AudioItem):
  243. def __init__(self, *args, **kwargs):
  244. url = kwargs.pop('url')
  245. mimetype = kwargs.pop('mimetype', 'audio/mpeg')
  246. bitrate = kwargs.pop('bitrate', None)
  247. kwargs['content'] = ShoutProxy(url, mimetype)
  248. AudioItem.__init__(self, *args, **kwargs)
  249. self.url = '%s/%s' % (self.cd.urlbase, self.id)
  250. self.res = Resource(self.url, 'http-get:*:%s:*' % mimetype)
  251. #self.res = Resource(self.url + '/pcm', 'http-get:*:%s:*' % \
  252. # 'audio/x-wav')
  253. if bitrate is not None:
  254. self.bitrate = bitrate
  255. class ShoutFile(ShoutURL):
  256. def __init__(self, *args, **kwargs):
  257. file = kwargs.pop('file')
  258. kwargs['url'] = open(file).read().strip()
  259. ShoutURL.__init__(self, *args, **kwargs)
  260. class ShoutStation(ShoutURL):
  261. def __init__(self, *args, **kwargs):
  262. self.station = kwargs.pop('station')
  263. kwargs['url'] = self.station['PLS_URL']
  264. kwargs['mimetype'] = self.station['MimeType'].encode('ascii')
  265. kwargs['bitrate'] = self.station['Bitrate'] * 128 # 1024k / 8bit
  266. ShoutURL.__init__(self, *args, **kwargs)
  267. class ShoutGenre(MusicGenre):
  268. def __init__(self, *args, **kwargs):
  269. self.genre = kwargs['genre']
  270. del kwargs['genre']
  271. #self.feeds = ShoutcastFeedAsync(self.genre)
  272. self.feeds = feeds.ShoutcastFeed(self.genre)
  273. self.sl = None
  274. MusicGenre.__init__(self, *args, **kwargs)
  275. def genStations(self, stations):
  276. ret = {}
  277. dupcnt = {}
  278. for i in stations:
  279. name = i['Name']
  280. if name in ret:
  281. # got a dup
  282. if name not in dupcnt:
  283. dupcnt[name] = 2
  284. ret['%s - %d' % (name, dupcnt[name])] = i
  285. dupcnt[name] += 1
  286. else:
  287. ret[name] = i
  288. return ret
  289. def checkUpdate(self):
  290. # Sometimes the shoutcast server returns a 503 (busy) which
  291. # isn't valid XML, try again.
  292. while True:
  293. try:
  294. stations = self.feeds.parse_stations()
  295. break
  296. except:
  297. traceback.print_exc()
  298. time.sleep(1)
  299. if stations == self.sl:
  300. return
  301. self.sl = stations
  302. self.doUpdate()
  303. def genChildren(self):
  304. return self.genStations(self.sl)
  305. def genCurrent(self):
  306. return ((x.id, x.station) for x in self)
  307. def createObject(self, name, arg):
  308. return ShoutStation, '%sk-%s' % (arg['Bitrate'], name), (), \
  309. { 'station': arg }
  310. def sort(self):
  311. super(ShoutGenre, self).sort(lambda *a: stationwbitratecmp(*a))
  312. class ShoutCast(Container):
  313. def __init__(self, *args, **kwargs):
  314. Container.__init__(self, *args, **kwargs)
  315. #self.genres = GenreFeedAsync()
  316. self.genres = feeds.GenreFeed()
  317. self.genre_list = None
  318. def checkUpdate(self):
  319. while True:
  320. try:
  321. nl = self.genres.parse_genres()
  322. if nl == self.genre_list:
  323. return
  324. break
  325. except Exception, x:
  326. print 'genre:', `self.genres.genre`
  327. print 'genres:', `self.genres.genres`
  328. log.msg('parse_genres exception:', `x`)
  329. time.sleep(1)
  330. self.genre_list = nl
  331. super(ShoutCast, self).doUpdate()
  332. def genChildren(self):
  333. return self.genre_list
  334. def genCurrent(self):
  335. return ((x.id, x.genre) for x in self)
  336. def createObject(self, i):
  337. return ShoutGenre, i, (), { 'genre': i }
  338. def detectshoutcastfile(origpath, fobj):
  339. path = os.path.basename(origpath)
  340. if path == 'SHOUTcast Radio':
  341. return ShoutCast, { }
  342. ext = os.path.splitext(path)[1]
  343. if ext == '.scst':
  344. return ShoutFile, { 'file': origpath }
  345. return None, None
  # Hand the detector to FSStorage's registerklassfun at import time
  # (presumably so it is consulted for each scanned path -- confirm there).
  346. registerklassfun(detectshoutcastfile)