|
- """
- Windows Process Control
- winprocess.run launches a child process and returns the exit code.
- Optionally, it can:
- redirect stdin, stdout & stderr to files
- run the command as another user
- limit the process's running time
- control the process window (location, size, window state, desktop)
- Works on Windows NT, 2000 & XP. Requires Mark Hammond's win32
- extensions.
- This code is free for any purpose, with no warranty of any kind.
- -- John B. Dell'Aquila <jbd@alum.mit.edu>
- """
- import win32api, win32process, win32security
- import win32event, win32con, msvcrt, win32gui
- import os
- def logonUser(loginString):
- """
- Login as specified user and return handle.
- loginString: 'Domain\nUser\nPassword'; for local
- login use . or empty string as domain
- e.g. '.\nadministrator\nsecret_password'
- """
- domain, user, passwd = loginString.split('\n')
- return win32security.LogonUser(
- user,
- domain,
- passwd,
- win32con.LOGON32_LOGON_INTERACTIVE,
- win32con.LOGON32_PROVIDER_DEFAULT
- )
- class Process:
- """
- A Windows process.
- """
- def __init__(self, cmd, login=None,
- hStdin=None, hStdout=None, hStderr=None,
- show=1, xy=None, xySize=None,
- desktop=None):
- """
- Create a Windows process.
- cmd: command to run
- login: run as user 'Domain\nUser\nPassword'
- hStdin, hStdout, hStderr:
- handles for process I/O; default is caller's stdin,
- stdout & stderr
- show: wShowWindow (0=SW_HIDE, 1=SW_NORMAL, ...)
- xy: window offset (x, y) of upper left corner in pixels
- xySize: window size (width, height) in pixels
- desktop: lpDesktop - name of desktop e.g. 'winsta0\\default'
- None = inherit current desktop
- '' = create new desktop if necessary
- User calling login requires additional privileges:
- Act as part of the operating system [not needed on Windows XP]
- Increase quotas
- Replace a process level token
- Login string must EITHER be an administrator's account
- (ordinary user can't access current desktop - see Microsoft
- Q165194) OR use desktop='' to run another desktop invisibly
- (may be very slow to startup & finalize).
- """
- si = win32process.STARTUPINFO()
- si.dwFlags = (win32con.STARTF_USESTDHANDLES ^
- win32con.STARTF_USESHOWWINDOW)
- if hStdin is None:
- si.hStdInput = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
- else:
- si.hStdInput = hStdin
- if hStdout is None:
- si.hStdOutput = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
- else:
- si.hStdOutput = hStdout
- if hStderr is None:
- si.hStdError = win32api.GetStdHandle(win32api.STD_ERROR_HANDLE)
- else:
- si.hStdError = hStderr
- si.wShowWindow = show
- if xy is not None:
- si.dwX, si.dwY = xy
- si.dwFlags ^= win32con.STARTF_USEPOSITION
- if xySize is not None:
- si.dwXSize, si.dwYSize = xySize
- si.dwFlags ^= win32con.STARTF_USESIZE
- if desktop is not None:
- si.lpDesktop = desktop
- procArgs = (None, # appName
- cmd, # commandLine
- None, # processAttributes
- None, # threadAttributes
- 1, # bInheritHandles
- win32process.CREATE_NEW_CONSOLE, # dwCreationFlags
- None, # newEnvironment
- None, # currentDirectory
- si) # startupinfo
- if login is not None:
- hUser = logonUser(login)
- win32security.ImpersonateLoggedOnUser(hUser)
- procHandles = win32process.CreateProcessAsUser(hUser, *procArgs)
- win32security.RevertToSelf()
- else:
- procHandles = win32process.CreateProcess(*procArgs)
- self.hProcess, self.hThread, self.PId, self.TId = procHandles
- def wait(self, mSec=None):
- """
- Wait for process to finish or for specified number of
- milliseconds to elapse.
- """
- if mSec is None:
- mSec = win32event.INFINITE
- return win32event.WaitForSingleObject(self.hProcess, mSec)
- def kill(self, gracePeriod=5000):
- """
- Kill process. Try for an orderly shutdown via WM_CLOSE. If
- still running after gracePeriod (5 sec. default), terminate.
- """
- win32gui.EnumWindows(self.__close__, 0)
- if self.wait(gracePeriod) != win32event.WAIT_OBJECT_0:
- win32process.TerminateProcess(self.hProcess, 0)
- win32api.Sleep(100) # wait for resources to be released
- def __close__(self, hwnd, dummy):
- """
- EnumWindows callback - sends WM_CLOSE to any window
- owned by this process.
- """
- TId, PId = win32process.GetWindowThreadProcessId(hwnd)
- if PId == self.PId:
- win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
- def exitCode(self):
- """
- Return process exit code.
- """
- return win32process.GetExitCodeProcess(self.hProcess)
- def run(cmd, mSec=None, stdin=None, stdout=None, stderr=None, **kw):
- """
- Run cmd as a child process and return exit code.
- mSec: terminate cmd after specified number of milliseconds
- stdin, stdout, stderr:
- file objects for child I/O (use hStdin etc. to attach
- handles instead of files); default is caller's stdin,
- stdout & stderr;
- kw: see Process.__init__ for more keyword options
- """
- if stdin is not None:
- kw['hStdin'] = msvcrt.get_osfhandle(stdin.fileno())
- if stdout is not None:
- kw['hStdout'] = msvcrt.get_osfhandle(stdout.fileno())
- if stderr is not None:
- kw['hStderr'] = msvcrt.get_osfhandle(stderr.fileno())
- child = Process(cmd, **kw)
- if child.wait(mSec) != win32event.WAIT_OBJECT_0:
- child.kill()
- raise WindowsError('process timeout exceeded')
- return child.exitCode()
- if __name__ == '__main__':
- # Pipe commands to a shell and display the output in notepad
- print('Testing winprocess.py...')
- import tempfile
- timeoutSeconds = 15
- cmdString = """\
- REM Test of winprocess.py piping commands to a shell.\r
- REM This 'notepad' process will terminate in %d seconds.\r
- vol\r
- net user\r
- _this_is_a_test_of_stderr_\r
- """ % timeoutSeconds
- cmd_name = tempfile.mktemp()
- out_name = cmd_name + '.txt'
- try:
- cmd = open(cmd_name, "w+b")
- out = open(out_name, "w+b")
- cmd.write(cmdString.encode('mbcs'))
- cmd.seek(0)
- print('CMD.EXE exit code:', run('cmd.exe', show=0, stdin=cmd,
- stdout=out, stderr=out))
- cmd.close()
- print('NOTEPAD exit code:', run('notepad.exe %s' % out.name,
- show=win32con.SW_MAXIMIZE,
- mSec=timeoutSeconds*1000))
- out.close()
- finally:
- for n in (cmd_name, out_name):
- try:
- os.unlink(cmd_name)
- except os.error:
- pass
|