sspi.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. """
  2. Helper classes for SSPI authentication via the win32security module.
  3. SSPI authentication involves a token-exchange "dance", the exact details
  4. of which depends on the authentication provider used. There are also
  5. a number of complex flags and constants that need to be used - in most
  6. cases, there are reasonable defaults.
  7. These classes attempt to hide these details from you until you really need
  8. to know. They are not designed to handle all cases, just the common ones.
  9. If you need finer control than offered here, just use the win32security
  10. functions directly.
  11. """
  12. # Based on Roger Upole's sspi demos.
  13. # $Id$
  14. import win32security, sspicon
  15. error = win32security.error
  16. class _BaseAuth(object):
  17. def __init__(self):
  18. self.reset()
  19. def reset(self):
  20. """Reset everything to an unauthorized state"""
  21. self.ctxt = None
  22. self.authenticated = False
  23. self.initiator_name = None
  24. self.service_name = None
  25. # The next seq_num for an encrypt/sign operation
  26. self.next_seq_num = 0
  27. def _get_next_seq_num(self):
  28. """Get the next sequence number for a transmission. Default
  29. implementation is to increment a counter
  30. """
  31. ret = self.next_seq_num
  32. self.next_seq_num = self.next_seq_num + 1
  33. return ret
  34. def encrypt(self, data):
  35. """Encrypt a string, returning a tuple of (encrypted_data, trailer).
  36. These can be passed to decrypt to get back the original string.
  37. """
  38. pkg_size_info=self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
  39. trailersize=pkg_size_info['SecurityTrailer']
  40. encbuf=win32security.PySecBufferDescType()
  41. encbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  42. encbuf.append(win32security.PySecBufferType(trailersize, sspicon.SECBUFFER_TOKEN))
  43. encbuf[0].Buffer=data
  44. self.ctxt.EncryptMessage(0,encbuf,self._get_next_seq_num())
  45. return encbuf[0].Buffer, encbuf[1].Buffer
  46. def decrypt(self, data, trailer):
  47. """Decrypt a previously encrypted string, returning the orignal data"""
  48. encbuf=win32security.PySecBufferDescType()
  49. encbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  50. encbuf.append(win32security.PySecBufferType(len(trailer), sspicon.SECBUFFER_TOKEN))
  51. encbuf[0].Buffer=data
  52. encbuf[1].Buffer=trailer
  53. self.ctxt.DecryptMessage(encbuf,self._get_next_seq_num())
  54. return encbuf[0].Buffer
  55. def sign(self, data):
  56. """sign a string suitable for transmission, returning the signature.
  57. Passing the data and signature to verify will determine if the data
  58. is unchanged.
  59. """
  60. pkg_size_info=self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
  61. sigsize=pkg_size_info['MaxSignature']
  62. sigbuf=win32security.PySecBufferDescType()
  63. sigbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  64. sigbuf.append(win32security.PySecBufferType(sigsize, sspicon.SECBUFFER_TOKEN))
  65. sigbuf[0].Buffer=data
  66. self.ctxt.MakeSignature(0,sigbuf,self._get_next_seq_num())
  67. return sigbuf[1].Buffer
  68. def verify(self, data, sig):
  69. """Verifies data and its signature. If verification fails, an sspi.error
  70. will be raised.
  71. """
  72. sigbuf=win32security.PySecBufferDescType()
  73. sigbuf.append(win32security.PySecBufferType(len(data), sspicon.SECBUFFER_DATA))
  74. sigbuf.append(win32security.PySecBufferType(len(sig), sspicon.SECBUFFER_TOKEN))
  75. sigbuf[0].Buffer=data
  76. sigbuf[1].Buffer=sig
  77. self.ctxt.VerifySignature(sigbuf,self._get_next_seq_num())
  78. def unwrap(self, token):
  79. """
  80. GSSAPI's unwrap with SSPI.
  81. https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi
  82. Usable mainly with Kerberos SSPI package, but this is not enforced.
  83. Return the clear text, and a boolean that is True if the token was encrypted.
  84. """
  85. buffer = win32security.PySecBufferDescType()
  86. # This buffer will contain a "stream", which is the token coming from the other side
  87. buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM))
  88. buffer[0].Buffer = token
  89. # This buffer will receive the clear, or just unwrapped text if no encryption was used.
  90. # Will be resized by the lib.
  91. buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA))
  92. pfQOP = self.ctxt.DecryptMessage(buffer, self._get_next_seq_num())
  93. r = buffer[1].Buffer
  94. return r, not (pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT)
  95. def wrap(self, msg, encrypt=False):
  96. """
  97. GSSAPI's wrap with SSPI.
  98. https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi
  99. Usable mainly with Kerberos SSPI package, but this is not enforced.
  100. Wrap a message to be sent to the other side. Encrypted if encrypt is True.
  101. """
  102. size_info = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
  103. trailer_size = size_info['SecurityTrailer']
  104. block_size = size_info['BlockSize']
  105. buffer = win32security.PySecBufferDescType()
  106. # This buffer will contain unencrypted data to wrap, and maybe encrypt.
  107. buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA))
  108. buffer[0].Buffer = msg
  109. # Will receive the token that forms the beginning of the msg
  110. buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN))
  111. # The trailer is needed in case of block encryption
  112. buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING))
  113. fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT
  114. self.ctxt.EncryptMessage(fQOP, buffer, self._get_next_seq_num())
  115. # Sec token, then data, then padding
  116. r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer
  117. return r
  118. def _amend_ctx_name(self):
  119. """Adds initiator and service names in the security context for ease of use"""
  120. if not self.authenticated:
  121. raise ValueError("Sec context is not completely authenticated")
  122. try:
  123. names = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES)
  124. except error:
  125. # The SSP doesn't provide these attributes.
  126. pass
  127. else:
  128. self.initiator_name, self.service_name = names
  129. class ClientAuth(_BaseAuth):
  130. """Manages the client side of an SSPI authentication handshake
  131. """
  132. def __init__(self,
  133. pkg_name, # Name of the package to used.
  134. client_name = None, # User for whom credentials are used.
  135. auth_info = None, # or a tuple of (username, domain, password)
  136. targetspn = None, # Target security context provider name.
  137. scflags=None, # security context flags
  138. datarep=sspicon.SECURITY_NETWORK_DREP):
  139. if scflags is None:
  140. scflags = sspicon.ISC_REQ_INTEGRITY|sspicon.ISC_REQ_SEQUENCE_DETECT|\
  141. sspicon.ISC_REQ_REPLAY_DETECT|sspicon.ISC_REQ_CONFIDENTIALITY
  142. self.scflags=scflags
  143. self.datarep=datarep
  144. self.targetspn=targetspn
  145. self.pkg_info=win32security.QuerySecurityPackageInfo(pkg_name)
  146. self.credentials, \
  147. self.credentials_expiry=win32security.AcquireCredentialsHandle(
  148. client_name, self.pkg_info['Name'],
  149. sspicon.SECPKG_CRED_OUTBOUND,
  150. None, auth_info)
  151. _BaseAuth.__init__(self)
  152. def authorize(self, sec_buffer_in):
  153. """Perform *one* step of the client authentication process. Pass None for the first round"""
  154. if sec_buffer_in is not None and type(sec_buffer_in) != win32security.PySecBufferDescType:
  155. # User passed us the raw data - wrap it into a SecBufferDesc
  156. sec_buffer_new=win32security.PySecBufferDescType()
  157. tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'],
  158. sspicon.SECBUFFER_TOKEN)
  159. tokenbuf.Buffer=sec_buffer_in
  160. sec_buffer_new.append(tokenbuf)
  161. sec_buffer_in = sec_buffer_new
  162. sec_buffer_out=win32security.PySecBufferDescType()
  163. tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'], sspicon.SECBUFFER_TOKEN)
  164. sec_buffer_out.append(tokenbuf)
  165. ## input context handle should be NULL on first call
  166. ctxtin=self.ctxt
  167. if self.ctxt is None:
  168. self.ctxt=win32security.PyCtxtHandleType()
  169. err, attr, exp=win32security.InitializeSecurityContext(
  170. self.credentials,
  171. ctxtin,
  172. self.targetspn,
  173. self.scflags,
  174. self.datarep,
  175. sec_buffer_in,
  176. self.ctxt,
  177. sec_buffer_out)
  178. # Stash these away incase someone needs to know the state from the
  179. # final call.
  180. self.ctxt_attr = attr
  181. self.ctxt_expiry = exp
  182. if err in (sspicon.SEC_I_COMPLETE_NEEDED,sspicon.SEC_I_COMPLETE_AND_CONTINUE):
  183. self.ctxt.CompleteAuthToken(sec_buffer_out)
  184. self.authenticated = err == 0
  185. if self.authenticated:
  186. self._amend_ctx_name()
  187. return err, sec_buffer_out
  188. class ServerAuth(_BaseAuth):
  189. """Manages the server side of an SSPI authentication handshake
  190. """
  191. def __init__(self,
  192. pkg_name,
  193. spn = None,
  194. scflags=None,
  195. datarep=sspicon.SECURITY_NETWORK_DREP):
  196. self.spn=spn
  197. self.datarep=datarep
  198. if scflags is None:
  199. scflags = sspicon.ASC_REQ_INTEGRITY|sspicon.ASC_REQ_SEQUENCE_DETECT|\
  200. sspicon.ASC_REQ_REPLAY_DETECT|sspicon.ASC_REQ_CONFIDENTIALITY
  201. # Should we default to sspicon.KerbAddExtraCredentialsMessage
  202. # if pkg_name=='Kerberos'?
  203. self.scflags=scflags
  204. self.pkg_info=win32security.QuerySecurityPackageInfo(pkg_name)
  205. self.credentials, \
  206. self.credentials_expiry=win32security.AcquireCredentialsHandle(spn,
  207. self.pkg_info['Name'], sspicon.SECPKG_CRED_INBOUND, None, None)
  208. _BaseAuth.__init__(self)
  209. def authorize(self, sec_buffer_in):
  210. """Perform *one* step of the server authentication process."""
  211. if sec_buffer_in is not None and type(sec_buffer_in) != win32security.PySecBufferDescType:
  212. # User passed us the raw data - wrap it into a SecBufferDesc
  213. sec_buffer_new=win32security.PySecBufferDescType()
  214. tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'],
  215. sspicon.SECBUFFER_TOKEN)
  216. tokenbuf.Buffer=sec_buffer_in
  217. sec_buffer_new.append(tokenbuf)
  218. sec_buffer_in = sec_buffer_new
  219. sec_buffer_out=win32security.PySecBufferDescType()
  220. tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'], sspicon.SECBUFFER_TOKEN)
  221. sec_buffer_out.append(tokenbuf)
  222. ## input context handle is None initially, then handle returned from last call thereafter
  223. ctxtin=self.ctxt
  224. if self.ctxt is None:
  225. self.ctxt=win32security.PyCtxtHandleType()
  226. err, attr, exp = win32security.AcceptSecurityContext(self.credentials, ctxtin,
  227. sec_buffer_in, self.scflags,
  228. self.datarep, self.ctxt, sec_buffer_out)
  229. # Stash these away incase someone needs to know the state from the
  230. # final call.
  231. self.ctxt_attr = attr
  232. self.ctxt_expiry = exp
  233. if err in (sspicon.SEC_I_COMPLETE_NEEDED,sspicon.SEC_I_COMPLETE_AND_CONTINUE):
  234. self.ctxt.CompleteAuthToken(sec_buffer_out)
  235. self.authenticated = err == 0
  236. if self.authenticated:
  237. self._amend_ctx_name()
  238. return err, sec_buffer_out
  239. if __name__=='__main__':
  240. # This is the security package (the security support provider / the security backend)
  241. # we want to use for this example.
  242. ssp = "Kerberos" # or "NTLM" or "Negotiate" which enable negotiation between
  243. # Kerberos (prefered) and NTLM (if not supported on the other side).
  244. flags = (
  245. sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication
  246. sspicon.ISC_REQ_INTEGRITY | # check for integrity
  247. sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages
  248. sspicon.ISC_REQ_CONFIDENTIALITY | # request confidentiality
  249. sspicon.ISC_REQ_REPLAY_DETECT # request replay detection
  250. )
  251. # Get our identity, mandatory for the Kerberos case *for this example*
  252. # Kerberos cannot be used if we don't tell it the target we want
  253. # to authenticate to.
  254. cred_handle, exp = win32security.AcquireCredentialsHandle(
  255. None, ssp, sspicon.SECPKG_CRED_INBOUND, None, None
  256. )
  257. cred = cred_handle.QueryCredentialsAttributes(sspicon.SECPKG_CRED_ATTR_NAMES)
  258. print("We are:", cred)
  259. # Setup the 2 contexts. In real life, only one is needed: the other one is
  260. # created in the process we want to communicate with.
  261. sspiclient=ClientAuth(ssp, scflags=flags, targetspn=cred)
  262. sspiserver=ServerAuth(ssp, scflags=flags)
  263. print("SSP : %s (%s)" % (sspiclient.pkg_info["Name"], sspiclient.pkg_info["Comment"]))
  264. # Perform the authentication dance, each loop exchanging more information
  265. # on the way to completing authentication.
  266. sec_buffer=None
  267. client_step = 0
  268. server_step = 0
  269. while not(sspiclient.authenticated) or len(sec_buffer[0].Buffer):
  270. client_step += 1
  271. err, sec_buffer = sspiclient.authorize(sec_buffer)
  272. print("Client step %s" % client_step)
  273. if sspiserver.authenticated and len(sec_buffer[0].Buffer) == 0:
  274. break
  275. server_step += 1
  276. err, sec_buffer = sspiserver.authorize(sec_buffer)
  277. print("Server step %s" % server_step)
  278. # Authentication process is finished.
  279. print("Initiator name from the service side:", sspiserver.initiator_name)
  280. print("Service name from the client side: ", sspiclient.service_name)
  281. data = "hello".encode("ascii") # py3k-friendly
  282. # Simple signature, not compatible with GSSAPI.
  283. sig = sspiclient.sign(data)
  284. sspiserver.verify(data, sig)
  285. # Encryption
  286. encrypted, sig = sspiclient.encrypt(data)
  287. decrypted = sspiserver.decrypt(encrypted, sig)
  288. assert decrypted == data
  289. # GSSAPI wrapping, no encryption (NTLM always encrypts)
  290. wrapped = sspiclient.wrap(data)
  291. unwrapped, was_encrypted = sspiserver.unwrap(wrapped)
  292. print("encrypted ?", was_encrypted)
  293. assert data == unwrapped
  294. # GSSAPI wrapping, with encryption
  295. wrapped = sspiserver.wrap(data, encrypt=True)
  296. unwrapped, was_encrypted = sspiclient.unwrap(wrapped)
  297. print("encrypted ?", was_encrypted)
  298. assert data == unwrapped
  299. print("cool!")