launcher.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import argparse
  2. import asyncio
  3. import datetime
  4. import io
  5. import json
  6. import logging
  7. import os
  8. import re
  9. import string
  10. import subprocess
  11. import sys
  12. import time
  13. import uuid
  14. from random import choice
  15. import aiohttp.web as aiohttp_web
  16. from . import _root_handler
  17. from wslink.launcher import (
  18. SessionManager,
  19. ProxyMappingManagerTXT,
  20. ProcessManager,
  21. validateKeySet,
  22. STATUS_BAD_REQUEST,
  23. STATUS_SERVICE_UNAVAILABLE,
  24. filterResponse,
  25. STATUS_OK,
  26. extractSessionId,
  27. STATUS_NOT_FOUND,
  28. )
# ===========================================================================
# Class implementing the handlers for POST, GET and DELETE requests
# ===========================================================================
  32. class LauncherResource(object):
  33. def __init__(self, options, config):
  34. self._options = options
  35. self._config = config
  36. self.time_to_wait = int(config["configuration"]["timeout"])
  37. self.field_filter = config["configuration"]["fields"]
  38. self.session_manager = SessionManager(
  39. config, ProxyMappingManagerTXT(config["configuration"]["proxy_file"])
  40. )
  41. self.process_manager = ProcessManager(config)
  42. def __del__(self):
  43. try:
  44. # causes an exception when server is killed with Ctrl-C
  45. logging.warning("Server factory shutting down. Stopping all processes")
  46. except:
  47. pass
  48. # ========================================================================
  49. # Handle POST request
  50. # ========================================================================
  51. async def handle_post(self, request):
  52. payload = await request.json()
  53. # Make sure the request has all the expected keys
  54. if not validateKeySet(payload, ["application"], "Launch request"):
  55. return aiohttp_web.json_response(
  56. {"error": "The request is not complete"}, status=STATUS_BAD_REQUEST
  57. )
  58. # Try to free any available resource
  59. id_to_free = self.process_manager.listEndedProcess()
  60. for id in id_to_free:
  61. self.session_manager.deleteSession(id)
  62. self.process_manager.stopProcess(id)
  63. # Create new session
  64. session = self.session_manager.createSession(payload)
  65. # No resource available
  66. if not session:
  67. return aiohttp_web.json_response(
  68. {"error": "All the resources are currently taken"},
  69. status=STATUS_SERVICE_UNAVAILABLE,
  70. )
  71. # Start process
  72. proc = self.process_manager.startProcess(session)
  73. if not proc:
  74. err_msg = "The process did not properly start. %s" % str(session["cmd"])
  75. return aiohttp_web.json_response(
  76. {"error": err_msg}, status=STATUS_SERVICE_UNAVAILABLE
  77. )
  78. return await self._waitForReady(session, request)
  79. # ========================================================================
  80. # Wait for session to be ready
  81. # ========================================================================
  82. async def _waitForReady(self, session, request):
  83. start_time = datetime.datetime.now()
  84. check_line = "ready_line" in self._config["apps"][session["application"]]
  85. count = 0
  86. while True:
  87. if self.process_manager.isReady(session, count):
  88. filterkeys = self.field_filter
  89. if session["secret"] in session["cmd"]:
  90. filterkeys = self.field_filter + ["secret"]
  91. return aiohttp_web.json_response(
  92. filterResponse(session, filterkeys), status=STATUS_OK
  93. )
  94. elapsed_time = datetime.datetime.now() - start_time
  95. if elapsed_time.total_seconds() > self.time_to_wait:
  96. # Timeout is expired, if the process is not ready now, mark the
  97. # session as timed out, clean up the process, and return an error
  98. # response
  99. session["startTimedOut"] = True
  100. self.session_manager.deleteSession(session["id"])
  101. self.process_manager.stopProcess(session["id"])
  102. return aiohttp_web.json_response(
  103. {
  104. "error": "Session did not start before timeout expired. Check session logs."
  105. },
  106. status=STATUS_SERVICE_UNAVAILABLE,
  107. )
  108. await asyncio.sleep(1)
  109. count += 1
  110. # =========================================================================
  111. # Handle GET request
  112. # =========================================================================
  113. async def handle_get(self, request):
  114. id = extractSessionId(request)
  115. if not id:
  116. message = "id not provided in GET request"
  117. logging.error(message)
  118. return aiohttp_web.json_response(
  119. {"error": message}, status=STATUS_BAD_REQUEST
  120. )
  121. logging.info("GET request received for id: %s" % id)
  122. session = self.session_manager.getSession(id)
  123. if not session:
  124. message = "No session with id: %s" % id
  125. logging.error(message)
  126. return aiohttp_web.json_response(
  127. {"error": message}, status=STATUS_BAD_REQUEST
  128. )
  129. # Return session meta-data
  130. return aiohttp_web.json_response(
  131. filterResponse(session, self.field_filter), status=STATUS_OK
  132. )
  133. # =========================================================================
  134. # Handle DELETE request
  135. # =========================================================================
  136. async def handle_delete(self, request):
  137. id = extractSessionId(request)
  138. if not id:
  139. message = "id not provided in DELETE request"
  140. logging.error(message)
  141. return aiohttp_web.json_response(
  142. {"error": message}, status=STATUS_BAD_REQUEST
  143. )
  144. logging.info("DELETE request received for id: %s" % id)
  145. session = self.session_manager.getSession(id)
  146. if not session:
  147. message = "No session with id: %s" % id
  148. logging.error(message)
  149. return aiohttp_web.json_response(
  150. {"error": message}, status=STATUS_NOT_FOUND
  151. )
  152. # Remove session
  153. self.session_manager.deleteSession(id)
  154. self.process_manager.stopProcess(id)
  155. message = "Deleted session with id: %s" % id
  156. logging.info(message)
  157. return aiohttp_web.json_response(session, status=STATUS_OK)
  158. # =============================================================================
  159. # Start the web server
  160. # =============================================================================
  161. def startWebServer(options, config):
  162. # import pdb; pdb.set_trace()
  163. # Extract properties from config
  164. log_dir = str(config["configuration"]["log_dir"])
  165. content = str(config["configuration"]["content"])
  166. endpoint = str(config["configuration"]["endpoint"])
  167. host = str(config["configuration"]["host"])
  168. port = int(config["configuration"]["port"])
  169. sanitize = config["configuration"]["sanitize"]
  170. # Setup logging
  171. logFileName = log_dir + os.sep + "launcherLog.log"
  172. formatting = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
  173. logging.basicConfig(
  174. level=logging.DEBUG, filename=logFileName, filemode="w", format=formatting
  175. )
  176. if options.debug:
  177. console = logging.StreamHandler(sys.stdout)
  178. console.setLevel(logging.INFO)
  179. formatter = logging.Formatter(formatting)
  180. console.setFormatter(formatter)
  181. logging.getLogger("").addHandler(console)
  182. web_app = aiohttp_web.Application()
  183. launcher_resource = LauncherResource(options, config)
  184. if not endpoint.startswith("/"):
  185. endpoint = "/{0}/".format(endpoint)
  186. web_app.add_routes(
  187. [
  188. aiohttp_web.post(endpoint, launcher_resource.handle_post),
  189. aiohttp_web.get(endpoint, launcher_resource.handle_get),
  190. aiohttp_web.delete(endpoint, launcher_resource.handle_delete),
  191. ]
  192. )
  193. if len(content) > 0:
  194. web_app.router.add_route("GET", "/", _root_handler)
  195. web_app.add_routes([aiohttp_web.static("/", content)])
  196. aiohttp_web.run_app(web_app, host=host, port=port)