A WebRTC-based tool to support low-latency audio conferencing. It is intended to let musicians jam together.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

161 lines
4.2 KiB

  1. import aiohttp
  2. import json
  3. import logging
  4. import os.path
  5. import uuid
  6. from aiohttp import web
  7. from aiortc import RTCPeerConnection, RTCIceCandidate, RTCSessionDescription
  8. from aiortc.contrib.media import MediaPlayer, MediaBlackhole
# Logger named 'pc'; ws_handler's log_info helper writes through it.
# The logger's own level is set to INFO here.
logger = logging.getLogger('pc')
logger.setLevel(logging.INFO)
  11. # Implement https://w3c.github.io/webrtc-pc/#constructor-0 per the
  12. # spec.
  13. from aioice.candidate import Candidate
  14. from aiortc.rtcicetransport import candidate_from_aioice
  15. def RealRTCIceCandidate(candidateInitDict):
  16. candpref = 'candidate:'
  17. candstr = candidateInitDict['candidate']
  18. if not candstr.startswith(candpref):
  19. raise ValueError('does not start with proper string')
  20. candstr = candstr[len(candpref):]
  21. cand = Candidate.from_sdp(candstr)
  22. ric = candidate_from_aioice(cand)
  23. ric.sdpMid = candidateInitDict['sdpMid']
  24. ric.sdpMLineIndex = candidateInitDict['sdpMLineIndex']
  25. # XXX - exists as part of RTCIceParameters
  26. #ric.usernameFragment = candidateInitDict['usernameFragment']
  27. return ric
  28. class AudioMixer(object):
  29. @property
  30. def audio(self):
  31. '''The output audio track for this mixing.'''
  32. def addTrack(self, track):
  33. '''Add an import track that will be mixed with
  34. the other tracks.'''
# Module-level AudioMixer instance.  Note that ws_handler binds its own
# local variable named `mixer`, so that handler does not use this object.
mixer = AudioMixer()
# Peer connections created by ws_handler; on_shutdown closes every member
# and then clears the set.
pcs = set()
# Shutdown flag; starts out False.
shutdown = False
# Directory containing this file; index and jammingjs locate the files
# they serve under '../dist' relative to it.
ROOT = os.path.dirname(__file__)
  39. async def index(request):
  40. content = open(os.path.join(ROOT, '..', 'dist', 'audiotest.html'), 'r').read()
  41. return web.Response(content_type='text/html', text=content)
  42. async def jammingjs(request):
  43. content = open(os.path.join(ROOT, '..', 'dist', 'jamming.js'), 'r').read()
  44. return web.Response(content_type='application/javascript', text=content)
  45. # XXX - update hander to pass uuid and meeting id in the url
  46. async def ws_handler(request):
  47. ws = web.WebSocketResponse()
  48. await ws.prepare(request)
  49. pc_id = str(uuid.uuid4())
  50. def log_info(msg, *args):
  51. #print(repr(msg), repr(args))
  52. # shouldn't be warning, but can't get logging working otherwise
  53. logger.warning(pc_id + " " + msg, *args)
  54. log_info("Created for %s", request.remote)
  55. doexit = False
  56. async for msg in ws:
  57. if doexit:
  58. break
  59. if msg.type == aiohttp.WSMsgType.TEXT:
  60. data = json.loads(msg.data)
  61. log_info('got msg: %s', repr(data))
  62. if 'sdp' in data:
  63. offer = RTCSessionDescription(
  64. sdp=data['sdp'], type=data['type'])
  65. elif 'ice' in data:
  66. pc.addIceCandidate(RealRTCIceCandidate(data['ice']))
  67. continue
  68. pc = RTCPeerConnection()
  69. # add to the currect set
  70. pcs.add(pc)
  71. @pc.on("datachannel")
  72. def on_datachannel(channel):
  73. @channel.on("message")
  74. def on_message(message):
  75. if isinstance(message, str) and message.startswith("ping"):
  76. channel.send("pong" + message[4:])
  77. @pc.on("iceconnectionstatechange")
  78. async def on_iceconnectionstatechange():
  79. log_info("ICE connection state is %s", pc.iceConnectionState)
  80. if pc.iceConnectionState == "failed":
  81. await pc.close()
  82. pcs.discard(pc)
  83. doexit = True
  84. mixer = MediaPlayer('demo-instruct.wav')
  85. @pc.on("track")
  86. def on_track(track):
  87. log_info("Track %s received", track.kind)
  88. if track.kind == "audio":
  89. pc.addTrack(mixer.audio)
  90. MediaBlackhole().addTrack(track)
  91. #mixer.addTrack(track)
  92. @track.on("ended")
  93. async def on_ended():
  94. log_info("Track %s ended", track.kind)
  95. # XXX likely not correct
  96. await mixer.stop()
  97. log_info("Got offer: %s", repr(offer))
  98. # handle offer
  99. await pc.setRemoteDescription(offer)
  100. # send answer
  101. answer = await pc.createAnswer()
  102. await pc.setLocalDescription(answer)
  103. await ws.send_str(json.dumps({
  104. "sdp": pc.localDescription.sdp,
  105. "type": pc.localDescription.type,
  106. }))
  107. elif msg.type == aiohttp.WSMsgType.ERROR:
  108. print('ws connection closed with exception %s' %
  109. ws.exception())
  110. print('websocket connection closed')
  111. return ws
  112. async def on_shutdown(app):
  113. shutdown = True
  114. # close peer connections
  115. coros = [pc.close() for pc in pcs]
  116. await asyncio.gather(*coros)
  117. pcs.clear()
  118. def main():
  119. app = web.Application()
  120. app.on_shutdown.append(on_shutdown)
  121. app.router.add_get("/", index)
  122. app.router.add_get("/jamming.js", jammingjs)
  123. app.router.add_get("/ws", ws_handler)
  124. web.run_app(app, access_log=None, port=23854, ssl_context=None)
  125. if __name__ == '__main__':
  126. main()