18. Samples of code — PythonForWindows 1.0.4 documentation
18.1. Processes¶
18.1.1. windows.current_process¶
# Demo of `windows.current_process`: introspection, native code execution
# and memory allocation / read / write inside the current process.
import sys
import os.path
# Make the in-tree 'windows' package importable when run from the samples dir.
# Raw string on purpose: "\." is not a valid escape sequence and a non-raw
# literal triggers a warning on recent Python versions.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64

# Here is our current process
cp = windows.current_process
print("current process is {cp}".format(cp=windows.current_process))
print("current process is a <{cp.bitness}> bits process".format(cp=cp))
print("current process is a SysWow64 process ? <{cp.is_wow_64}>".format(cp=cp))
print("current process pid <{cp.pid}> and ppid <{cp.ppid}>".format(cp=cp))
print("Here are the current process threads: <{cp.threads}>".format(cp=cp))

print("Let's execute some native code ! (0x41 + 1)")
# Pick the assembler matching the bitness of the running interpreter
if windows.current_process.bitness == 32:
    # Let's generate some native code
    code = x86.MultipleInstr()
    code += x86.Mov("Eax", 0x41)
    code += x86.Inc("EAX")
    code += x86.Ret()
else:
    code = x64.MultipleInstr()
    code += x64.Mov("RAX", 0x41)
    code += x64.Inc("RAX")
    code += x64.Ret()

native_code = code.get_code()
v = windows.current_process.execute(native_code)
print("Native code returned <{0}>".format(hex(v)))

print("Allocating memory in current process")
addr = cp.virtual_alloc(0x1000)  # Default alloc is RWX (so secure !)
print("Allocated memory is at <{0}>".format(hex(addr)))

print("Writing 'SOME STUFF' in allocation memory")
cp.write_memory(addr, "SOME STUFF")
print("Reading memory : <{0}>".format(repr(cp.read_memory(addr, 20))))
Output
(cmd) python process\current_process.py current process is <windows.winobject.process.CurrentProcess object at 0x000001DD8BC5CA10> current process is a <64> bits process current process is a SysWow64 process ? <False> current process pid <26976> and ppid <28256> Here are the current process threads: <[<WinThread 15220 owner "CurrentProcess" at 0x1dd8d20b0d0>, <WinThread 27912 owner "CurrentProcess" at 0x1dd8d20afd0>, <WinThread 27832 owner "CurrentProcess" at 0x1dd8d20af10>, <WinThread 26820 owner "CurrentProcess" at 0x1dd8d20ae90>]> Let's execute some native code ! (0x41 + 1) Native code returned <0x42> Allocating memory in current process Allocated memory is at <0x1dd8d2f0000> Writing 'SOME STUFF' in allocation memory Reading memory : <b'SOME STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'>
18.1.2. Remote process : WinProcess¶
# Demo of `WinProcess`: inspection, remote memory access, remote native code
# execution and remote python execution in a freshly created notepad.exe.
import sys
import os.path
# Make the in-tree 'windows' package importable when run from the samples dir.
# Raw string on purpose: "\." is not a valid escape sequence and a non-raw
# literal triggers a warning on recent Python versions.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64

print("Creating a notepad")

## Replaced calc.exe by notepad.exe cause of windows 10.
notepad = windows.utils.create_process(r"C:\windows\system32\notepad.exe")

# You don't need to do that in our case, but it's useful to know
print("Looking for notepads in the processes")
all_notepads = [proc for proc in windows.system.processes if proc.name == "notepad.exe"]
print("They are currently <{0}> notepads running on the system".format(len(all_notepads)))

print("Let's play with our notepad: <{notepad}>".format(notepad=notepad))
print("Our notepad pid is {notepad.pid}".format(notepad=notepad))
print("Our notepad is a <{notepad.bitness}> bits process".format(notepad=notepad))
print("Our notepad is a SysWow64 process ? <{notepad.is_wow_64}>".format(notepad=notepad))
print("Our notepad have threads ! <{notepad.threads}>".format(notepad=notepad))

# PEB STUFF
peb = notepad.peb
print("Exploring our notepad PEB ! {peb}".format(peb=peb))
print("Command line is {peb.commandline}".format(peb=peb))
modules = peb.modules
print("Here are 3 loaded modules: {0}".format(modules[:3]))
# See iat_hook.py for module exploration

# Remote alloc / read / write
print("Allocating memory in our notepad")
addr = notepad.virtual_alloc(0x1000)
print("Allocated memory is at <{0}>".format(hex(addr)))
print("Writing 'SOME STUFF' in allocated memory")
notepad.write_memory(addr, "SOME STUFF")
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))

# Remote Execution
print("Execution some native code in our notepad (write 0x424242 at allocated address + return 0x1337)")
# The shellcode must match the bitness of the *target* process, not ours
if notepad.bitness == 32:
    # Let's generate some native code
    code = x86.MultipleInstr()
    code += x86.Mov(x86.deref(addr), 0x42424242)
    code += x86.Mov("EAX", 0x1337)
    code += x86.Ret()
else:
    code = x64.MultipleInstr()
    code += x64.Mov('RAX', addr)
    code += x64.Mov(x64.mem("[RAX]"), 0x42424242)
    code += x64.Mov("RAX", 0x1337)
    code += x64.Ret()

print("Executing native code !")
t = notepad.execute(code.get_code())
t.wait()
print("Return code = {0}".format(hex(t.exit_code)))
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))

print("Executing python code !")
# Make 'windows' importable in remote python
notepad.execute_python("import sys; sys.path.append(r'{0}')".format(sys.path[-1]))
notepad.execute_python("import windows")
# Let's write in the notepad 'current_process' memory :)
notepad.execute_python("addr = {addr}; windows.current_process.write_memory(addr, 'HELLO FROM notepad')".format(addr=addr))
print("Reading allocated memory : <{0}>".format(repr(notepad.read_memory(addr, 20))))

# python_execute is 'safe':
#   - it waits for the thread completion
#   - it raise an error if remote code raised some
try:
    print("Trying to import in remote module 'FAKE_MODULE'")
    notepad.execute_python("def func():\n import FAKE_MODULE\nfunc()")
except windows.injection.RemotePythonError as e:
    print("Exception in remote process!")
    print(e)

print("That's all ! killing the notepad")
notepad.exit()
Output
(cmd) python process\remote_process.py Creating a notepad Looking for notepads in the processes They are currently <1> notepads running on the system Let's play with our notepad: <<WinProcess "notepad.exe" pid 8400 at 0x6238510>> Our notepad pid is 8400 Our notepad is a <32> bits process Our notepad is a SysWow64 process ? <True> Our notepad have threads ! <[<WinThread 16028 owner "notepad.exe" at 0x6238f10>, <WinThread 5924 owner "notepad.exe" at 0x6238a10>, <WinThread 11620 owner "notepad.exe" at 0x6238fb0>, <WinThread 3480 owner "notepad.exe" at 0x6238ff0>, <WinThread 200 owner "notepad.exe" at 0x62389b0>, <WinThread 16804 owner "notepad.exe" at 0x62389f0>, <WinThread 13340 owner "notepad.exe" at 0x6238930>]> Exploring our notepad PEB ! <windows.winobject.process.RemotePEB object at 0x061BDA80> Command line is <Remote_LSA_UNICODE_STRING ""C:\windows\system32\notepad.exe"" at 0x61bdb20> Here are 3 loaded modules: [<RemoteLoadedModule "notepad.exe" at 0x61bdad0>, <RemoteLoadedModule "ntdll.dll" at 0x61bd940>, <RemoteLoadedModule "kernel32.dll" at 0x61bd8f0>] Allocating memory in our notepad Allocated memory is at <0x6f00000> Writing 'SOME STUFF' in allocated memory Reading allocated memory : <'SOME STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'> Execution some native code in our notepad (write 0x424242 at allocated address + return 0x1337) Executing native code ! Return code = 0x1337L Reading allocated memory : <'BBBB STUFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'> Executing python code ! Reading allocated memory : <'HELLO FROM notepad\x00\x00'> Trying to import in remote module 'FAKE_MODULE' Exception in remote process! Traceback (most recent call last): File "<string>", line 3, in <module> File "<string>", line 2, in func ImportError: No module named FAKE_MODULE That's all ! killing the notepad
18.1.3. PEB exploration¶
# Demo of PEB exploration on the current process: command line, image path,
# loaded modules and the parsed PE (exports / imports / sections) of kernel32.
import sys
import os.path
# Make the in-tree 'windows' package importable when run from the samples dir.
# Raw string on purpose: "\." is not a valid escape sequence and a non-raw
# literal triggers a warning on recent Python versions.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows

print("Exploring the current process PEB")
peb = windows.current_process.peb
print("PEB is <{0}>".format(peb))

commandline = peb.commandline
print("Commandline object is {0}".format(commandline))
print("Commandline string is {0}".format(repr(commandline.Buffer)))

imagepath = peb.imagepath
print("Imagepath {0}".format(imagepath))

modules = peb.modules
print("Printing some modules: {0}".format("\n".join(str(m) for m in modules[:6])))

print("=== K32 ===")
print("Looking for kernel32.dll")
k32 = [m for m in modules if m.name == "kernel32.dll"][0]
print("Kernel32 module: {0}".format(k32))
print("Module name = <{0}> | Fullname = <{1}>".format(k32.name, k32.fullname))
print("Kernel32 is loaded at address {0}".format(hex(k32.baseaddr)))

print("=== K32 PE ===")
k32pe = k32.pe
print("PE Representation of k32: {0}".format(k32pe))

exports = k32pe.exports
# Exports are looked up both by ordinal (int) and by name (str)
some_exports = {k: v for k, v in exports.items() if k in [0, 42, "VirtualAlloc", "CreateFileA"]}
print("Here are some exports {0}".format(some_exports))

imports = k32pe.imports
print("Import DLL dependancies are (without api-*): {0}".format([x for x in imports.keys() if not x.startswith("api-")]))

NtCreateFile_iat = [x for x in imports["ntdll.dll"] if x.name == "NtCreateFile"][0]
print("IAT Entry for ntdll!NtCreateFile = {0} | addr = {1}".format(NtCreateFile_iat, hex(NtCreateFile_iat.addr)))

print("Sections: {0}".format(k32pe.sections))
Output
(cmd) python process\peb.py
Exploring the current process PEB
PEB is <<windows.winobject.process.PEB object at 0x05A6F710>>
Commandline object is <_LSA_UNICODE_STRING "C:\Python27\python.exe process\peb.py" at 0x5a6f8a0>
Commandline string is 47063282
Imagepath <_LSA_UNICODE_STRING "C:\Python27\python.exe" at 0x5a6f990>
Printing some modules: <LoadedModule "python.exe" at 0x60d2080>
<LoadedModule "ntdll.dll" at 0x60d2030>
<LoadedModule "kernel32.dll" at 0x60d2b20>
<LoadedModule "kernelbase.dll" at 0x60d2b70>
<LoadedModule "python27.dll" at 0x60d2bc0>
<LoadedModule "msvcr90.dll" at 0x60d2c10>
=== K32 ===
Looking for kernel32.dll
Kernel32 module: <LoadedModule "kernel32.dll" at 0x60d2b20>
Module name = <kernel32.dll> | Fullname = <c:\windows\system32\kernel32.dll>
Kernel32 is loaded at address 0x76930000
=== K32 PE ===
PE Representation of k32: <windows.pe_parse.PEFile object at 0x060CCA10>
Here are some exports {0: 1989445168L, 'CreateFileA': 1989795280L, 42: 1989636592L, 'VirtualAlloc': 1989437552L}
Import DLL dependancies are (without api-*): ['kernelbase.dll', 'ntdll.dll']
IAT Entry for ntdll!NtCreateFile = <IATEntry "NtCreateFile" ordinal 272> | addr = 0x769a1a28L
Sections: [<PESection ".text">, <PESection ".rdata">, <PESection ".data">, <PESection ".rsrc">, <PESection ".reloc">]
18.1.4. ApiSetMap¶
# Demo of the ApiSetMap exposed by the PEB: raw entries, resolution entries
# and name resolution of api-* DLLs.
import windows

print("Computer is a <{0}>".format(windows.system.version_name))

cp = windows.current_process
apism = cp.peb.apisetmap
print("ApiSetMap: {0} (version = {1})".format(apism, apism.version))

# Find the current version of "api-ms-win-core-processthreads" used by windows
processthreads_entries = [
    entry
    for entry in windows.current_process.peb.apisetmap.apisetmap_dict
    if "api-ms-win-core-processthreads" in entry
]
full_name = processthreads_entries[0]
util_name = 'api-ms-win-core-processthreads-l1-1-'

print("Entries in 'apisetmap_dict' are the full api-dll path extracted")
full_target = apism.apisetmap_dict[full_name]
print(" * apisetmap.apisetmap_dict['{0}'] -> {1}".format(full_name, full_target))

print("Entries in 'resolution_dict' are the contains the util-part check by windows")
util_target = apism.resolution_dict[util_name]
print(" * apisetmap.resolution_dict['{0}'] -> {1}".format(util_name, util_target))

print("ApiSetMap.resolve resolve a api-dll based on the util part")
# Whatever follows the util part is not taken into account by the resolution
for candidate in [util_name + suffix for suffix in ("1", "2", "PART_IS_IGNORED")]:
    resolved = apism.resolve(candidate)
    print(" * apisetmap.resolve('{0}') -> {1}".format(candidate, resolved))

# A name that is not in the map makes resolve() raise a KeyError
bad_name = "BAD_DLL-3.dll"
try:
    print(" * apisetmap.resolve('{0}') -> {1}".format(bad_name, apism.resolve(bad_name)))
except KeyError as e:
    print(" * apisetmap.resolve('{0}') -> raised: {1!r}".format(bad_name, e))
Output
(cmd) python process\apisetmap.py Computer is a <Windows 10> ApiSetMap: <windows.winobject.apisetmap.ApiSetMapVersion6 object at 0x0645ECB0> (version = 6) Entries in 'apisetmap_dict' are the full api-dll path extracted * apisetmap.apisetmap_dict['api-ms-win-core-processthreads-l1-1-3'] -> kernelbase.dll Entries in 'resolution_dict' are the contains the util-part check by windows * apisetmap.resolution_dict['api-ms-win-core-processthreads-l1-1-'] -> kernelbase.dll ApiSetMap.resolve resolve a api-dll based on the util part * apisetmap.resolve('api-ms-win-core-processthreads-l1-1-1') -> kernelbase.dll * apisetmap.resolve('api-ms-win-core-processthreads-l1-1-2') -> kernelbase.dll * apisetmap.resolve('api-ms-win-core-processthreads-l1-1-PART_IS_IGNORED') -> kernelbase.dll * apisetmap.resolve('BAD_DLL-3.dll') -> raised: KeyError('BAD_DLL-',)
18.1.5. IAT hooking¶
import sys
import os.path
# Make the in-tree 'windows' package importable when run from the samples dir.
# Raw string on purpose: "\." is not a valid escape sequence and a non-raw
# literal triggers a warning on recent Python versions.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

try:
    import winreg
except ImportError:
    import _winreg as winreg

import windows

# Here is a demo of IAT hooking in python
# We will hook the 'RegOpenKeyExA' entry of Python27.dll because it is easy to trigger !


def _handle_reg_open(hKey, lpSubKey, phkResult, real_function):
    """Shared body of the ANSI and wide RegOpenKeyEx hooks.

    Returns 0 and a magic handle for keys containing "SECRET", the error
    code 42 for keys containing "FAIL", and otherwise the result of the
    real (un-hooked) function.
    """
    print("<in hook> Hook called | hKey = {0} | lpSubKey = <{1}>".format(hex(hKey), lpSubKey.value))
    # Our hook can choose to call the real_function or not
    if "SECRET" in lpSubKey.value:
        print("<in hook> Secret key asked, returning magic handle 0x12345678")
        # We must respect the hooked method return-value interface
        phkResult[0] = 0x12345678
        return 0
    if "FAIL" in lpSubKey.value:
        print("<in hook> Asked for a failing key: returning 0x2a")
        return 42
    print("<in hook> Non-secret key : calling normal function")
    return real_function()


# First: let's create our hook
# windows.hooks.RegOpenKeyExACallback is generated based on windows.generated_def.winfuncs
@windows.hooks.RegOpenKeyExACallback
def open_reg_hook(hKey, lpSubKey, ulOptions, samDesired, phkResult, real_function):
    return _handle_reg_open(hKey, lpSubKey, phkResult, real_function)


## Wide version of the Hook for python3 !
@windows.hooks.RegOpenKeyExWCallback
def open_reg_hookw(hKey, lpSubKey, ulOptions, samDesired, phkResult, real_function):
    return _handle_reg_open(hKey, lpSubKey, phkResult, real_function)


# Get the peb of our process
peb = windows.current_process.peb
# Get the pythonxx.dll module
pythondll_module = [m for m in peb.modules if m.name.startswith("python") and m.name.endswith(".dll")][0]
# Get the iat entries for DLL advapi32.dll
adv_imports = pythondll_module.pe.imports['advapi32.dll']
# Get RegOpenKeyExA iat entry
RegOpenKeyEx_iat = [n for n in adv_imports if n.name == "RegOpenKeyExA"]
if not RegOpenKeyEx_iat:
    # Py3: the interpreter DLL imports the wide variant instead
    RegOpenKeyEx_iat = [n for n in adv_imports if n.name == "RegOpenKeyExW"]
    open_reg_hook = open_reg_hookw
RegOpenKeyEx_iat = RegOpenKeyEx_iat[0]

# Setup our hook
RegOpenKeyEx_iat.set_hook(open_reg_hook)

### !!!! You must keep the iat_entry alive !!!!
### If the hook is garbage collected while active -> python will crash

# Use python native module _winreg that call 'RegOpenKeyExA'
print("Asking for <MY_SECRET_KEY>")
v = winreg.OpenKey(1234567, "MY_SECRET_KEY")
print("Result = " + hex(v.handle))
print("")

print("Asking for <MY_FAIL_KEY>")
try:
    v = winreg.OpenKey(1234567, "MY_FAIL_KEY")
    print("Result = " + hex(v.handle))
except WindowsError as e:
    print(repr(e))
print("")

print("Asking for <HKEY_CURRENT_USER/Software>")
try:
    v = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Software")
    print("Result = " + hex(v.handle))
except WindowsError as e:
    print(repr(e))
Output
(cmd) python process\iat_hook.py Asking for <MY_SECRET_KEY> <in hook> Hook called | hKey = 0x12d687 | lpSubKey = <MY_SECRET_KEY> <in hook> Secret key asked, returning magic handle 0x12345678 Result = 0x12345678 Asking for <MY_FAIL_KEY> <in hook> Hook called | hKey = 0x12d687 | lpSubKey = <MY_FAIL_KEY> <in hook> Asked for a failing key: returning 0x2a WindowsError(42, 'Windows Error 0x2A') Asking for <HKEY_CURRENT_USER/Software> <in hook> Hook called | hKey = 0x80000001L | lpSubKey = <Software> <in hook> Non-secret key : calling normal function Result = 0x428
18.1.6. Microsoft Store Python Injection¶
Python execution in a remote process fails with Microsoft Store builds of Python (mspython), because the interpreter DLLs do not grant execute permission to Users. This sample shows a workaround, contributed by user https://github.com/dariushoule, that copies the needed mspython files to a temporary directory and injects those copies instead.
# Some python interpreters run in environments with restrictive ACLs (no Users/* execute) on bundled DLLs.
# The Microsoft Store version of python is the prime example of this.
#
# Remote execution of python is still possible by creating a minimal set of the dependencies outside of the restricted directory.
#
# This can be very helpful when operating PFW in environments with restrictive GPOs / AppLocker.

import ctypes
import glob
import os
import shutil
import tempfile
import time
import sys
import struct

import windows
from windows.generated_def.ntstatus import STATUS_THREAD_IS_TERMINATING
from windows.generated_def.windef import CREATE_SUSPENDED
from windows.generated_def.winstructs import PROCESS_INFORMATION, STARTUPINFOW
from windows.injection import (
    RemotePythonError,
    find_python_dll_to_inject,
    get_dll_name_from_python_version,
    inject_python_command,
    load_dll_in_remote_process,
    retrieve_exc,
)

print("Executable is: {0}".format(sys.executable))

CACHE_DIR = os.path.join(tempfile.gettempdir(), 'pfw_dllcache')
INTERPRETER_DIR = os.path.dirname(find_python_dll_to_inject(64))  # Tailor bitness to your needs


def mspython_acl_workaround(target, pydll_path):
    """
    Works around mspython ACL restrictions on mspython interpreters by copying the critical DLLs to a TEMP dir and
    orienting the interpreter against that TEMP dir.

    `target` is the process to inject into; `pydll_path` is the path of the
    python DLL to copy and preload. Sets `target._workaround_applied` once done.
    """
    # exist_ok avoids the check-then-create race of os.path.exists() + os.mkdir()
    os.makedirs(CACHE_DIR, exist_ok=True)

    for dll in [os.path.join(INTERPRETER_DIR, 'vcruntime140.dll'), pydll_path]:
        cache_dll_path = os.path.join(CACHE_DIR, os.path.basename(dll))
        try:
            # Creates a copy of the DLL without bringing over restrictive ACLs
            shutil.copyfile(dll, cache_dll_path)
        except Exception as e:
            # If its not writeable good chance these DLLs are just already loaded somewhere
            print(e)
        # Preloading python DLL and vcruntime so they don't get loaded from the path tree with restrictive ACLs
        print("Injecting: {0}".format(cache_dll_path))
        load_dll_in_remote_process(target, cache_dll_path)

    for dll in glob.glob(os.path.join(INTERPRETER_DIR, 'dlls', '*')):
        cache_dll_path = os.path.join(CACHE_DIR, os.path.basename(dll))
        try:
            # Dynamic lib DLLs with restrictive ACLs copied to unrestricted parent
            shutil.copyfile(dll, cache_dll_path)
        except Exception as e:
            # Best effort: report the failure and keep copying the other files
            print(e)

    target._workaround_applied = True


# Adapted from windows\winobject\process.py
def execute_python_code(process, code):
    """Start a remote thread running `code` in `process` and return that thread.

    The ACL workaround is applied on the first call for a given process.
    """
    py_dll_name = get_dll_name_from_python_version()
    pydll_path = find_python_dll_to_inject(process.bitness)
    if not getattr(process, "_workaround_applied", None):
        mspython_acl_workaround(process, pydll_path)
    shellcode, pythoncode = inject_python_command(process, code, py_dll_name)
    t = process.create_thread(shellcode, pythoncode)
    return t


def safe_execute_python(process, code):
    """Run `code` in `process`, wait for completion and report failures.

    Returns True on success. Raises WindowsError if the process died,
    ValueError on an unexpected thread exit code, and RemotePythonError
    (carrying the remote exception data) if the remote code raised.
    """
    t = execute_python_code(process, code)
    t.wait()  # Wait termination of the thread
    if t.exit_code == 0:
        return True
    if t.exit_code == STATUS_THREAD_IS_TERMINATING or process.is_exit:
        raise WindowsError("{0} died during execution of python command".format(process))
    if t.exit_code != 0xffffffff:
        raise ValueError("Unknown exit code {0}".format(hex(t.exit_code)))
    data = retrieve_last_exception_data(process)
    raise RemotePythonError(data)


# Adapted from windows\injection.py
def retrieve_last_exception_data(process):
    """Read back the data of the last remote exception from `process`.

    The remote side writes a 4-byte little-endian size followed by the data
    into a temporary remote allocation.
    """
    with process.allocated_memory(0x1000) as mem:
        execute_python_code(process, retrieve_exc.format(mem)).wait()
        size = struct.unpack("<I", process.read_memory(mem, ctypes.sizeof(ctypes.c_uint)))[0]
        data = process.read_memory(mem + ctypes.sizeof(ctypes.c_uint), size)
    return data


# First: show what happen when injecting mspython normally
print("Trying normal execute_python()")
proc1 = windows.utils.create_process(r"C:\Windows\system32\winver.exe")
try:
    proc1.execute_python("2 + 2 == 5")
except Exception as e:
    print(" Exception during proc1.execute_python():")
    print(" {0}".format(repr(e)))
proc1.exit()

print("Trying mspython workaround:")
proc_info = PROCESS_INFORMATION()
StartupInfo = STARTUPINFOW()
StartupInfo.cb = ctypes.sizeof(StartupInfo)

# Point PYTHONHOME to the interpreter dir so non-DLL libs can load
# Point PYTHONPATH to the newly created cache directory so DLL libs are loaded from there
environment = (
    '\0'.join('{}={}'.format(e, v) for e, v in os.environ.items())
    + '\0PYTHONHOME={}\0PYTHONPATH={}\0\0'.format(INTERPRETER_DIR, CACHE_DIR)
).encode()

windows.winproxy.CreateProcessW(
    r"C:\Windows\system32\winver.exe",
    dwCreationFlags=CREATE_SUSPENDED,
    lpEnvironment=environment,
    lpProcessInformation=ctypes.byref(proc_info),
    lpStartupInfo=ctypes.byref(StartupInfo))

process = windows.winobject.process.WinProcess(pid=proc_info.dwProcessId, handle=proc_info.hProcess)

print(" Executing python code!")
safe_execute_python(process, """
import windows
windows.utils.create_console()
print('hello from inside the suspended process!', flush=True)
""")

process.threads[0].resume()
print(" Executing more python code!")
safe_execute_python(process, """
print('hello from inside the resumed process!', flush=True)
""")

print(" Executing an error python code!")
try:
    safe_execute_python(process, """BAD_VARIABLE""")
except RemotePythonError as e:
    print(" Expected error during safe_execute_python")
    print(" {0}".format(e))

print(" Sleeping a little")
time.sleep(5)
print(" Killing target process !")
process.exit()
Output
PS C:\Users\hakril\PythonForWindows> py .\samples\process\msstore_interpreter_remote_python.py
Executable is: C:\Users\hakril\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\python.exe
Trying normal execute_python()
Exception during proc1.execute_python():
InjectionFailedError('Injection of <c:\\program files\\windowsapps\\pythonsoftwarefoundation.python.3.13_3.13.496.0_x64__qbz5n2kfra8p0\\vcruntime140.dll> failed')
Trying mspython workaround:
Executing python code!
Injecting: C:\Users\hakril\AppData\Local\Temp\pfw_dllcache\vcruntime140.dll
Injecting: C:\Users\hakril\AppData\Local\Temp\pfw_dllcache\python313.dll
Executing more python code!
Executing an error python code!
Expected error during safe_execute_python
b'Traceback (most recent call last):\n File "<string>", line 1, in <module>\nNameError: name \'BAD_VARIABLE\' is not defined\n'
Sleeping a little
Killing target process !
18.2. Token¶
# Demo of the Token API: query the process token, list its groups, then
# duplicate it as an impersonation token and set it on the current thread.
import windows
import windows.generated_def as gdef

tok = windows.current_process.token
print("Our process token is {0}".format(tok))

print("Retrieving some infos")
print("Username: <{0}>".format(tok.username))
print("User: {0!r}".format(tok.user))
user_lookup = windows.utils.lookup_sid(tok.user)
print(" - lookup : {0}".format(user_lookup))
print("Primary group: {0!r}".format(tok.primary_group))
group_lookup = windows.utils.lookup_sid(tok.primary_group)
print(" - lookup : {0}".format(group_lookup))
print("")

groups = tok.groups
print("Token Groups is {0}".format(groups))
first_sid = groups.sids[0]
print("First group SID is {0!r}".format(first_sid))
print("Some sid and attributes:")
# zip() against range(3) limits the listing to at most three entries
for _, entry in zip(range(3), groups.sids_and_attributes):
    print(" - {0}: {1}".format(entry.Sid, entry.Attributes))

# Let's play with duplicate !
print("")
imp_tok = tok.duplicate(
    type=gdef.TokenImpersonation,
    impersonation_level=gdef.SecurityImpersonation,
)
print("Duplicate token is {0}".format(imp_tok))
print("Enabling <SeShutDownPrivilege>")
imp_tok.enable_privilege("SeShutDownPrivilege")

cur_thread = windows.current_thread
print("Current thread token is <{0}>".format(cur_thread.token))
print("Setting impersonation token !")
cur_thread.token = imp_tok
print("Current thread token is {0}".format(cur_thread.token))
Output
(cmd) python token\token_demo.py
Our process token is <Token TokenId=0x3f1f98fd Type=TokenPrimary(0x1L)>
Retrieving some infos
Username: <hakril>
User: <PSID "S-1-5-21-184905214-2723199098-2761450773-1001">
- lookup : ('WILLIE', 'hakril')
Primary group: <PSID "S-1-5-21-184905214-2723199098-2761450773-513">
- lookup : ('WILLIE', 'Aucun')
Token Groups is <TokenGroups count=15>
First group SID is <PSID "S-1-5-21-184905214-2723199098-2761450773-513">
Some sid and attributes:
- S-1-5-21-184905214-2723199098-2761450773-513: 7
- S-1-1-0: 7
- S-1-5-114: 16
Duplicate token is <Token TokenId=0x3f1fac85 Type=TokenImpersonation(0x2L) ImpersonationLevel=SecurityImpersonation(0x2L)>
Enabling <SeShutDownPrivilege>
Current thread token is <None>
Setting impersonation token !
Current thread token is <Token TokenId=0x3f1fac85 Type=TokenImpersonation(0x2L) ImpersonationLevel=SecurityImpersonation(0x2L)>
18.3. windows.system¶
# Demo of `windows.system`: basic system information, logical drives,
# services, handles and kernel modules.
import sys
import os.path
# Make the in-tree 'windows' package importable when run from the samples dir.
# Raw string on purpose: "\." is not a valid escape sequence and a non-raw
# literal triggers a warning on recent Python versions.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows


def dump_service(serv):
    """Print the main attributes of a service object."""
    print(" " + str(serv))
    print((" " * 8) + "name = {0}".format(serv.name))
    print((" " * 8) + "description = {0}".format(serv.description))
    print((" " * 8) + "status = {0}".format(serv.status))
    print((" " * 8) + "process = {0}".format(repr(serv.process)))


system = windows.system
print("Basic system infos:")
print(" version = {0}".format(system.version))
print(" bitness = {0}".format(system.bitness))
print(" computer_name = {0}".format(system.computer_name))
print(" product_type = {0}".format(system.product_type))
print(" version_name = {0}".format(system.version_name))
print("")

print("There is {0} processes".format(len(system.processes)))
print("There is {0} threads".format(len(system.threads)))
print("")

print("Dumping first logical drive:")
drive = system.logicaldrives[0]
print(" " + str(drive))
print((" " * 8) + "name = {0}".format(drive.name))
print((" " * 8) + "type = {0}".format(drive.type))
print((" " * 8) + "path = {0}".format(drive.path))
print("")

print("Dumping first service:")
dump_service(windows.system.services[0])
print("")

print("Finding a service in a user process:")
# Services without a backing process report a falsy `process`
dump_service([s for s in windows.system.services if s.process][0])
print("")

print("Enumerating handles:")
handles = system.handles
print(" There are {0} handles:".format(len(handles)))
print(" First handle is: " + str(handles[0]))
print(" Enumerating handles of the current process:")
# Reuse the list fetched above instead of enumerating system handles twice
cp_handles = [h for h in handles if h.dwProcessId == windows.current_process.pid]
print(" There are {0} handles for this process".format(len(cp_handles)))
print(" Looking for a File handle:")
file_h = [h for h in cp_handles if h.type == "File"][0]
print(" Handle is {0}".format(file_h))
print(" Name is <{0}>".format(file_h.name))
print("")

print("Dumping the first system module")
kmod = windows.system.modules[0]
print(" " + str(kmod))
print((" " * 8) + "ImageName = {0}".format(kmod.ImageName))
print((" " * 8) + "Base = {0:#x}".format(kmod.Base))
print((" " * 8) + "Size = {0:#x}".format(kmod.Size))
print((" " * 8) + "Flags = {0:#x}".format(kmod.Flags))
print((" " * 8) + "LoadCount = {0}".format(kmod.LoadCount))
Output
(cmd) python system.py Basic system infos: version = (10, 0) bitness = 64 computer_name = WILLIE product_type = 1 version_name = Windows 10 There is 331 processes There is 4010 threads Dumping first logical drive: <LogicalDrive "B:\" (DRIVE_FIXED)> name = B:\ type = 3 path = \Device\HarddiskVolume7 Dumping first service: <Service "1394ohci" SERVICE_STOPPED(0x1)> name = 1394ohci description = 1394 OHCI Compliant Host Controller status = <_SERVICE_STATUS_PROCESS type=SERVICE_KERNEL_DRIVER(0x1) state=SERVICE_STOPPED(0x1)> process = None Finding a service in a user process: <Service "AppIDSvc" SERVICE_RUNNING(0x4)> name = AppIDSvc description = Application Identity status = <_SERVICE_STATUS_PROCESS type=48L state=SERVICE_RUNNING(0x4)> process = <WinProcess "!cannot-retrieve-name" pid 6060 at 0x35851d0> Enumerating handles: There are 208332 handles: First handle is: <HandleWow64 value=<0x4> in process pid=4> Enumerating handles of the current process: There are 275 handles for this process Looking for a File handle: Handle is <HandleWow64 value=<0x4> in process pid=15236> Name is <\Device\ConDrv> Dumping the first system module <SystemModuleWow64 name="\SystemRoot\system32\ntoskrnl.exe" base=0xfffff80023000000> ImageName = \SystemRoot\system32\ntoskrnl.exe Base = 0xfffff80023000000 Size = 0xab7000 Flags = 0x8804000 LoadCount = 240
18.4. Services¶
# Demo of the services API: list services, inspect one, and start it if needed.
import time

import windows
import windows.generated_def as gdef

print("Listing the first 3 services:")
for service in windows.system.services[:3]:
    print(" * {0}".format(service))
print("")

TARGET_SERVICE = "TapiSrv"
print("Retriving service <{0}>".format(TARGET_SERVICE))
service = windows.system.services[TARGET_SERVICE]
print("{0}".format(service))
print(" - name: {0!r}".format(service.name))
print(" - description: {0!r}".format(service.description))
print(" - state: {0!r}".format(service.status.state))
print(" - type: {0!r}".format(service.status.type))
print(" - process: {0!r}".format(service.process))
print(" - security-description: {0}".format(service.security_descriptor))

if service.status.state == gdef.SERVICE_RUNNING:
    print("Service already running, not trying to start it")
else:
    print("Trying to start the service")
    service.start()
    # Poll with a short sleep: spinning on a bare `pass` would pin a CPU core
    # while the service is starting.
    while service.status.state != gdef.SERVICE_RUNNING:
        time.sleep(0.1)
    print("Service started !")

print("{0}".format(service))
print(" - state: {0!r}".format(service.status.state))
print(" - process: {0!r}".format(service.process))
Output
(cmd) python service\service_demo.py Listing the first 3 services: * <Service "1394ohci" SERVICE_STOPPED(0x1)> * <Service "3ware" SERVICE_STOPPED(0x1)> * <Service "ACPI" SERVICE_RUNNING(0x4)> Retriving service <TapiSrv> <Service "TapiSrv" SERVICE_STOPPED(0x1)> - name: 'TapiSrv' - description: 'Telephony' - state: SERVICE_STOPPED(0x1) - type: 48L - process: None - security-description: O:SYG:SYD:(A;;CCLCSWRPWPDTLOCRRC;;;SY)(A;;CCDCLCSWRPWPDTLOCRSDRCWDWO;;;BA)(A;;CCLCSWRPLOCRRC;;;IU)(A;;CCLCSWRPLOCRRC;;;SU) Trying to start the service Service started ! <Service "TapiSrv" SERVICE_RUNNING(0x4)> - state: SERVICE_RUNNING(0x4) - process: <WinProcess "!cannot-retrieve-name" pid 14700 at 0x4a2c4f0>
18.4.1. Windows Service in Python¶
This script can register itself as a service and react to stop requests. It can also delete its own service.
# A python script able to install itself as a service
import sys
import os.path
import argparse
import datetime
import ctypes

import windows
import windows.generated_def as gdef

SERVICE_NAME = u"PFW_SERVICE_DEMO"
SERVICE_DESCRIPTION = u"PythonForWindows demo service"
# Absolute path of this script: the service is started by services.exe with a
# different working directory, so relative paths would not resolve there.
SCRIPT_PATH = os.path.abspath(__file__)
SERVICE_LOGFILE = os.path.join(os.path.dirname(SCRIPT_PATH), "pfw_service_logs.txt")
# Status handle returned by RegisterServiceCtrlHandlerExW (set in service_main)
SERVICE_HANDLE = None


def install_demo_service():
    """Register this script as a demand-start service run with `--run`."""
    # Quote both paths: an unquoted service command line containing spaces is
    # ambiguous for the service control manager.
    path = '"{executable}" "{pyfile}" --run'.format(executable=sys.executable, pyfile=SCRIPT_PATH)
    print("Registering service <{0}> as : <{1}>".format(SERVICE_NAME, path))
    newservice = windows.system.services.create(
        name=SERVICE_NAME,
        description=SERVICE_DESCRIPTION,
        access=gdef.SERVICE_ALL_ACCESS,
        type=gdef.SERVICE_WIN32_OWN_PROCESS,
        start=gdef.SERVICE_DEMAND_START,
        path=path,
        user=None
    )
    print(newservice)


def uninstall_demo_service():
    """Delete the demo service."""
    print("Deleting service")
    print(windows.system.services[SERVICE_NAME].delete())


def log(s):
    """Append a timestamped line to the service log file."""
    with open(SERVICE_LOGFILE, "a") as f:
        f.write("[{time}] {s}\n".format(time=datetime.datetime.now(), s=s))


@ctypes.WINFUNCTYPE(gdef.DWORD, gdef.DWORD, gdef.DWORD, gdef.PVOID, gdef.PVOID)
def service_handlerex(dwControl, dwEventType, lpEventData, lpContext):
    """Service control handler: report SERVICE_STOPPED on a stop request."""
    log("in service_handlerex")
    log("service_handlerex: called with {0}".format(dwControl))
    if dwControl == gdef.SERVICE_CONTROL_STOP:
        log("Stopping the service")
        stopped_status = gdef.SERVICE_STATUS(
            dwServiceSpecificExitCode=0,
            dwServiceType=gdef.SERVICE_WIN32_OWN_PROCESS,
            dwCurrentState=gdef.SERVICE_STOPPED,
            dwWin32ExitCode=gdef.NO_ERROR,
            dwControlsAccepted=0
        )
        try:
            windows.winproxy.SetServiceStatus(SERVICE_HANDLE, stopped_status)
        except Exception as e:
            # Log only: the exception is deliberately not propagated out of
            # the native callback.
            log(str(e))
    return 0


@ctypes.WINFUNCTYPE(gdef.PVOID, gdef.DWORD, ctypes.POINTER(gdef.LPWSTR))
def service_main(dwNumServicesArgs, lpServiceArgVectors):
    """Service entry point: register the control handler and report RUNNING."""
    global SERVICE_HANDLE
    log("In service_main")
    log("service_main: {0}".format(dwNumServicesArgs))
    log("service_main: {0}".format(lpServiceArgVectors[0]))
    try:
        log("Calling RegisterServiceCtrlHandlerExW")
        SERVICE_HANDLE = windows.winproxy.RegisterServiceCtrlHandlerExW(
            SERVICE_NAME, ctypes.cast(service_handlerex, gdef.PVOID), None)
        log("RegisterServiceCtrlHandlerExW handle: {0}".format(SERVICE_HANDLE))
        running_status = gdef.SERVICE_STATUS(
            dwServiceSpecificExitCode=0,
            dwServiceType=gdef.SERVICE_WIN32_OWN_PROCESS,
            dwCurrentState=gdef.SERVICE_RUNNING,
            dwWin32ExitCode=gdef.NO_ERROR,
            dwControlsAccepted=gdef.SERVICE_ACCEPT_STOP | gdef.SERVICE_ACCEPT_PAUSE_CONTINUE
        )
        res = windows.winproxy.SetServiceStatus(SERVICE_HANDLE, running_status)
        log("Service is running : {0}".format(res))
    except Exception as e:
        log(str(e))
        raise
    return None


def run_demo_service():
    """Hand control to the service dispatcher with `service_main` as entry point."""
    log("start of run_demo_service()")
    try:
        # The table ends with an all-None entry
        SERVICE_TABLE = (gdef.SERVICE_TABLE_ENTRYW * 2)(
            gdef.SERVICE_TABLE_ENTRYW(SERVICE_NAME, ctypes.cast(service_main, gdef.PVOID)),
            gdef.SERVICE_TABLE_ENTRYW(None, None),
        )
        log("Calling: StartServiceCtrlDispatcherW()")
        result = windows.winproxy.StartServiceCtrlDispatcherW(SERVICE_TABLE)
        log("StartServiceCtrlDispatcherW returned: {0}".format(result))
        log("Quitting")
    except Exception as e:
        log(str(e))
        raise


parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--install", action="store_true", help="Install the service in registry")
group.add_argument("--uninstall", action="store_true", help="UnInstall the service in registry")
group.add_argument("--run", action="store_true", help="Called by the services.exe to run the service")

if __name__ == "__main__":
    args = parser.parse_args()
    if args.install:
        install_demo_service()
    elif args.uninstall:
        uninstall_demo_service()
    elif args.run:
        log("calling run_demo_service()")
        log(sys.argv)
        run_demo_service()
    else:
        raise ValueError("Unknown argument")
Output
(cmd-admin) sc.exe query PFW_SERVICE_DEMO [SC] EnumQueryServicesStatus:OpenService FAILED 1060: The specified service does not exist as an installed service. (cmd-admin) py .\service\python_service.py --install Registering service <PFW_SERVICE_DEMO> as : <C:\Users\cleme\AppData\Local\Programs\Python\Python311\python.exe C:\Users\cleme\Documents\projets\PythonForWindows\samples\service\python_service.py --run> <Service "PFW_SERVICE_DEMO" SERVICE_STOPPED(0x1)> (cmd-admin) sc.exe start PFW_SERVICE_DEMO SERVICE_NAME: PFW_SERVICE_DEMO TYPE : 10 WIN32_OWN_PROCESS STATE : 2 START_PENDING (NOT_STOPPABLE, NOT_PAUSABLE, IGNORES_SHUTDOWN) WIN32_EXIT_CODE : 0 (0x0) SERVICE_EXIT_CODE : 0 (0x0) CHECKPOINT : 0x0 WAIT_HINT : 0x7d0 PID : 21132 FLAGS : (cmd-admin) sc.exe query PFW_SERVICE_DEMO SERVICE_NAME: PFW_SERVICE_DEMO TYPE : 10 WIN32_OWN_PROCESS STATE : 4 RUNNING (STOPPABLE, PAUSABLE, IGNORES_SHUTDOWN) WIN32_EXIT_CODE : 0 (0x0) SERVICE_EXIT_CODE : 0 (0x0) CHECKPOINT : 0x0 WAIT_HINT : 0x0 (cmd-admin) cat .\service\pfw_service_logs.txt [2024-12-31 10:34:36.033158] calling run_demo_service() [2024-12-31 10:34:36.033158] ['C:\\Users\\XXX\\PythonForWindows\\samples\\service\\python_service.py', '--run'] [2024-12-31 10:34:36.033158] start of run_demo_service() [2024-12-31 10:34:36.033158] Calling: StartServiceCtrlDispatcherW() [2024-12-31 10:34:36.033158] In service_main [2024-12-31 10:34:36.033158] service_main: 1 [2024-12-31 10:34:36.033158] service_main: PFW_SERVICE_DEMO [2024-12-31 10:34:36.033158] Calling RegisterServiceCtrlHandlerExW [2024-12-31 10:34:36.033158] RegisterServiceCtrlHandlerExW handle: 2591650953424 [2024-12-31 10:34:36.038274] Service is running : 1 (cmd-admin) sc.exe stop PFW_SERVICE_DEMO SERVICE_NAME: PFW_SERVICE_DEMO TYPE : 10 WIN32_OWN_PROCESS STATE : 1 STOPPED WIN32_EXIT_CODE : 0 (0x0) SERVICE_EXIT_CODE : 0 (0x0) CHECKPOINT : 0x0 WAIT_HINT : 0x0 (cmd-admin) cat .\service\pfw_service_logs.txt [...] 
[2024-12-31 10:34:36.038274] Service is running : 1 [2024-12-31 10:34:53.422375] in service_handlerex [2024-12-31 10:34:53.422375] service_handlerex: called with 1 [2024-12-31 10:34:53.432530] Stopping the service [2024-12-31 10:34:53.433035] StartServiceCtrlDispatcherW returned: 1 [2024-12-31 10:34:53.433035] Quitting (cmd-admin) py .\service\python_service.py --uninstall Deleting service 1 (cmd-admin) sc.exe query PFW_SERVICE_DEMO [SC] EnumQueryServicesStatus:OpenService FAILED 1060: The specified service does not exist as an installed service.
18.5. Network - socket exploration¶
"""Explore the IPv4 TCP connections and forcibly close one of our own."""
import sys
import os.path
import socket
# Raw string: "\." is not a valid escape sequence
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows

if not windows.utils.check_is_elevated():
    print("!!! Demo will fail because closing a connection require elevated process !!!")

print("Working on ipv4")
conns = windows.system.network.ipv4
print("== Listening ==")
print("Some listening connections: {0}".format([c for c in conns if not c.established][:3]))
print("Listening ports are : {0}".format([c.local_port for c in conns if not c.established]))
print("== Established ==")
print("Some established connections: {0}".format([c for c in conns if c.established][:3]))

TARGET_HOST = "localhost"
TARGET_PORT = 80

print("== connection to {0}:{1} ==".format(TARGET_HOST, TARGET_PORT))
s = socket.create_connection((TARGET_HOST, TARGET_PORT))
# Find our own socket in the system connection list: match on remote port and remote address
our_connection = [c for c in windows.system.network.ipv4 if c.established and c.remote_port == TARGET_PORT and c.remote_addr == s.getpeername()[0]]
print("Our connection is {0}".format(our_connection))

print("Sending YOP")
# socket.send() needs a bytes-like object on Python 3 (a bytes literal also works on Python 2)
s.send(b"YOP")
print("Closing socket")
# Close the connection through the system connection object, not through `s`
our_connection[0].close()
print("Sending LAIT")
# The connection was closed above: in the recorded run this send raises a connection-reset error
s.send(b"LAIT")
Output
(cmd) python network\network.py Working on ipv4 == Listening == Some listening connections: [<TCP IPV4 Listening socket on 0.0.0.0:80>, <TCP IPV4 Listening socket on 0.0.0.0:135>, <TCP IPV4 Listening socket on 0.0.0.0:445>] Listening ports are : [80, 135, 445, 902, 912, 27036, 49664, 49665, 49666, 49667, 49671, 49673, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 1120, 5556, 6463, 22885, 22886, 27060, 49330, 49331, 49794, 49795, 49867, 52541, 57125, 57138, 65000, 65001, 139, 5556, 57046, 57109, 57110, 57143, 57144, 139, 5556, 139, 5556] == Established == Some established connections: [<TCP IPV4 Connection 127.0.0.1:912 -> 127.0.0.1:49488>, <TCP IPV4 Connection 127.0.0.1:912 -> 127.0.0.1:52332>, <TCP IPV4 Connection 127.0.0.1:49488 -> 127.0.0.1:912>] == connection to localhost:80 == Our connection is [<TCP IPV4 Connection 127.0.0.1:57167 -> 127.0.0.1:80>] Sending YOP Closing socket Sending LAIT Traceback (most recent call last): File "network\network.py", line 34, in <module> s.send("LAIT") socket.error: [Errno 10054] An existing connection was forcibly closed by the remote host
18.6. Registry¶
"""Browse the registry: open keys, list subkeys and read values."""
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows

registry = windows.system.registry

print("Registry is <{0}>".format(registry))
current_user = registry("HKEY_CURRENT_USER")
print("HKEY_CURRENT_USER is <{0}>".format(current_user))
subkeys_name = [s.name for s in current_user.subkeys]
print("HKEY_CURRENT_USER subkeys names are:")
pprint.pprint(subkeys_name)

# A key can be opened step by step from its parent...
print("Opening 'Software' in HKEY_CURRENT_USER: {0}".format(current_user("Software")))
# ...or directly from the registry object with its full path
print("We can also open it in one access: {0}".format(registry(r"HKEY_CURRENT_USER\Software")))

print("Looking at CurrentVersion")
windows_info = registry(r"HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion")
print("Key is {0}".format(windows_info))
print("values are:")
pprint.pprint(windows_info.values)

# Indexing a key by a value name returns the matching value entry
registered_owner = windows_info["RegisteredOwner"]
print("registered owner = <{0}>".format(registered_owner))
Output
(cmd) python registry\registry.py Registry is <<windows.winobject.registry.Registry object at 0x061F89F0>> HKEY_CURRENT_USER is <<PyHKey "HKEY_CURRENT_USER">> HKEY_CURRENT_USER subkeys names are: ['AppEvents', 'AppXBackupContentType', 'Console', 'Control Panel', 'Environment', 'EUDC', 'Keyboard Layout', 'Network', 'Printers', 'Software', 'System', 'Uninstall', 'Volatile Environment'] Opening 'Software' in HKEY_CURRENT_USER: <PyHKey "HKEY_CURRENT_USER\Software"> We can also open it in one access: <PyHKey "HKEY_CURRENT_USER\Sofware"> Looking at CurrentVersion Key is <PyHKey "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion"> values are: [KeyValue(name='SoftwareType', value=u'System', type=1), KeyValue(name='RegisteredOwner', value=u'hakril', type=1), ... KeyValue(name='PathName', value=u'C:\\WINDOWS', type=1)] registered owner = <KeyValue(name='RegisteredOwner', value=u'hakril', type=1)>
18.7. Scheduled tasks¶
"""Explore the task scheduler, then create and delete a demo folder and task."""
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.generated_def as gdef

tscheduler = windows.system.task_scheduler
print("Task scheduler is {0}".format(tscheduler))
root = tscheduler.root
print("Root folder is {0}".format(root))
print ("Listing sub folders")
for subfolder in root.folders:
    print(" * {0}".format(subfolder))

# Raw string: the folder path is made of literal backslashes
demo_folder = r"\Microsoft\Windows\AppID"
print("Manually opening subfolder <{0}>".format(demo_folder))
subfolder = root(demo_folder)
print("Working into {0}".format(subfolder))
for task in subfolder.tasks:
    print(" * {task.name}".format(task=task))

# `task` is still bound to the last task listed by the loop above
print("")
print("Analysing task {task}".format(task=task))
print(" * Name: <{task.name}>".format(task=task))
print(" * Path: <{task.path}>".format(task=task))
print(" * Definition: <{task.definition}>".format(task=task))

print("Listing actions:")
for action in task.definition.actions:
    print(" * Action: <{action}>".format(action=action))
    print(" * Type: <{action.type!r}>".format(action=action))
    # Only describe the command line of actions that expose a `path`
    if getattr(action, "path", None):
        print(" * path: <{action.path}>".format(action=action))
        print(" * arguments: <{action.arguments}>".format(action=action))

print("Listing triggers:")
for trigger in task.definition.triggers:
    print(" * Trigger type: <{trigger.type!r}>".format(trigger=trigger))
print("")

DEMO_FOLDER_NAME = "PFW_DEMO_FOLDER"
DEMO_TASK_NAME = "PFW_DEMO_TASK"

print("Creating folder <{0}>".format(DEMO_FOLDER_NAME))
demo_folder = root.create_folder(DEMO_FOLDER_NAME)
print("Demo folder is {0}".format(demo_folder))

print("Creating Task definition")
# Create a Task definition
new_task_definition = tscheduler.create()
actions = new_task_definition.actions
# Add an TASK_ACTION_EXEC action to the task def
new_action = actions.create(gdef.TASK_ACTION_EXEC)
new_action.path = sys.executable
new_action.arguments = "-c 'Hello !'"

# Register the new task under 'DEMO_TASK_NAME'
print("Registering task definition as <{0}> in <{1}>".format(DEMO_TASK_NAME, demo_folder))
new_task = demo_folder.register(DEMO_TASK_NAME, new_task_definition)
print("Created task is {0}".format(new_task))

# Clean up: remove the task first, then the folder that held it
print("Deleting the demo task")
del demo_folder[DEMO_TASK_NAME]
print("Deleting the demo folder")
root.delete_folder(DEMO_FOLDER_NAME)
Output
(cmd) python scheduled_tasks\scheduled_task.py Task scheduler is <TaskService at 0x6f2a1c0> Root folder is <TaskFolder "\" at 0x6f2a2b0> Listing sub folders * <TaskFolder "\ASUS" at 0x6f2a350> * <TaskFolder "\Intel" at 0x6f2a300> * <TaskFolder "\Microsoft" at 0x6f2a3a0> Manually opening subfolder <\Microsoft\Windows\AppID> Working into <TaskFolder "\Microsoft\Windows\AppID" at 0x6f2a3f0> * PolicyConverter * SmartScreenSpecific * VerifiedPublisherCertStoreCheck Analysing task <Task "VerifiedPublisherCertStoreCheck" at 0x6f2a300> * Name: <VerifiedPublisherCertStoreCheck> * Path: <\Microsoft\Windows\AppID\VerifiedPublisherCertStoreCheck> * Definition: <<TaskDefinition at 0x6f2a3a0>> Listing actions: * Action: <<ExecAction at 0x6f2a350>> * Type: <TASK_ACTION_EXEC(0x0L)> * path: <%windir%\system32\appidcertstorecheck.exe> * arguments: <None> Listing triggers: * Trigger type: <TASK_TRIGGER_BOOT(0x8L)> Creating folder <PFW_DEMO_FOLDER> Demo folder is <TaskFolder "\PFW_DEMO_FOLDER" at 0x6f2a3a0> Creating Task definition Registering task definition as <PFW_DEMO_TASK> in <<TaskFolder "\PFW_DEMO_FOLDER" at 0x6f2a3a0>> Created task is <Task "PFW_DEMO_TASK" at 0x6f2a530> Deleting the demo task Deleting the demo folder
18.8. Event Log¶
"""Query the firewall event-log channel and inspect events, metadata and publisher."""
import windows
import windows.generated_def as gdef

manager = windows.system.event_log
print("Event log Manager is: {0}".format(manager))
channel_count = len(list(manager.channels))
print("They are <{0}> channels".format(channel_count))
publisher_count = len(list(manager.publishers))
print("They are <{0}> publishers".format(publisher_count))

FIREWALL_CHANNEL = "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"
print("Openning channel <{0}>".format(FIREWALL_CHANNEL))
channel = manager[FIREWALL_CHANNEL]
print("Channel is {0}".format(channel))
# Note that `channel.events` is an alias for `channel.query().all()`
event_count = len(channel.events)
print("The channel contains <{0}> events".format(event_count))

print("")
EVT_QUERY = "Event/EventData[Data='C:\\WINDOWS\\System32\\svchost.exe'] and Event/System[EventID=2006]"
print("""Querying "{0}">""".format(EVT_QUERY))
query = channel.query(EVT_QUERY)
print("Query is {0}".format(query))
matching_events = list(query)
print("List contains {0} event".format(len(matching_events)))
event = matching_events[0]

print("")
print("First event is {0}".format(event))
print("System values:")
# (line template, event attribute) pairs; the ID line appears twice, as in the recorded output
SYSTEM_LINES = (
    (" * ID: {0}", "id"),
    (" * version: {0}", "version"),
    (" * level: {0}", "level"),
    (" * opcode: {0}", "opcode"),
    (" * time_created: {0}", "time_created"),
    (" * ID: {0}", "id"),
)
for line, attribute in SYSTEM_LINES:
    print(line.format(getattr(event, attribute)))

print("Event specific values:")
for key, data in event.data.items():
    print(" * <{0}> -> <{1}>".format(key, data))

print("")
evtmeta = event.metadata
print("Event metadata is {0}".format(evtmeta))
# (line template, metadata attribute) pairs
METADATA_LINES = (
    (" * id : {0}", "id"),
    (" * channel_id : {0}", "channel_id"),
    (" * message_id : {0}", "message_id"),
    (" * event_data : {0}", "event_data"),
)
for line, attribute in METADATA_LINES:
    print(line.format(getattr(evtmeta, attribute)))
# Normalise the line endings of the template before printing it
template_text = evtmeta.template.replace("\r\n", "\n")
print(" * EventData template :\n{0}".format(template_text))

print("")
print("Exploring complex Evt types:")
print("Channel is still {0}".format(channel))
config = channel.config
print("Channel config is {0}".format(config))

publisher = config.publisher
print("Channel publisher is {0}".format(publisher))
print("Channel publisher metadata is {0}".format(publisher.metadata))
print("Publisher's channels are:")
for publisher_channel in publisher.metadata.channels:
    print(" * {0}".format(publisher_channel))

print("Some publisher's event metadata are:")
# Only the first three entries are displayed
first_metadata = list(publisher.metadata.events_metadata)[:3]
for meta in first_metadata:
    print(" * {0}: id={1}".format(meta, meta.id))
Output
(cmd) python event_log\eventlog.py Event log Manager is: <windows.winobject.event_log.EvtlogManager object at 0x0592DB70> They are <1155> channels They are <1179> publishers Openning channel <Microsoft-Windows-Windows Firewall With Advanced Security/Firewall> Channel is <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"> The channel contains <1037> events Querying "Event/EventData[Data='C:\WINDOWS\System32\svchost.exe'] and Event/System[EventID=2006]"> Query is <EvtQuery object at 0x06E9F440> List contains 304 event First event is <EvtEvent id="2006" time="2018-05-06 08:03:06.210109"> System values: * ID: 2006 * version: 0 * level: 4 * opcode: 0 * time_created: 131700673862101088 * ID: 2006 Event specific values: * <ModifyingUser> -> <108703760> * <RuleName> -> <ByteCodeGeneration> * <ModifyingApplication> -> <C:\WINDOWS\System32\svchost.exe> * <RuleId> -> <{318EF1CF-A3FA-4B04-8AAC-712276661117}> Event metadata is <EventMetadata object at 0x06EB96C0> * id : 2006 * channel_id : 16 * message_id : 2986346454 * event_data : [u'RuleId', u'RuleName', u'ModifyingUser', u'ModifyingApplication'] * EventData template : <template xmlns="http://schemas.microsoft.com/win/2004/08/events"> <data name="RuleId" inType="win:UnicodeString" outType="xs:string"/> <data name="RuleName" inType="win:UnicodeString" outType="xs:string"/> <data name="ModifyingUser" inType="win:SID" outType="xs:string"/> <data name="ModifyingApplication" inType="win:UnicodeString" outType="xs:string"/> </template> Exploring complex Evt types: Channel is still <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"> Channel config is <ChannelConfig "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"> Channel publisher is <EvtPublisher "Microsoft-Windows-Windows Firewall With Advanced Security"> Channel publisher metadata is <PublisherMetadata "Microsoft-Windows-Windows Firewall With Advanced Security"> Publisher's channels are: * 
<EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/Firewall"> * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/ConnectionSecurity"> * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/FirewallVerbose"> * <EvtChannel "Microsoft-Windows-Windows Firewall With Advanced Security/ConnectionSecurityVerbose"> * <EvtChannel "Network Isolation Operational"> Some publisher's event metadata are: * <EventMetadata object at 0x06EBE710>: id=2000 * <EventMetadata object at 0x06EBE7B0>: id=2001 * <EventMetadata object at 0x06EBE3F0>: id=2002
18.9. Object manager¶
"""Walk the kernel object manager namespace and inspect a few objects."""
import sys
import os.path
# Raw string: "\." is not a valid escape sequence
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.generated_def as gdef

object_manager = windows.system.object_manager
print("Object manager is {0}".format(object_manager))

root = object_manager.root
print("Root object is {0}".format(root))

print("")
print("Listing some of root-subobject:")
# Kernel object of type 'Directory' are iterable
for i, (name, obj) in enumerate(root.items()):
    print(" * {0}: {1}".format(name, obj))
    if i == 3:
        # Four entries are enough for the demo
        break

print("")
print(r"Retrieving <\Rpc Control\lsasspirpc>:")
# You can retrieve this value in one request
x1 = root[r"\Rpc Control\lsasspirpc"]
# Sub-directory also allow __getitem__
x2 = root["Rpc Control"]["lsasspirpc"]
# You can directly request the object manager that will request `root`
x3 = object_manager[r"\Rpc Control\lsasspirpc"]
# The three access paths must designate the same object
assert x1.fullname == x2.fullname == x3.fullname

lsasspirpc = x1
print("Object is: {0}".format(lsasspirpc))
print(" * name: <{0}>".format(lsasspirpc.name))
print(" * path: <{0}>".format(lsasspirpc.path))
print(" * fullname: <{0}>".format(lsasspirpc.fullname))
print(" * type: <{0}>".format(lsasspirpc.type))
print(" * target: <{0}>".format(lsasspirpc.target)) # None on non-symlink

print("")
print("Looking for a SymbolicLink in <ArcName>")
# Keep the first object of the directory whose type is SymbolicLink
slo = [o for o in root["ArcName"].values() if o.type == "SymbolicLink"][0]
print("Object is: {0}".format(slo))
print(" * name: <{0}>".format(slo.name))
print(" * target: <{0}>".format(slo.target))
Output
(cmd) python object_manager\object_manager.py Object manager is <windows.winobject.object_manager.ObjectManager object at 0x0370ED10> Root object is <KernelObject "\" (type="Directory")> Listing some of root-subobject: * PendingRenameMutex: <KernelObject "\PendingRenameMutex" (type="Mutant")> * ObjectTypes: <KernelObject "\ObjectTypes" (type="Directory")> * storqosfltport: <KernelObject "\storqosfltport" (type="FilterConnectionPort")> * MicrosoftMalwareProtectionRemoteIoPortWD: <KernelObject "\MicrosoftMalwareProtectionRemoteIoPortWD" (type="FilterConnectionPort")> Retrieving <\Rpc Control\lsasspirpc>: Object is: <KernelObject "\Rpc Control\lsasspirpc" (type="ALPC Port")> * name: <lsasspirpc> * path: <\Rpc Control> * fullname: <\Rpc Control\lsasspirpc> * type: <ALPC Port> * target: <None> Looking for a SymbolicLink in <ArcName> Object is: <KernelObject "\ArcName\multi(0)disk(0)rdisk(0)" (type="SymbolicLink")> * name: <multi(0)disk(0)rdisk(0)> * target: <\Device\Harddisk0\Partition0>
18.9.1. find objects¶
"""Search the kernel object namespace for objects whose name or type contains a string."""
import argparse

import windows
import windows.generated_def as gdef


def obj_with_link(obj):
    """Return the description of `obj`, followed by its target when it has one."""
    target = obj.target
    if target is None:
        return str(obj)
    return "{0} -> <{1}>".format(obj, target)


def find_name(root, findname):
    """Walk every directory reachable from `root` and print the matching objects.

    An object matches when `findname` is contained in its name or in its type.
    NTSTATUS errors are reported on one line and the walk moves on to the next
    pending directory.
    """
    TODO = [root]
    while TODO:
        directory = TODO.pop()
        # Object blamed if an NTSTATUS error occurs: the directory while it is
        # being enumerated, the entry while that entry is being handled.
        # (Reporting the loop variable alone named a stale object, or failed
        # with an unbound name when the very first enumeration raised.)
        culprit = directory
        try:
            for name, obj in directory.items():
                culprit = obj
                if findname in name or findname in obj.type:
                    print("* {0}".format(obj_with_link(obj)))
                if obj.type == "Directory":
                    TODO.append(obj)
                culprit = directory
        except gdef.NtStatusException as e:
            print("<{0}> -> {1}".format(culprit.fullname, e.name))


parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('name', nargs='?', default="ls", help='The name of the object to find')
res = parser.parse_args()

objmanag = windows.system.object_manager
print("Looking for object name containing <{0}>".format(res.name))
find_name(objmanag.root, res.name)
Output
(cmd) python object_manager\findobj.py Looking for object name containing <ls> * <KernelObject "\KnownDlls32" (type="Directory")> * <KernelObject "\Win32kCrossSessionGlobals" (type="Section")> * <KernelObject "\KnownDlls" (type="Directory")> <\DriverStores\SYSTEM> -> STATUS_ACCESS_DENIED * <KernelObject "\Device\MailslotRedirector" (type="SymbolicLink")> -> <\Device\Mup\;MailslotRedirector> * <KernelObject "\Device\Mailslot" (type="Device")> <\Device\00000020> -> STATUS_ACCESS_DENIED <\Device\00000020> -> STATUS_ACCESS_DENIED <\Device\00000020> -> STATUS_ACCESS_DENIED <\Device\00000020> -> STATUS_ACCESS_DENIED <\Device\00000020> -> STATUS_ACCESS_DENIED <\KernelObjects\PrefetchTracesReady> -> STATUS_ACCESS_DENIED <\KnownDlls\powrprof.dll> -> STATUS_ACCESS_DENIED * <KernelObject "\RPC Control\lsapolicylookup" (type="ALPC Port")> * <KernelObject "\RPC Control\lsacap" (type="ALPC Port")> * <KernelObject "\RPC Control\lsasspirpc" (type="ALPC Port")> <\Windows\SbApiPort> -> STATUS_ACCESS_DENIED <\Windows\SbApiPort> -> STATUS_ACCESS_DENIED <\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED <\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED <\Sessions\BNOLINKS\1> -> STATUS_ACCESS_DENIED
18.10. Device manager¶
"""List device classes, then a few 'System' devices and their allocated resources."""
import sys
import os.path
# Raw string: "\." is not a valid escape sequence
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.pipe
import windows.generated_def as gdef

devmgr = windows.system.device_manager

print("Device manager is {0}".format(devmgr))

print("Enumerating the first 3 device classes")
for cls in devmgr.classes[:3]:
    print(" * {0}".format(cls))

print("Finding device class 'System'")
# Allow devmgr.classes["name"] ?
system_cls = [cls for cls in devmgr.classes if cls.name == "System"][0]
print(" * {0}".format(system_cls))
print(" Enumerating some devices of 'System'")
devices = system_cls.devices.all()

for devinst in (devices[0], devices[25], devices[35]): # Some "random" devices to have interesting ones
    print(" * {0}".format(devinst))
    devconf = devinst.allocated_configuration
    if not devconf:
        # No allocated configuration: nothing to list for this device
        continue
    print(" Enumerating allocated resources:")
    for resource in devconf.resources:
        print(" * {0}".format(resource))

# python64 samples\device\device_manager.py
# Device manager is <windows.winobject.device_manager.DeviceManager object at 0x0000000003669908>
# Enumerating the first 3 device classes
#  * <DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
#  * <DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
#  * <DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
# Finding device class 'System'
#  * <DeviceClass name="System" guid=4D36E97D-E325-11CE-BFC1-08002BE10318>
#  Enumerating some devices of 'System'
#  * <DeviceInstance "Motherboard resources" (id=1)>
#  * <DeviceInstance "Microsoft ACPI-Compliant Embedded Controller" (id=26)>
#  Enumerating allocated resources:
#  * <IoResource : [0x00000000000062-0x00000000000062]>
#  * <IoResource : [0x00000000000066-0x00000000000066]>
#  * <DeviceInstance "High Definition Audio Controller" (id=36)>
#  Enumerating allocated resources:
#  * <MemoryResource : [0x000000f7080000-0x000000f7083fff]>
#  * <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
#  * <IrqResource : [0x00000000000011]>
Output
(cmd) python device_manager\device_manager.py
Device manager is <windows.winobject.device_manager.DeviceManager object at 0x00000000033442E8>
Enumerating the first 3 device classes
* <DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
* <DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
* <DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
Finding device class 'System'
* <DeviceClass name="System" guid=4D36E97D-E325-11CE-BFC1-08002BE10318>
Enumerating some devices of 'System'
* <DeviceInstance "Motherboard resources" (id=1)>
* <DeviceInstance "Microsoft ACPI-Compliant Embedded Controller" (id=26)>
Enumerating allocated resources:
* <IoResource : [0x00000000000062-0x00000000000062]>
* <IoResource : [0x00000000000066-0x00000000000066]>
* <DeviceInstance "High Definition Audio Controller" (id=36)>
Enumerating allocated resources:
* <MemoryResource : [0x000000f7080000-0x000000f7083fff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <IrqResource : [0x00000000000011]>
18.10.1. Enumerate devices¶
"""Enumerate device classes, their devices, and optionally attributes and resources."""
import argparse

import windows
import windows.generated_def as gdef

devmgr = windows.system.device_manager


def class_generator(filter=None):
    """Yield the device classes, restricted to the class named `filter` when given."""
    # Accept the class name in both its text and encoded forms, so the match
    # does not depend on whether `cls.name` is a text or a byte string.
    wanted = (filter, filter.encode()) if filter else None
    for cls in devmgr.classes:
        if wanted and cls.name not in wanted:
            continue
        yield cls


def main(clsfilter, enumerate_devices, print_devinst_resources, attributes):
    """Print the selected classes and, on request, their devices.

    clsfilter: name of the only class to display (None: all classes)
    enumerate_devices: also list the devices of each class
    print_devinst_resources: also list the allocated resources of each device
    attributes: names of device attributes to print (None or empty: none)
    """
    for devcls in class_generator(clsfilter):
        print(devcls)
        if not enumerate_devices:
            continue
        # Enumerate devices
        for devinst in devcls.devices:
            print(" * {0}".format(devinst))
            # Print attributes
            if attributes:
                print(" Attributes:")
                for attr in attributes:
                    value = getattr(devinst, attr)
                    print(" * {0}={1}".format(attr, value))
            if not print_devinst_resources:
                continue
            # Device resources
            devconf = devinst.allocated_configuration
            if not devconf:
                # No allocated configuration
                # Check boot conf ?
                continue
            for resource in devconf.resources:
                print(" * {0}".format(resource))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--class", dest="clsfilter", default=None, help="The classe to list: default all")
    parser.add_argument("--no-print-devices", action="store_true", help="Prevent the listing of devices in the matching classes")
    parser.add_argument("--print-resources", action="store_true", help="Print the resources allocated to the device instance")
    parser.add_argument("--attributes", nargs="+", help="The list of attributes to print for each discovered device instance")
    args = parser.parse_args()
    print(args)
    main(args.clsfilter, enumerate_devices=not args.no_print_devices, print_devinst_resources=args.print_resources, attributes=args.attributes)
$ python64 device_manager\enum_devices.py --no-print-devices
Namespace(clsfilter=None, no_print_devices=True, print_resources=False)
<DeviceClass name="XboxComposite" guid=05F5CFE2-4733-4950-A6BB-07AAD01A3A84>
<DeviceClass name="DXGKrnl" guid=1264760F-A5C8-4BFE-B314-D56A7B44A362>
<DeviceClass name="RemotePosDevice" guid=13E42DFA-85D9-424D-8646-28A70F864F9C>
<DeviceClass name="DigitalMediaDevices" guid=14B62F50-3F15-11DD-AE16-0800200C9A66>
<DeviceClass name="PrintQueue" guid=1ED2BBF9-11F0-4084-B21F-AD83A8E6DCDC>
<DeviceClass name="WCEUSBS" guid=25DBCE51-6C8F-4A72-8A6D-B54C2B4FC835>
<DeviceClass name="SecurityAccelerator" guid=268C95A1-EDFE-11D3-95C3-0010DC4050A5>
<DeviceClass name="HidMsr" guid=2A9FE532-0CDC-44F9-9827-76192F2CA2FB>
<DeviceClass name="SystemRecovery" guid=2DB15374-706E-4131-A0C7-D7C78EB0289A>
....
$ python64 device_manager\enum_devices.py --class Processor --attributes name manufacturer device_object_name location_paths
Namespace(attributes=['name', 'manufacturer', 'device_object_name', 'location_paths'], clsfilter='Processor', no_print_devices=False, print_resources=False)
<DeviceClass name="Processor" guid=50127DC3-0F36-415E-A6CC-4CB3BE910B65>
* <DeviceInstance "Intel Processor" (id=1)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\00000017
* location_paths=[u'ACPI(_SB_)#ACPI(CPU0)']
* <DeviceInstance "Intel Processor" (id=2)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\00000018
* location_paths=[u'ACPI(_SB_)#ACPI(CPU1)']
* <DeviceInstance "Intel Processor" (id=3)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\00000019
* location_paths=[u'ACPI(_SB_)#ACPI(CPU2)']
* <DeviceInstance "Intel Processor" (id=4)>
Attributes:
* name=Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
* manufacturer=Intel
* device_object_name=\Device\0000001a
* location_paths=[u'ACPI(_SB_)#ACPI(CPU3)']
$ python64 device_manager\enum_devices.py --class Display --print-resources
Namespace(clsfilter='Display', no_print_devices=False, print_resources=True)
<DeviceClass name="Display" guid=4D36E968-E325-11CE-BFC1-08002BE10318>
* <DeviceInstance "NVIDIA GeForce GTX 1070" (id=1)>
* <MemoryResource : [0x000000f6000000-0x000000f6ffffff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <MemoryResource : [0x000000e0000000-0x000000efffffff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <MemoryResource : [0x000000f0000000-0x000000f1ffffff]>
* <DevicePrivateResource type=ResType_DevicePrivate(0x8001)>
* <IoResource : [0x0000000000e000-0x0000000000e07f]>
...
18.11. windows.wintrust¶
"""Check the signature of ntdll.dll and of the first modules loaded in this process."""
import sys
import os.path
# Raw string: "\." is not a valid escape sequence
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows.wintrust

TARGET_FILE = r"C:\windows\system32\ntdll.dll"

print("Checking signature of <{0}>".format(TARGET_FILE))
print(" is_signed: <{0}>".format(windows.wintrust.is_signed(TARGET_FILE)))
print(" check_signature: <{0}>".format(windows.wintrust.check_signature(TARGET_FILE)))

sign_info = windows.wintrust.full_signature_information(TARGET_FILE)
print(" full_signature_information:")
print(" * signed <{0}>".format(sign_info.signed))
print(" * catalog <{0}>".format(sign_info.catalog))
print(" * catalogsigned <{0}>".format(sign_info.catalogsigned))
print(" * additionalinfo <{0}>".format(sign_info.additionalinfo))

print("Checking signature of some loaded DLL")
for module in windows.current_process.peb.modules[:5]:
    path = module.fullname
    is_signed = windows.wintrust.is_signed(path)
    if is_signed:
        print("<{0}> : {1}".format(path, is_signed))
    else:
        # Not signed: also display the fourth field of the full signature information
        sign_info = windows.wintrust.full_signature_information(path)
        print("<{0}> : {1} ({2})".format(path, is_signed, sign_info[3]))
Output
(cmd) python crypto\wintrust.py Checking signature of <C:\windows\system32\ntdll.dll> is_signed: <True> check_signature: <0> full_signature_information: * signed <True> * catalog <C:\Windows\system32\CatRoot\{F750E6C3-38EE-11D1-85E5-00C04FC295EE}\Microsoft-Windows-Client-Desktop-Required-Package051420~31bf3856ad364e35~amd64~~10.0.22621.3593.cat> * catalogsigned <True> * additionalinfo <0> Checking signature of some loaded DLL <c:\users\cleme\appdata\local\programs\python\python311\python.exe> : True <c:\windows\system32\ntdll.dll> : True <c:\windows\system32\kernel32.dll> : True <c:\windows\system32\kernelbase.dll> : True <c:\windows\system32\ucrtbase.dll> : True
18.12. VectoredException()¶
18.12.1. In local process¶
# Sample: trace accesses to a PAGE_NOACCESS page of the current process
# using a vectored exception handler (VEH).
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import ctypes
import windows
from windows.winobject.exception import VectoredException
import windows.generated_def.windef as windef
from windows.generated_def.winstructs import *


@VectoredException
def handler(exc):
    """Vectored exception handler toggling the protection of `target_page`.

    On an access violation the page is made PAGE_READWRITE and the trap flag
    (TF) is set in the faulting context; on any other exception the page is
    put back to PAGE_NOACCESS. Execution is always continued.
    """
    print("==Entry of VEH handler==")
    if exc[0].ExceptionRecord[0].ExceptionCode == EXCEPTION_ACCESS_VIOLATION:
        target_addr = ctypes.cast(exc[0].ExceptionRecord[0].ExceptionInformation[1], ctypes.c_void_p).value
        print("Instr at {0} accessed to addr {1}".format(hex(exc[0].ExceptionRecord[0].ExceptionAddress), hex(target_addr)))
        print("Resetting page protection to <PAGE_READWRITE>")
        windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_READWRITE)
        # Request a single-step exception after the faulting instruction
        exc[0].ContextRecord[0].EEFlags.TF = 1
        return windef.EXCEPTION_CONTINUE_EXECUTION
    else:
        print("Exception of type {0}".format(exc[0].ExceptionRecord[0].ExceptionCode))
        print("Resetting page protection to <PAGE_NOACCESS>")
        windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_NOACCESS)
        return windef.EXCEPTION_CONTINUE_EXECUTION


windows.winproxy.AddVectoredExceptionHandler(0, handler)

# `target_page` is read as a global by `handler`
target_page = windows.current_process.virtual_alloc(0x1000)
print("Protected page is at <{0}>".format(hex(target_page)))
print("Setting page protection to <PAGE_NOACCESS>")
windows.winproxy.VirtualProtect(target_page, 0x1000, windef.PAGE_NOACCESS)

# Both reads below touch the protected page and go through the handler
print("")
v = ctypes.c_uint.from_address(target_page).value
print("Value 1 read")
print("")
v = ctypes.c_uint.from_address(target_page + 0x10).value
print("Value 2 read")
Output
(cmd) python process\veh_segv.py Protected page is at <0x289b6bc0000> Setting page protection to <PAGE_NOACCESS> ==Entry of VEH handler== Instr at 0x7ff8bda3e718 accessed to addr 0x289b6bc0000 Resetting page protection to <PAGE_READWRITE> ==Entry of VEH handler== Exception of type EXCEPTION_SINGLE_STEP(0x80000004) Resetting page protection to <PAGE_NOACCESS> Value 1 read ==Entry of VEH handler== Instr at 0x7ff8bda3e718 accessed to addr 0x289b6bc0010 Resetting page protection to <PAGE_READWRITE> ==Entry of VEH handler== Exception of type EXCEPTION_SINGLE_STEP(0x80000004) Resetting page protection to <PAGE_NOACCESS> Value 2 read
18.12.2. In remote process¶
# Sample: inject Python code into a remote process that installs a vectored
# exception handler and traces the first executions inside a module.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.test
from windows.generated_def.winstructs import *

# Code executed inside the remote process (see execute_python below)
python_code = """
import windows
import ctypes
import windows
from windows.winobject.exception import VectoredException
import windows.generated_def.windef as windef
from windows.generated_def.winstructs import *

windows.utils.create_console()

module_to_trace = "gdi32.dll"

nb_repeat = [5]

@VectoredException
def handler(exc):
    if exc[0].ExceptionRecord[0].ExceptionCode == EXCEPTION_ACCESS_VIOLATION:
        print("")
        target_addr = ctypes.cast(exc[0].ExceptionRecord[0].ExceptionInformation[1], ctypes.c_void_p).value
        print("Instr at {0} accessed to addr {1} ({2})".format(hex(exc[0].ExceptionRecord[0].ExceptionAddress), hex(target_addr), module_to_trace))
        windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_EXECUTE_READWRITE)
        nb_repeat[0] -= 1
        if nb_repeat[0]:
            exc[0].ContextRecord[0].EEFlags.TF = 1
        else:
            print("No more tracing !")
        return windef.EXCEPTION_CONTINUE_EXECUTION
    else:
        print("Exception of type {0}".format(exc[0].ExceptionRecord[0].ExceptionCode))
        print("Resetting page protection to <PAGE_READWRITE>")
        windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_READWRITE)
        return windef.EXCEPTION_CONTINUE_EXECUTION

windows.winproxy.AddVectoredExceptionHandler(0, handler)

print("Tracing execution in module: <{0}>".format(module_to_trace))
module = [x for x in windows.current_process.peb.modules if x.name == module_to_trace][0]
target_page = module.baseaddr
code_size = module.pe.get_OptionalHeader().SizeOfCode
print("Protected page is at {0}".format(hex(target_page)))
windows.winproxy.VirtualProtect(target_page, code_size, windef.PAGE_READWRITE)
"""

# Create the target suspended, inject the tracing code, then let it run
c = windows.test.pop_proc_64(dwCreationFlags=CREATE_SUSPENDED)
x = c.execute_python(python_code)
c.threads[0].resume()

import time
time.sleep(0.1)
# Suspend every thread of the target, wait, then terminate it
for t in c.threads:
    t.suspend()
time.sleep(1)
c.exit()
Output:
(cmd λ) python.exe process\remote_veh_segv.py (In another console) Tracing execution in module: <gdi32.dll> Protected page is at 0x7ffa3c700000L Instr at 0x7ffa3c70f0f0L accessed to addr 0x7ffa3c70f0f0L (gdi32.dll) Exception of type EXCEPTION_SINGLE_STEP(0x80000004L) Resetting page protection to <PAGE_READWRITE> Instr at 0x7ffa3c70f0f5L accessed to addr 0x7ffa3c70f0f5L (gdi32.dll) Exception of type EXCEPTION_SINGLE_STEP(0x80000004L) Resetting page protection to <PAGE_READWRITE> Instr at 0x7ffa3c70f0faL accessed to addr 0x7ffa3c70f0faL (gdi32.dll) Exception of type EXCEPTION_SINGLE_STEP(0x80000004L) Resetting page protection to <PAGE_READWRITE> Instr at 0x7ffa3c70f0ffL accessed to addr 0x7ffa3c70f0ffL (gdi32.dll) Exception of type EXCEPTION_SINGLE_STEP(0x80000004L) Resetting page protection to <PAGE_READWRITE> Instr at 0x7ffa3c70f100L accessed to addr 0x7ffa3c70f100L (gdi32.dll) No more tracing !
18.13. Debugging¶
18.13.1. Debugger¶
# Sample: debug a 32-bit process and print the name of each DLL passed to
# ntdll!LdrLoadDll.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.test
import windows.debug
from windows.generated_def.winstructs import *


class MyDebugger(windows.debug.Debugger):
    """Debugger that prints the code and address of each exception it receives."""

    def on_exception(self, exception):
        code = exception.ExceptionRecord.ExceptionCode
        addr = exception.ExceptionRecord.ExceptionAddress
        print("Got exception {0} at 0x{1:x}".format(code, addr))


class PrintUnicodeString(windows.debug.Breakpoint):
    """Breakpoint printing the string referenced by one stack argument.

    `argument_position` is the index of the argument to read from the stack
    of the thread that hit the breakpoint.
    """

    def __init__(self, addr, argument_position):
        super(PrintUnicodeString, self).__init__(addr)
        self.arg_pos = argument_position

    def trigger(self, dbg, exc):
        p = dbg.current_process
        t = dbg.current_thread
        esp = t.context.Esp
        # Stack slot (arg_pos + 1): one 4-byte slot is skipped before the arguments
        unicode_string_addr = p.read_ptr(esp + (self.arg_pos + 1) * 4)
        # The pointer to the wide string is read at offset 4 of that structure
        wstring_addr = p.read_ptr(unicode_string_addr + 4)
        dll_loaded = p.read_wstring(wstring_addr).lower()
        print("Loading <{0}>".format(dll_loaded))
        if dll_loaded.endswith("comctl32.dll"):
            print("Ask to load <comctl32.dll>: exiting process")
            dbg.current_process.exit()


calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = MyDebugger(calc)
d.add_bp(PrintUnicodeString("ntdll!LdrLoadDll", argument_position=2))
d.loop()
Output
(cmd) python debug\debugger_print_LdrLoaddll.py Got exception EXCEPTION_BREAKPOINT(0x80000003) at 0x7ff8e5aebd44 Loading <kernel32.dll> Got exception UNKNOW_EXCEPTION(0x4000001f) at 0x77e58727 Loading <api-ms-win-core-synch-l1-2-0> Loading <api-ms-win-core-fibers-l1-1-1> Loading <api-ms-win-core-fibers-l1-1-0> Loading <api-ms-win-core-synch-l1-2-0> Loading <api-ms-win-core-localization-l1-2-1> Loading <kernel32> Loading <api-ms-win-core-string-l1-1-0> Loading <api-ms-win-core-datetime-l1-1-1> Loading <api-ms-win-core-localization-obsolete-l1-2-0> Loading <c:\windows\system32\imm32.dll> Loading <comctl32.dll> Ask to load <comctl32.dll>: exiting process
18.13.1.1. on_setup¶
# Sample: run the same target under a debugger that overrides on_setup and
# under one that does not.
import windows.debug


class MySetupDebugger(windows.debug.Debugger):
    """Debugger overriding on_setup (in addition to the exception/exit callbacks)."""

    def on_setup(self):
        super(MySetupDebugger, self).on_setup()
        process = self.current_process
        print("Setup called: {0}".format(process))

    def on_exception(self, exc):
        exception_code = exc.ExceptionRecord.ExceptionCode
        print("Exception: {0}".format(exception_code))

    def on_exit_process(self, evt):
        process = self.current_process
        print("Process exit: {0}".format(process))


class SimpleDebugger(windows.debug.Debugger):
    """Debugger with the same exception/exit callbacks but no on_setup override."""

    def on_exception(self, exc):
        exception_code = exc.ExceptionRecord.ExceptionCode
        print("Exception: {0}".format(exception_code))

    def on_exit_process(self, evt):
        process = self.current_process
        print("Process exit: {0}".format(process))


# Each debugger class is run in turn on a fresh instance of the same target
for banner, debugger_class in (
    ("== With on_setup ==", MySetupDebugger),
    ("\n== Without on_setup ==", SimpleDebugger),
):
    print(banner)
    dbg = debugger_class.debug(r"c:\windows\system32\whoami.exe")
    dbg.loop()
Output
(cmd) python debug\debugger_on_setup.py == With on_setup == Setup called: <WinProcess "whoami.exe" pid 29796 at 0x4ccb790> <whoami output> Process exit: <WinProcess "whoami.exe" pid 29796 (DEAD) at 0x4ccb790> == Without on_setup == Exception: EXCEPTION_BREAKPOINT(0x80000003L) <whoami output> Process exit: <WinProcess "whoami.exe" pid 33328 (DEAD) at 0x4ccbdb0>
18.13.1.2. Single stepping¶
# Sample: memory breakpoint on write, followed by a few single steps.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.test
import windows.debug
import windows.native_exec.simple_x86 as x86
from windows.generated_def.winstructs import *


class MyDebugger(windows.debug.Debugger):
    """Debugger counting down single steps and exiting the target when done."""

    def __init__(self, *args, **kwargs):
        super(MyDebugger, self).__init__(*args, **kwargs)
        # Number of single steps still to perform (set by SingleStepOnWrite)
        self.single_step_counter = 0

    def on_exception(self, exception):
        code = exception.ExceptionRecord.ExceptionCode
        addr = exception.ExceptionRecord.ExceptionAddress
        print("Got exception {0} at 0x{1:x}".format(code, addr))

    def on_single_step(self, exception):
        code = exception.ExceptionRecord.ExceptionCode
        addr = exception.ExceptionRecord.ExceptionAddress
        print("Got single_step {0} at 0x{1:x}".format(code, addr))
        self.single_step_counter -= 1
        if self.single_step_counter > 0:
            # Keep stepping while the counter is not exhausted
            return self.single_step()
        else:
            print("No more single step: exiting")
            self.current_process.exit()


class SingleStepOnWrite(windows.debug.MemoryBreakpoint):
    """Check that BP/dbg can trigger single step and that instruction follows"""

    def trigger(self, dbg, exc):
        fault_addr = exc.ExceptionRecord.ExceptionInformation[1]
        eip = dbg.current_thread.context.pc
        print("Instruction at <{0:#x}> wrote at <{1:#x}>".format(eip, fault_addr))
        # (Re)arm the step counter of the debugger, then start stepping
        dbg.single_step_counter = 4
        return dbg.single_step()


calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
d = MyDebugger(calc)

# One page for the injected code, one page for the data it writes to
code = calc.virtual_alloc(0x1000)
data = calc.virtual_alloc(0x1000)

injected = x86.MultipleInstr()
injected += x86.Mov("EAX", 0)
injected += x86.Mov(x86.deref(data), "EAX")
injected += x86.Add("EAX", 4)
injected += x86.Mov(x86.deref(data + 4), "EAX")
injected += x86.Add("EAX", 8)
injected += x86.Mov(x86.deref(data + 8), "EAX")
injected += x86.Nop()
injected += x86.Nop()
injected += x86.Ret()

calc.write_memory(code, injected.get_code())
d.add_bp(SingleStepOnWrite(data, size=8, events="W"))
calc.create_thread(code, 0)
d.loop()
Output
(cmd) python debug\debugger_membp_singlestep.py Got exception EXCEPTION_BREAKPOINT(0x80000003) at 0x7ff8e5aebd44 Got exception UNKNOW_EXCEPTION(0x4000001f) at 0x77e58727 Instruction at <0xa50006> wrote at <0xa60000> Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa5000c Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50011 Instruction at <0xa50011> wrote at <0xa60004> Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50017 Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa5001c Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50022 Got single_step UNKNOW_EXCEPTION(0x4000001e) at 0xa50023 No more single step: exiting
18.13.1.3. windows.debug.FunctionCallBP¶
# Sample: change the return value of a function in a debugged process with
# windows.debug.FunctionCallBP.
import sys
import os.path
import pprint
import threading
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.test
import windows.debug
import windows.generated_def as gdef

# The debuggee python will just print the result of
# 3 calls to IsDebuggerPresent
TARGET_PYTHON_CODE = '"\
import ctypes;\
import time;\
IsDebuggerPresent = ctypes.windll.kernel32.IsDebuggerPresent;\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
time.sleep(1);\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
time.sleep(1);\
print(\'[DEBUGGE] IsDebuggerPresent={0}\'.format(IsDebuggerPresent()));\
"'


# This breakpoint knows nothing about the arguments of its target
# It only knows how to break at the return of the function
# This allows us to change the return value of any function
class IncrementReturnValue(windows.debug.FunctionCallBP):
    """Breakpoint replacing the function result with an incrementing value."""

    def __init__(self, addr, initialvalue):
        super(IncrementReturnValue, self).__init__(addr)
        # Value written as the result of the next call that returns
        self.initialvalue = initialvalue

    def trigger(self, dbg, exc):
        # Ask to break at the return of the function
        # callback is ret_trigger
        self.break_on_ret(dbg, exc)

    def ret_trigger(self, dbg, exc):
        ctx = dbg.current_thread.context
        # Func result is an alias to EAX/RAX
        ctx.func_result = self.initialvalue
        # Set the new context for the target thread
        dbg.current_thread.set_context(ctx)
        self.initialvalue += 1


d = windows.debug.Debugger.debug(sys.executable, [sys.executable, "-c", TARGET_PYTHON_CODE])
# We could also give the direct address of the function
# But it would require to wait for the module to be loaded
d.add_bp(IncrementReturnValue("kernelbase!IsDebuggerPresent", 42))
d.loop()
Output
(cmd) python debug\change_function_ret_value.py [DEBUGGE] IsDebuggerPresent=42 [DEBUGGE] IsDebuggerPresent=43 [DEBUGGE] IsDebuggerPresent=44
18.13.1.4. windows.debug.FunctionBP¶
# Sample: follow the first calls to NtCreateFile in a debugged process with
# windows.debug.FunctionBP.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.test
import windows.debug
from windows.generated_def.winstructs import *


class FollowNtCreateFile(windows.debug.FunctionBP):
    """Breakpoint on NtCreateFile reporting the handle produced by each call.

    After COUNTER successful calls have been reported, the next call makes
    the debugged process exit.
    """

    TARGET = windows.winproxy.NtCreateFile
    COUNTER = 3

    def trigger(self, dbg, exc):
        if not self.COUNTER:
            print("Exiting process")
            dbg.current_process.exit()
            return
        params = self.extract_arguments(dbg.current_process, dbg.current_thread)
        filename = params["ObjectAttributes"].contents.ObjectName.contents.Buffer
        handle_addr = params["FileHandle"].value
        # Keep the call information for ret_trigger
        self.data = (filename, handle_addr)
        self.break_on_ret(dbg, exc)

    def ret_trigger(self, dbg, exc):
        filename, handle_addr = self.data
        ret_value = dbg.current_thread.context.func_result  # EAX / RAX depending of bitness
        handle_value = dbg.current_process.read_ptr(handle_addr)
        if ret_value:
            # A non-zero result is reported as a failure and not counted
            print("NtCreateFile of <{0}> FAILED (result={1:#x})".format(filename, ret_value))
            return
        print("NtCreateFile of <{0}>: handle = {1:#x}".format(filename, handle_value))
        # Manual verification
        fhandle = [h for h in windows.system.handles if h.dwProcessId == dbg.current_process.pid and h.wValue == handle_value]
        if not fhandle:
            raise ValueError("handle not found!")
        fhandle = fhandle[0]
        print("Handle manually found! typename=<{0}>, name=<{1}>".format(fhandle.type, fhandle.name))
        print("")
        self.COUNTER -= 1


if __name__ == "__main__":
    calc = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
    d = windows.debug.Debugger(calc)
    d.add_bp(FollowNtCreateFile())
    d.loop()
Output
(cmd) python debug\debug_functionbp.py NtCreateFile of <50173784>: handle = 0x1c4 Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Globalization\Sorting\SortDefault.nls> NtCreateFile of <50181528>: handle = 0x1ec Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Fonts\StaticCache.dat> NtCreateFile of <50195912>: handle = 0x1fc Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Branding\Basebrd\basebrd.dll> Exiting process
18.13.1.5. Debugger.attach¶
# Sample: attach a debugger to an already running process, given its PID.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.test
import windows.debug
from windows.generated_def.winstructs import *

# Just a breakpoint that follows NtCreateFile and prints filename & handle
from debug_functionbp import FollowNtCreateFile


def follow_create_file(pid):
    """Attach a debugger to the process `pid` and follow its NtCreateFile calls.

    Raises IndexError if no process of `windows.system.processes` has this pid.
    """
    print("Finding process with pid <{0}>".format(pid))
    target = [p for p in windows.system.processes if p.pid == pid][0]
    print("Target is {0}".format(target))
    dbg = windows.debug.Debugger.attach(target)
    print("Debugger attached: {0}".format(dbg))
    print("")
    dbg.add_bp(FollowNtCreateFile())
    dbg.loop()


if __name__ == "__main__":
    # Create a non-debugged process safe to debug
    calc = windows.test.pop_proc_32(dwCreationFlags=0)
    # Give only the PID to follow_create_file
    follow_create_file(calc.pid)
Output
(cmd) python debug\attach.py Finding process with pid <27576> Target is <WinProcess "winver.exe" pid 27576 at 0x223f0862d90> Debugger attached: <windows.debug.debugger.Debugger object at 0x00000223F084A110> NtCreateFile of <54203712>: handle = 0x1c8 Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Globalization\Sorting\SortDefault.nls> NtCreateFile of <54268840>: handle = 0x1f0 Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Fonts\StaticCache.dat> NtCreateFile of <54280288>: handle = 0x200 Handle manually found! typename=<File>, name=<\Device\HarddiskVolume3\Windows\Branding\Basebrd\basebrd.dll> Exiting process
18.13.1.6. Native code tester¶
# Sample: execute a piece of native code in a debugged process and report
# the register / stack / code differences once it is done.
import sys
import argparse
import binascii

import windows
import windows.test
import windows.debug as dbg
import windows.native_exec.simple_x86 as x86
import windows.native_exec.simple_x64 as x64
from windows.generated_def import *


def _printable_table():
    """Return the 256-entry table mapping a byte value to its dump character."""
    table = list("." * 256)
    for value in range(1, 0x7f):
        table[value] = chr(value)
    # Characters replaced by a dot in the text column
    for value in (0x0, 0x7, 0x8, 0x9, 0xa, 0x1b, 0xd):
        table[value] = "."
    table[0xff] = "\x13"
    return table


def hexdump(string, start_addr=0):
    """Return a hexadecimal + text dump of `string`, 16 bytes per line.

    `string` may be a byte string or a text string whose characters are all
    below 256. `start_addr` is the address displayed for the first byte.
    An empty input yields an empty string.
    """
    if len(string) == 0:
        return ""
    if not isinstance(string, (bytes, bytearray)):
        # Text input: one byte per character, same value as ord(char)
        string = string.encode("latin-1")
    data = bytearray(string)  # Indexing yields integers on Python 2 and 3
    printable = _printable_table()
    result = ""
    for offset in range(0, len(data), 0x10):
        chunk = data[offset:offset + 0x10]
        linebuf = " %08X " % (offset + start_addr)
        for pos, value in enumerate(chunk):
            # Extra gap in the middle of the line (position relative to the line)
            if pos == 8:
                linebuf += " "
            linebuf += "%02X " % value
        # Padding for the last, incomplete line (empty for a full line)
        linebuf += " " * (0x10 - len(chunk))
        linebuf += " "
        linebuf += "".join(printable[value] for value in chunk)
        result += linebuf + "\n"
    return result


class StartStepBP(dbg.breakpoints.HXBreakpoint):
    """One-shot breakpoint that starts the single-step processing."""

    def trigger(self, dbg, exc):
        dbg.del_bp(self)
        dbg.on_single_step(exc)  # Trigger single step processing


class CodeTesteur(dbg.Debugger):
    """Debugger that runs `code` in `process` and reports what it changed.

    `register_start` maps register names to their initial values; the caller's
    dict is not modified. If `steps` is true a report is printed at each step.
    """

    def __init__(self, process, code, register_start=None, steps=False):
        super(CodeTesteur, self).__init__(process)
        # Work on a private copy: "pc" is added below
        register_start = dict(register_start) if register_start else {}
        self.initial_code = code
        # Breakpoint instruction appended after the tested code
        code += b"\xcc"
        self.code_addr = self.write_code_in_target(process, code)
        register_start["pc"] = self.code_addr
        self.thread_exec = process.threads[0]
        self.context_exec = self.thread_exec.context
        self.setup_target_context(self.context_exec, register_start)
        print("Startup context is:")
        self.context_exec.dump()
        print(self.context_exec.EEFlags)
        self.thread_exec.suspend()
        self.thread_exec.set_context(self.context_exec)
        self.thread_exec.resume()
        # Becomes True once the first EXCEPTION_BREAKPOINT has been ignored
        self.init_breakpoint = False
        if steps:
            self.steps = True
            self.add_bp(StartStepBP(self.code_addr))
        # State compared against at each single step
        self.last_context = self.context_exec
        self.last_code = self.initial_code

    def write_code_in_target(self, process, code):
        """Allocate memory in `process`, write `code` in it and return its address."""
        addr = process.virtual_alloc(len(code))
        process.write_memory(addr, code)
        return addr

    def setup_target_context(self, ctx, register_start):
        """Set each register of `register_start` in `ctx`.

        Raises ValueError if `ctx` has no attribute of that name.
        """
        for name, value in register_start.items():
            if not hasattr(ctx, name):
                raise ValueError("Unknown register to setup <{0}>".format(name))
            setattr(ctx, name, value)

    def on_single_step(self, x):
        print("* New step !")
        self.report_ctx_diff(self.last_context, self.current_thread.context)
        self.last_context = self.current_thread.context
        self.last_code = self.report_code_diff(self.last_code)
        self.single_step()
        print("")

    def on_exception(self, x):
        exc_code = x.ExceptionRecord.ExceptionCode
        exc_addr = x.ExceptionRecord.ExceptionAddress
        if not self.init_breakpoint and exc_code == EXCEPTION_BREAKPOINT:
            # First breakpoint exception: ignored
            self.init_breakpoint = True
            return
        ctx = self.current_thread.context
        print("==Post-exec context==")
        ctx.dump()
        print(ctx.EEFlags)
        if exc_code == EXCEPTION_BREAKPOINT and exc_addr == self.context_exec.pc + len(self.initial_code):
            # The breakpoint appended right after the tested code was reached
            print("<Normal terminaison>")
        else:
            print("<{0}> at <{1:#x}>".format(exc_code, exc_addr))
            if exc_code == EXCEPTION_ACCESS_VIOLATION:
                exc_infos = x.ExceptionRecord.ExceptionInformation
                read_write = "write" if exc_infos[0] else "read"
                target_addr = exc_infos[1]
                print(" * Access of type <{0}> at address <{1:#x}>".format(read_write, target_addr))
        self.report_ctx_diff(self.context_exec, ctx)
        self.report_code_diff(self.initial_code)
        self.current_process.exit()
        return

    def report_ctx_diff(self, start, now):
        """Print the registers that differ between the contexts `start` and `now`."""
        print("== DIFF ==")
        for name, start_value in start.regs():
            now_value = getattr(now, name)
            if start_value != now_value:
                diff = now_value - start_value
                print("{0}: {1:#x} -> {2:#x} ({3:+#x})".format(name, start_value, now_value, diff))
        if start.sp > now.sp:
            print("Negative Stack: dumping:")
            data = self.current_process.read_memory(now.sp, start.sp - now.sp)
            # The dumped bytes start at now.sp: label the dump accordingly
            print(hexdump(data, now.sp))

    def report_code_diff(self, initial_code):
        """Print the code before/after if it changed; return the current code."""
        code_size = len(initial_code)
        final_code = self.current_process.read_memory(self.code_addr, code_size)
        if final_code == initial_code:
            return initial_code
        print("== Executable code DIFF == ")
        print("Before:")
        print(hexdump(initial_code, self.code_addr))
        print("After:")
        print(hexdump(final_code, self.code_addr))
        return final_code


def _build_code(code, raw, assembler):
    """Return the native code: hex-decoded if `raw`, else assembled by `assembler`."""
    if raw:
        return binascii.unhexlify(code.replace(" ", ""))
    return assembler.assemble(code)


def _parse_registers(regs):
    """Parse a "name=value;name=value" string into a {register: int} dict."""
    start_register = {}
    if regs:
        for name_value in regs.split(";"):
            name, value = name_value.split("=")
            name = name.strip().capitalize()
            if name == "Eflags":
                name = "EFlags"
            # Base 0: accepts decimal as well as 0x-prefixed values
            start_register[name] = int(value.strip(), 0)
    return start_register


def test_code_x86(code, regs=None, raw=False, steps=False, **kwargs):
    """Run `code` in a new debugged 32-bit process and report the result."""
    print("Testing x86 code")
    process = windows.test.pop_proc_32(dwCreationFlags=DEBUG_PROCESS)
    code = _build_code(code, raw, x86)
    start_register = _parse_registers(regs)
    x = CodeTesteur(process, code, start_register, steps)
    x.loop()


def test_code_x64(code, regs=None, raw=False, **kwargs):
    """Run `code` in a new debugged 64-bit process and report the result.

    Raises ValueError when the current Python process is 32-bit.
    """
    print("Testing x64 code")
    if windows.current_process.bitness == 32:
        raise ValueError("Cannot debug a 64b process from 32b python")
    process = windows.test.pop_proc_64(dwCreationFlags=DEBUG_PROCESS)
    code = _build_code(code, raw, x64)
    start_register = _parse_registers(regs)
    x = CodeTesteur(process, code, start_register)
    x.loop()


parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('--x64', action='store_const', dest="func", const=test_code_x64, default=test_code_x86, help='Code is x64')
parser.add_argument('--raw', action='store_true', help='argument is raw assembled code (in hex)')
parser.add_argument('--steps', action='store_true', help='Get all step info')
parser.add_argument('code', help='The code to execute')
parser.add_argument('regs', nargs="?", help='The default values of the registers')
res = parser.parse_args()
# Every parsed option (including `func` itself) is forwarded as a keyword argument
res.func(**res.__dict__)
Output
(cmd) python.exe test_code.py "mov eax, 0x42424242" "eax=0x11223344" Testing x86 code Startup context is: Eip -> 0x3f0000L Esp -> 0x3bfae4L Eax -> 0x11223344L Ebx -> 0x5a6000L Ecx -> 0x0L Edx -> 0x0L Ebp -> 0x0L Edi -> 0x0L Esi -> 0x0L EFlags -> 0x202L EEflags(0x202L:IF) ==Post-exec context== Eip -> 0x3f0007L Esp -> 0x3bfae4L Eax -> 0x42424242L Ebx -> 0x5a6000L Ecx -> 0x0L Edx -> 0x0L Ebp -> 0x0L Edi -> 0x0L Esi -> 0x0L EFlags -> 0x202L EEflags(0x202L:IF) <Normal terminaison> ==DIFF== Eip: 0x3f0000 -> 0x3f0007 (+0x7) Eax: 0x11223344 -> 0x42424242 (+0x31200efe) (cmd) python64 test_code.py --x64 "mov r15, 0x11223344; push r14; call r15" "rcx=1; r14=0x4242424243434343" Testing x64 code Startup context is: Rip -> 0x205a1d60000L Rsp -> 0xe24a88fa88L Rax -> 0x0L Rbx -> 0x0L Rcx -> 0x1L Rdx -> 0xe24aaf9000L Rbp -> 0x0L Rdi -> 0x0L Rsi -> 0x0L R8 -> 0x0L R9 -> 0x0L R10 -> 0x0L R11 -> 0x0L R12 -> 0x0L R13 -> 0x0L R14 -> 0x4242424243434343L R15 -> 0x0L EFlags -> 0x200L EEflags(0x200L:IF) ==Post-exec context== Rip -> 0x11223344L Rsp -> 0xe24a88fa78L Rax -> 0x0L Rbx -> 0x0L Rcx -> 0x1L Rdx -> 0xe24aaf9000L Rbp -> 0x0L Rdi -> 0x0L Rsi -> 0x0L R8 -> 0x0L R9 -> 0x0L R10 -> 0x0L R11 -> 0x0L R12 -> 0x0L R13 -> 0x0L R14 -> 0x4242424243434343L R15 -> 0x11223344L EFlags -> 0x10202L EEflags(0x10202L:IF|RF) <EXCEPTION_ACCESS_VIOLATION(0xc0000005L)> at <0x11223344> ==DIFF== Rip: 0x205a1d60000 -> 0x11223344 (-0x20590b3ccbc) Rsp: 0xe24a88fa88 -> 0xe24a88fa78 (-0x10) R15: 0x0 -> 0x11223344 (+0x11223344) EFlags: 0x200 -> 0x10202 (+0x10002) Negative Stack: dumping: E24A88FA88 0C 00 D6 A1 05 02 00 00 43 43 43 43 42 42 42 42 ........CCCCBBBB
18.13.2. SymbolDebugger¶
# Sample: resolve the symbol of each breakpoint hit with windows.debug.SymbolDebugger.
import argparse
import os

import windows
import windows.debug
import windows.test
# Needed for symbols.set_dbghelp_path() below
from windows.debug import symbols

parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)

if args.dbghelp:
    symbols.set_dbghelp_path(args.dbghelp)
else:
    if "PFW_DBGHELP_PATH" not in os.environ:
        print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")


class MyInfoBP(windows.debug.Breakpoint):
    """Breakpoint printing the symbol it was hit at; exits the target on the 4th hit."""

    # Number of hits, shared by every MyInfoBP instance
    COUNT = 0

    def trigger(self, dbg, exc):
        cursym = dbg.current_resolver[exc.ExceptionRecord.ExceptionAddress]
        print("Breakpoint triggered at: {0}".format(cursym))
        print(repr(cursym))
        MyInfoBP.COUNT += 1
        if MyInfoBP.COUNT == 4:
            print("Quitting")
            dbg.current_process.exit()
        print("")


dbg = windows.debug.SymbolDebugger.debug("C:\\windows\\system32\\notepad.exe")
dbg.add_bp(MyInfoBP("kernelbase!CreateFileInternal"))
dbg.add_bp(MyInfoBP("ntdll!LdrpInitializeProcess"))
dbg.loop()
Output
(cmd) python debug\symbol_debugger.py Namespace(dbghelp=None) Breakpoint triggered at: ntdll!LdrpInitializeProcess <SymbolInfoW name="LdrpInitializeProcess" start=0x7ff8e5aecca0 tag=SymTagPublicSymbol> Breakpoint triggered at: KERNELBASE!CreateFileInternal <SymbolInfoW name="CreateFileInternal" start=0x7ff8e33b4b70 tag=SymTagPublicSymbol> Breakpoint triggered at: KERNELBASE!CreateFileInternal <SymbolInfoW name="CreateFileInternal" start=0x7ff8e33b4b70 tag=SymTagPublicSymbol> Breakpoint triggered at: KERNELBASE!CreateFileInternal <SymbolInfoW name="CreateFileInternal" start=0x7ff8e33b4b70 tag=SymTagPublicSymbol> Quitting
18.13.3. LocalDebugger¶
18.13.3.1. In current process¶
# Sample: debug the current process with windows.debug.LocalDebugger.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.debug
from windows.generated_def.winstructs import *
import windows.native_exec.simple_x86 as x86


class SingleSteppingDebugger(windows.debug.LocalDebugger):
    """Local debugger that single-steps until SINGLE_STEP_COUNT reaches zero."""

    SINGLE_STEP_COUNT = 4

    def on_exception(self, exc):
        code = self.get_exception_code()
        context = self.get_exception_context()
        print("EXCEPTION !!!! Got a {0!r} at 0x{1:x}".format(code, context.pc))
        self.SINGLE_STEP_COUNT -= 1
        if self.SINGLE_STEP_COUNT:
            return self.single_step()
        return EXCEPTION_CONTINUE_EXECUTION


class RewriteBreakpoint(windows.debug.HXBreakpoint):
    """Breakpoint that patches its own address with two nops, then single-steps."""

    def trigger(self, dbg, exc):
        context = dbg.get_exception_context()
        print("GOT AN HXBP at 0x{0:x}".format(context.pc))
        # Rewrite the infinite loop with 2 nop
        windows.current_process.write_memory(self.addr, b"\x90\x90")
        # Ask for a single stepping
        return dbg.single_step()


d = SingleSteppingDebugger()
# Infinite loop + nop + ret
code = x86.assemble("label :begin; jmp :begin; nop; ret")
func = windows.native_exec.create_function(code, [PVOID])
print("Code addr = 0x{0:x}".format(func.code_addr))
# Create a thread that will infinite loop
t = windows.current_process.create_thread(func.code_addr, 0)
# Add a breakpoint on the infinite loop
d.add_bp(RewriteBreakpoint(func.code_addr))
t.wait()
print("Done!")
Output
(cmd) python debug\local_debugger.py Code addr = 0x25f8532000e GOT AN HXBP at 0x25f8532000e EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e377257d EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e5a6aa80 EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e5aafde0 EXCEPTION !!!! Got a EXCEPTION_SINGLE_STEP(0x80000004) at 0x7ff8e5aafdf4 Done!
18.13.3.2. In remote process¶
# Sample: inject Python code in a remote process that installs a
# LocalDebugger breakpoint on LdrLoadDll.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence in a normal string literal.
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import ctypes
import windows
import windows.test
from windows.generated_def.winstructs import *

# Code executed inside the remote process (see execute_python below)
remote_code = """
import windows
from windows.generated_def.winstructs import *

windows.utils.create_console()

class YOLOHXBP(windows.debug.HXBreakpoint):
    def trigger(self, dbg, exc):
        p = windows.current_process
        arg_pos = 2
        context = dbg.get_exception_context()
        esp = context.Esp
        unicode_string_addr = p.read_ptr(esp + (arg_pos + 1) * 4)
        wstring_addr = p.read_ptr(unicode_string_addr + 4)
        dll_loaded = p.read_wstring(wstring_addr)
        print("I AM LOADING <{0}>".format(dll_loaded))

d = windows.debug.LocalDebugger()

exp = windows.current_process.peb.modules[1].pe.exports
#windows.utils.FixedInteractiveConsole(locals()).interact()
ldr = exp["LdrLoadDll"]
d.add_bp(YOLOHXBP(ldr))
"""

# Create the target suspended, inject the code, then let it run for 2 seconds
c = windows.test.pop_proc_32(dwCreationFlags=CREATE_SUSPENDED)
c.execute_python(remote_code)
c.threads[0].resume()

import time
time.sleep(2)
c.exit()
Output:
(cmd λ) python.exe debug\local_debugger_remote_process.py (In another console) I AM LOADING <C:\Windows\system32\uxtheme.dll> I AM LOADING <C:\Windows\system32\uxtheme.dll> I AM LOADING <C:\Windows\system32\uxtheme.dll> I AM LOADING <C:\Windows\system32\uxtheme.dll> I AM LOADING <kernel32.dll> I AM LOADING <C:\Windows\WinSxS\x86_microsoft.windows.gdiplus_6595b64144ccf1df_1.1.9600.17415_none_dad8722c5bcc2d8f\gdiplus.dll> I AM LOADING <comctl32.dll> I AM LOADING <comctl32.dll> I AM LOADING <comctl32.dll> I AM LOADING <comctl32.dll> I AM LOADING <comctl32.dll> I AM LOADING <comctl32> I AM LOADING <C:\Windows\SysWOW64\oleacc.dll> I AM LOADING <OLEAUT32.DLL> I AM LOADING <C:\Windows\system32\ole32.dll> I AM LOADING <C:\Windows\system32\MSCTF.dll> I AM LOADING <C:\Windows\SysWOW64\msxml6.dll> I AM LOADING <C:\Windows\system32\shell32.dll> I AM LOADING <C:\Windows\SYSTEM32\WINMM.dll> I AM LOADING <C:\Windows\system32\ole32.dll>
18.14. Symbols¶
18.14.1. VirtualSymbolHandler¶
# Sample: resolve symbols by name and by address with symbols.VirtualSymbolHandler.
import argparse
import os

import windows
import windows.generated_def as gdef
from windows.debug import symbols


def _show(template, value):
    """Print `template` formatted with `value` as its only argument."""
    print(template.format(value))


parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)

if args.dbghelp:
    symbols.set_dbghelp_path(args.dbghelp)
elif "PFW_DBGHELP_PATH" not in os.environ:
    print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")

symbols.engine.options = 0  # Disable deferred load

# Load ntdll at an arbitrary virtual address and describe the module
sh = symbols.VirtualSymbolHandler()
ntmod = sh.load_file(r"c:\windows\system32\ntdll.dll", addr=0x420000)
_show("Ntdll module is: {0}", ntmod)
_show(" * name = {0}", ntmod.name)
_show(" * addr = {0:#x}", ntmod.addr)
_show(" * path = {0:}", ntmod.path)
_show(" * type = {0:}", ntmod.type)
_show(" * pdb = {0:}", ntmod.pdb)
print("")

# Resolution by name
TEST_FUNCTION = "LdrLoadDll"
_show("Resolving function <{0}>", TEST_FUNCTION)
qualified_name = "ntdll!" + TEST_FUNCTION
loaddll = sh[qualified_name]
print("Symbol found !")
_show(" * __repr__: {0!r}", loaddll)
_show(" * __str__: {0}", loaddll)
_show(" * addr: {0:#x}", loaddll.addr)
_show(" * name: {0}", loaddll.name)
_show(" * fullname: {0}", loaddll.fullname)
_show(" * module: {0}", loaddll.module)
print("")

# Resolution by address, in a second module
print("Loading kernelbase")
kbasemod = sh.load_file(r"c:\windows\system32\kernelbase.dll", addr=0x1230000)
_show("Loaded modules are: {0}", sh.modules)
LOOKUP_ADDR = 0x1231242
_show("Looking up address: {0:#x}", LOOKUP_ADDR)
lookupsym = sh[LOOKUP_ADDR]
print("Symbol resolved !")
_show(" * __repr__: {0!r}", lookupsym)
_show(" * __str__: {0}", lookupsym)
_show(" * start: {0:#x}", lookupsym.start)
_show(" * addr: {0:#x}", lookupsym.addr)
_show(" * displacement: {0:#x}", lookupsym.displacement)
_show(" * name: {0}", lookupsym.name)
_show(" * fullname: {0}", lookupsym.fullname)
_show(" * module: {0}", lookupsym.module)
Output
(cmd) python debug\symbols\virtsymdemo.py Namespace(dbghelp=None) Ntdll module is: <SymbolModule name="ntdll" type=SymPdb pdb="ntdll.pdb" addr=0x420000> * name = ntdll * addr = 0x420000 * path = c:\windows\system32\ntdll.dll * type = <SYM_TYPE SymPdb(0x3)> * pdb = c:\Symbols\ntdll.pdb\8D5D5ED5D5B8AA609A82600C14E3004D1\ntdll.pdb Resolving function <LdrLoadDll> Symbol found ! * __repr__: <SymbolInfoW name="LdrLoadDll" start=0x44a160 tag=SymTagFunction> * __str__: ntdll!LdrLoadDll * addr: 0x44a160 * name: LdrLoadDll * fullname: ntdll!LdrLoadDll * module: <SymbolModule name="ntdll" type=SymPdb pdb="ntdll.pdb" addr=0x420000> Loading kernelbase Loaded modules are: [<SymbolModule name="ntdll" type=SymPdb pdb="ntdll.pdb" addr=0x420000>, <SymbolModule name="kernelbase" type=SymPdb pdb="kernelbase.pdb" addr=0x1230000>] Looking up address: 0x1231242 Symbol resolved ! * __repr__: <SymbolInfoW name="PsspThunkWin32Nt_HANDLE_ENTRY" start=0x1231240 displacement=0x2 tag=SymTagPublicSymbol> * __str__: kernelbase!PsspThunkWin32Nt_HANDLE_ENTRY+0x2 * start: 0x1231240 * addr: 0x1231242 * displacement: 0x2 * name: PsspThunkWin32Nt_HANDLE_ENTRY * fullname: kernelbase!PsspThunkWin32Nt_HANDLE_ENTRY+0x2 * module: <SymbolModule name="kernelbase" type=SymPdb pdb="kernelbase.pdb" addr=0x1230000>
18.14.2. ProcessSymbolHandler¶
# Sample: resolve a symbol inside a live process using ProcessSymbolHandler.
import os
import argparse
import time

import windows
import windows.test
import windows.generated_def as gdef
from windows.debug import symbols

parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
args = parser.parse_args()
print(args)

# An explicit --dbghelp wins; otherwise we rely on the PFW_DBGHELP_PATH environment variable
if args.dbghelp:
    symbols.set_dbghelp_path(args.dbghelp)
elif "PFW_DBGHELP_PATH" not in os.environ:
    print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")

# Spawn a throw-away target process with the same bitness as the current process
if windows.current_process.bitness == 32:
    target = windows.test.pop_proc_32()
else:
    target = windows.test.pop_proc_64()

try:
    print("Target is {0}".format(target))
    sh = symbols.ProcessSymbolHandler(target)
    time.sleep(0.1) # Just wait for the process initialisation
    sh.refresh() # Refresh symbol list (Only meaningful for ProcessSymbolHandler)

    print("Some loaded modules are:")
    for sm in sh.modules[:3]:
        print(" * {0}".format(sm))
    print("")

    TEST_FUNCTION = "advapi32!CreateServiceEx"
    print("Resolving function <{0}>".format(TEST_FUNCTION))
    createserv = sh[TEST_FUNCTION]
    print("Symbol found !")
    print(" * __repr__: {0!r}".format(createserv))
    print(" * __str__: {0}".format(createserv))
    print(" * addr: {0:#x}".format(createserv.addr))
    print(" * name: {0}".format(createserv.name))
    print(" * fullname: {0}".format(createserv.fullname))
    print(" * module: {0}".format(createserv.module))
finally:
    # Always kill the spawned target, even when the symbol resolution failed
    target.exit()
Output
(cmd) python debug\symbols\processsymdemo.py Namespace(dbghelp=None) Target is <WinProcess "winver.exe" pid 18600 at 0x23a15d4cdd0> Some loaded modules are: * <SymbolModule name="winver" type=SymDeferred pdb="" addr=0x7ff658a30000> * <SymbolModule name="ntdll" type=SymDeferred pdb="" addr=0x7ff8e5a10000> * <SymbolModule name="KERNEL32" type=SymDeferred pdb="" addr=0x7ff8e3760000> Resolving function <advapi32!CreateServiceEx> Symbol found ! * __repr__: <SymbolInfoW name="CreateServiceEx" start=0x7ff8e4b2d2e0 tag=SymTagPublicSymbol> * __str__: advapi32!CreateServiceEx * addr: 0x7ff8e4b2d2e0 * name: CreateServiceEx * fullname: advapi32!CreateServiceEx * module: <SymbolModule name="advapi32" type=SymPdb pdb="advapi32.pdb" addr=0x7ff8e4b10000>
18.14.3. Symbol search¶
# Sample: search the symbols of a PE file that match a pattern.
import argparse
import os

import windows
import windows.debug.symbols as symbols

# Command line: the symbol pattern, the PE file to load and some optional tuning
cli = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
cli.add_argument('pattern')
cli.add_argument('file', help="The PE file to load")
# base 0 lets the user write the value in decimal, hexadecimal (0x...), octal or binary
cli.add_argument('--addr', type=lambda x: int(x, 0), default=0, help="The load address of the PE")
cli.add_argument('--tag', type=lambda x: int(x, 0), default=0)
cli.add_argument('--dbghelp', help='The path of DBG help to use (default use env:PFW_DBGHELP_PATH)')
options = cli.parse_args()

dbghelp_path = options.dbghelp
if dbghelp_path:
    symbols.set_dbghelp_path(dbghelp_path)
elif "PFW_DBGHELP_PATH" not in os.environ:
    print("Not dbghelp path given and no environ var 'PFW_DBGHELP_PATH' sample may fail")

# Load the file in a virtual symbol handler, then search in that module only
handler = symbols.VirtualSymbolHandler()
module = handler.load_file(path=options.file, addr=options.addr)
matches = handler.search(options.pattern, mod=module, tag=options.tag)

print("{0} symbols found:".format(len(matches)))
for match in matches:
    print(" * {0!r}".format(match))
Output
$ python64 debug\symbols\symsearch.py "CreateFile*" c:\windows\system32\kernelbase.dll Namespace(addr=0, dbghelp=None, file='c:\\windows\\system32\\kernelbase.dll', pattern='CreateFile*', tag=0) 16 symbols found: * <SymbolInfoA name="CreateFileInternal" start=0x180024250 tag=SymTagFunction> * <SymbolInfoA name="CreateFileMappingFromApp" start=0x1800809d0 tag=SymTagFunction> * <SymbolInfoA name="CreateFileMoniker" start=0x180087e40 tag=SymTagFunction> * <SymbolInfoA name="CreateFile2" start=0x180074550 tag=SymTagFunction> * <SymbolInfoA name="CreateFileA" start=0x1800240d0 tag=SymTagFunction> * <SymbolInfoA name="CreateFileMappingNumaW" start=0x18002ca40 tag=SymTagFunction> * <SymbolInfoA name="CreateFileMapping2" start=0x1800fb6d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileInternal" start=0x180024250 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileMappingW" start=0x18002cd00 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileDowngrade_Win7" start=0x180082ff0 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileDowngrade_Vista" start=0x18007eba0 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileMappingFromApp" start=0x1800809d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFile2" start=0x180074550 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileW" start=0x1800241d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileA" start=0x1800240d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="CreateFileMappingNumaW" start=0x18002ca40 tag=SymTagPublicSymbol> $ python64 debug\symbols\symsearch.py "NtCreate*" c:\windows\system32\ntdll.dll --addr 0x42000000 Namespace(addr=1107296256, dbghelp=None, file='c:\\windows\\system32\\ntdll.dll', pattern='NtCreate*', tag=0) 47 symbols found: * <SymbolInfoA name="NtCreateProcessEx" start=0x4209ca00 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreateIRTimer" start=0x4209d530 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreateRegistryTransaction" start=0x4209d750 tag=SymTagPublicSymbol> * 
<SymbolInfoA name="NtCreateTimer" start=0x4209d810 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreateKeyedEvent" start=0x4209d5d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreateFile" start=0x4209cb00 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreateSymbolicLinkObject" start=0x4209d7d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreatePrivateNamespace" start=0x4209d6d0 tag=SymTagPublicSymbol> * <SymbolInfoA name="NtCreateKey" start=0x4209c400 tag=SymTagPublicSymbol> ...
18.15. WMI¶
18.15.1. WMI requests¶
# Sample: WMI requests on the default and on a non-default namespace.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence and triggers a warning on recent Python versions
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows

print("WMI requester is {0}".format(windows.system.wmi))
print("Selecting * from 'Win32_Process'")
result = windows.system.wmi.select("Win32_Process")
print("They are <{0}> processes".format(len(result)))

print("Looking for ourself via pid")
# WMI properties are converted with int() before being compared to our pid
us = [p for p in result if int(p["ProcessId"]) == windows.current_process.pid][0]
print("Some info about our process:")
print(" * {0} -> {1}".format("Name", us["Name"]))
print(" * {0} -> {1}".format("ProcessId", us["ProcessId"]))
print(" * {0} -> {1}".format("OSName", us["OSName"]))
print(" * {0} -> {1}".format("UserModeTime", us["UserModeTime"]))
print(" * {0} -> {1}".format("WindowsVersion", us["WindowsVersion"]))
print(" * {0} -> {1}".format("CommandLine", us["CommandLine"]))

print("<Select Caption,FileSystem,FreeSpace from Win32_LogicalDisk>:")
for vol in windows.system.wmi.query("select Caption,FileSystem,FreeSpace from Win32_LogicalDisk"):
    # Filter out system-properties for the sample
    print(" * " + str({k:v for k,v in vol.items() if not k.startswith("_")}))

print("\n ==== Advanced use ====")
print("Listing some namespaces:")
for namespace in [ns for ns in windows.system.wmi.namespaces if "2" in ns]:
    print(" * {0}".format(namespace))

security2 = windows.system.wmi["root\\SecurityCenter2"]
print("Querying non-default namespace: {0}".format(security2))
print("Listing some available classes:")
for clsname in [x for x in security2.classes if x["__CLASS"].endswith("Product")]:
    print(" * {0}".format(clsname))

print("Listing <AntiVirusProduct>:")
for av in security2.select("AntiVirusProduct"):
    print(" * {0}".format(av["displayName"]))
Output
(cmd) python wmi\wmi_request.py
WMI requester is <windows.winobject.wmi.WmiManager object at 0x000001CD1A46E150>
Selecting * from 'Win32_Process'
They are <329> processes
Looking for ourself via pid
Some info about our process:
* Name -> python.exe
* ProcessId -> 28460
* OSName -> Microsoft Windows 11 Pro|C:\Windows|\Device\Harddisk0\Partition3
* UserModeTime -> 0
* WindowsVersion -> 10.0.22631
* CommandLine -> C:\Users\cleme\AppData\Local\Programs\Python\Python311\python.exe wmi\wmi_request.py
<Select Caption,FileSystem,FreeSpace from Win32_LogicalDisk>:
* {'Caption': 'C:', 'FileSystem': 'NTFS', 'FreeSpace': '925749731328'}
==== Advanced use ====
Listing some namespaces:
* CIMV2
* SecurityCenter2
* StandardCimv2
Querying non-default namespace: <WmiNamespace "root\SecurityCenter2">
Listing some available classes:
* <WmiObject class "AntiSpywareProduct">
* <WmiObject class "AntiVirusProduct">
* <WmiObject class "FirewallProduct">
Listing <AntiVirusProduct>:
* Windows Defender
18.15.2. WMI Create Process¶
# Sample: create a process through the WMI method Win32_Process.Create.
import time
import windows

NOTEPAD_PATH = r"c:\windows\system32\notepad.exe"

wmispace = windows.system.wmi["root\\cimv2"]
print("WMI namespace is <{0}>".format(wmispace))

proc_class = wmispace.get_object("Win32_process")
print("Process class is {0}".format(proc_class))

# The 'inparam' of a method is the class describing its input parameters
create_method = proc_class.get_method("Create")
inparam_cls = create_method.inparam
print("Method Create InParams is <{0}>".format(inparam_cls))
print("Method Create InParams properties are <{0}>".format(inparam_cls.properties))

print("Creating instance of inparam")
inparam = inparam_cls()
print("InParam instance is <{0}>".format(inparam))

print("Setting <CommandLine>")
inparam["CommandLine"] = NOTEPAD_PATH

print("Executing method")
# This API may change for something that better wraps cls/object/Parameters handling
outparam = wmispace.exec_method(proc_class, "Create", inparam)
print("OutParams is {0}".format(outparam))
print("Out params values are: {0}".format(outparam.properties))

# Wrap the pid returned by WMI in a WinProcess to manipulate the new process
created_pid = int(outparam["ProcessId"])
target = windows.WinProcess(pid=created_pid)
print("Created process is {0}".format(target))

print("Waiting 1s")
time.sleep(1)
print("Killing the process")
target.exit(0)
Output
(cmd) python wmi\create_process.py WMI namespace is <<WmiNamespace "root\cimv2">> Process class is <WmiObject class "Win32_Process"> Method Create InParams is <<WmiObject class "__PARAMETERS">> Method Create InParams properties are <['CommandLine', 'CurrentDirectory', 'ProcessStartupInformation']> Creating instance of inparam InParam instance is <<WmiObject instance of "__PARAMETERS">> Setting <CommandLine> Executing method OutParams is <WmiObject instance of "__PARAMETERS"> Out params values are: ['ProcessId', 'ReturnValue'] Created process is <WinProcess "notepad.exe" pid 26744 at 0x180e84d48d0> Waiting 1s Killing the process
18.16. windows.com¶
18.16.1. INetFwPolicy2¶
# Sample: query the firewall state through the INetFwPolicy2 COM interface.
import sys
import os.path
import pprint
# Raw string: "\." is not a valid escape sequence and triggers a warning on recent Python versions
sys.path.append(os.path.abspath(__file__ + r"\..\.."))

import windows
import windows.generated_def as gdef
from windows.generated_def import interfaces

# This code is a simple version of the firewall code in windows.winoject.network
print("Initialisation of COM")
windows.com.init()

print("Creating INetFwPolicy2 variable")
firewall = interfaces.INetFwPolicy2()
# Printed before create_instance() has filled the interface
print("{0} (value = {1})".format(firewall, firewall.value))
print("")

print("Generating CLSID")
NetFwPolicy2CLSID = windows.com.IID.from_string("E2B3C97F-6AE1-41AC-817A-F6F92166D7DD")
print(repr(NetFwPolicy2CLSID))
print("")

print("Creating COM instance")
windows.com.create_instance(NetFwPolicy2CLSID, firewall)
# '#x' really formats the value as hexadecimal (the former '0x{1:0}' spec
# printed the decimal value behind a '0x' prefix)
print("{0} (value = {1:#x})".format(firewall, firewall.value))
print("")

print("Checking for enabled profiles")
for profile in [gdef.NET_FW_PROFILE2_DOMAIN, gdef.NET_FW_PROFILE2_PRIVATE, gdef.NET_FW_PROFILE2_PUBLIC]:
    # 'enabled' is an output parameter filled by get_FirewallEnabled
    enabled = gdef.VARIANT_BOOL()
    firewall.get_FirewallEnabled(profile, enabled)
    print(" * {0} -> {1}".format(profile, enabled.value))
Output
(cmd) python com\com_inetfwpolicy2.py Initialisation of COM Creating INetFwPolicy2 variable <INetFwPolicy2<NULL> at 0x2925a3e2350> (value = None) Generating CLSID <GUID "E2B3C97F-6AE1-41AC-817A-F6F92166D7DD"> Creating COM instance <INetFwPolicy2 at 0x2925a3e2350> (value = 0x2827524184096) Checking for enabled profiles * NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_DOMAIN(0x1) -> True * NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_PRIVATE(0x2) -> True * NET_FW_PROFILE_TYPE2_.NET_FW_PROFILE2_PUBLIC(0x4) -> True
18.16.2. ICallInterceptor¶
import windows
import windows.generated_def as gdef
from windows import winproxy

# POC of ICallInterceptor
# Based on works by Pavel Yosifovich
# http://blogs.microsoft.co.il/pavely/2018/02/28/intercepting-com-objects-with-cogetinterceptor/

windows.com.init()

# Create an interceptor for the firewall (INetFwPolicy2)
interceptor = gdef.ICallInterceptor()
winproxy.CoGetInterceptor(gdef.INetFwPolicy2.IID, None, interceptor.IID, interceptor)

# The PythonForWindows firewall object is a real/valid INetFwPolicy2
# used for demos of ICallFrameEvents.Invoke
real_firewall = windows.system.network.firewall

# Custom Python ICallFrameEvents implementation
class MySink(windows.com.COMImplementation):
    """Python implementation of ICallFrameEvents registered as the interceptor sink."""
    IMPLEMENT = gdef.ICallFrameEvents

    def OnCall(self, this, frame):
        """Describe the call carried by 'frame', forward it and override its result.

        Prints the interface and method names of the call plus some info about
        its first parameter, invokes the call on 'real_firewall', then replaces
        the return value with 1234.
        """
        # Output parameters filled by frame.GetNames
        ifname = gdef.PWSTR()
        methodname = gdef.PWSTR()
        print("Hello from python sink !")
        frame.GetNames(ifname, methodname)
        print("Catching call to <{0}.{1}>".format(ifname.value, methodname.value))
        # Output parameters describing the parameter at index 0
        param0info = gdef.CALLFRAMEPARAMINFO()
        param0 = windows.com.Variant()
        frame.GetParamInfo(0, param0info)
        frame.GetParam(0, param0)
        print("Info about parameters 0:")
        windows.utils.sprint(param0info, name=" * param0info")
        print("param0 value = {0}".format(param0.aslong))
        # Perform the real call on the genuine firewall object...
        frame.Invoke(real_firewall)
        # ...then replace the value the caller will get back
        frame.SetReturnValue(1234)
        print("Leaving the sink !")
        return 0  # presumably S_OK for the COM caller - TODO confirm

# Create and register our ICallFrameEvents sink
xsink = MySink()
interceptor.RegisterSink(xsink)

# Create the INetFwPolicy2 interceptor interface
fakefirewall = gdef.INetFwPolicy2()
interceptor.QueryInterface(fakefirewall.IID, fakefirewall)

# Calling one of the INetFwPolicy2 function for testing
# Testing on https://msdn.microsoft.com/en-us/library/windows/desktop/aa365316(v=vs.85).aspx
enabled = gdef.VARIANT_BOOL()
res = fakefirewall.get_FirewallEnabled(2, enabled)
print("return value = {0}".format(res))
print("firewall enabled = {0}".format(enabled))

# Test a function taking a POINTER(ICallFrameEvents) (PTR to interface)
print("Testing a function taking a PTR to a COM interface")
sink2 = gdef.ICallFrameEvents()
print("Before call: {0}".format((sink2, sink2.value)))
interceptor.GetRegisteredSink(sink2)
print("After call: {0}".format((sink2, sink2.value)))

# Output recorded in the script itself (0x4 stack offsets and sizes):
# (cmd) python samples\com\icallinterceptor.py
# Hello from python sink !
# Catching call to <INetFwPolicy2.FirewallEnabled>
# Info about parameters 0:
# * param0info.fIn -> 0x1
# * param0info.fOut -> 0x0
# * param0info.stackOffset -> 0x4L
# * param0info.cbParam -> 0x4L
# param0 value = 2
# Leaving the sink !
# return value = 1234
# firewall enabled = VARIANT_BOOL(True)
# Testing a function taking a PTR to a COM interface
# Before call: (<ICallFrameEvents object at 0x066EF3F0>, None)
# After call: (<ICallFrameEvents object at 0x066EF3F0>, 107934504)
Output
(cmd) python com\icallinterceptor.py Hello from python sink ! Catching call to <INetFwPolicy2.FirewallEnabled> Info about parameters 0: * param0info.fIn -> 0x1 * param0info.fOut -> 0x0 * param0info.stackOffset -> 0x8 * param0info.cbParam -> 0x8 param0 value = 2 Leaving the sink ! return value = 1234 firewall enabled = VARIANT_BOOL(True) Testing a function taking a PTR to a COM interface Before call: (<ICallFrameEvents<NULL> at 0x1fb65de5550>, None) After call: (<ICallFrameEvents at 0x1fb65de5550>, 2179257488408)
18.17. windows.crypto¶
18.17.1. Encryption demo¶
This sample is a working POC able to generate a key-pair, and to encrypt and decrypt a file.
import argparse
import getpass

import windows.crypto as crypto
from windows import winproxy
from windows.generated_def import *
import windows.crypto.generation as gencrypt

# http://stackoverflow.com/questions/1461272/basic-questions-on-microsoft-cryptoapi

def crypt(src, dst, certs, **kwargs):
    """Encrypt the content of the 'src' file with the certificates in 'certs' into 'dst'.

    'src' and 'dst' are open binary file objects: both are closed before
    returning, even if the encryption fails. 'certs' is a list of certificate
    filenames. Extra keyword arguments are ignored.
    """
    try:
        # Open every certificates in the certs list
        certlist = [crypto.Certificate.from_file(x) for x in certs]
        # Encrypt the content of 'src' with all the public keys(certs)
        res = crypto.encrypt(certlist, src.read())
        print("Encryption done. Result:")
        print(repr(res))
        # Write the result in 'dst'
        dst.write(res)
    finally:
        dst.close()
        src.close()


def decrypt(src, pfxfile, password, outfile=None, **kwargs):
    """Decrypt the content of 'src' with the private key in 'pfxfile'.

    The 'pfxfile' is open using the 'password' (asked interactively when None).
    The result is printed, or written to the file named 'outfile' when given.
    Returns the decrypted data.
    """
    # Open the 'pfx' with the given password
    if password is None:
        password = getpass.getpass()
    pfx = crypto.import_pfx(pfxfile.read(), password)
    # Decrypt the content of the file
    decrypted = crypto.decrypt(pfx, src.read())
    if outfile is None:
        print(u"Result = <{0}>".format(decrypted))
    else:
        with open(outfile, "wb") as f:
            f.write(decrypted)
    return decrypted


def sign(src, pfxfile, password, outfile=None, **kwargs):
    """Sign the content of 'src' with the certificate stored in 'pfxfile'.

    The 'pfxfile' is open using the 'password' (asked interactively when None)
    and must contain exactly one certificate, else ValueError is raised.
    The result is printed, or written to the file named 'outfile' when given.
    Returns the signed data.
    """
    if password is None:
        password = getpass.getpass()
    pfx = crypto.import_pfx(pfxfile.read(), password)
    # Retrieve the only certificate of the pfx: it is the one used to sign
    certs = pfx.certs
    if len(certs) != 1:
        raise ValueError("PFX containing exactly one certificate expected for signature")
    cert = certs[0]
    signed = crypto.sign(cert, src.read())
    if outfile is None:
        print(u"Result = <{0}>".format(signed))
    else:
        with open(outfile, "wb") as f:
            f.write(signed)
    return signed


PFW_TMP_KEY_CONTAINER = "PythonForWindowsTMPContainer"

def genkeys(common_name, pfxpassword, outname, keysize=2048, **kwargs):
    """Generate a SHA256/RSA key pair. A self-signed certificate with 'common_name' is stored as 'outname'.cer.
    The private key is stored in 'outname'.pfx protected with 'pfxpassword'"""
    cert_store = crypto.CertificateStore.new_in_memory()
    # Create a TMP context that will hold our newly generated key-pair
    with crypto.CryptContext(PFW_TMP_KEY_CONTAINER, None, PROV_RSA_FULL, 0, retrycreate=True) as ctx:
        key = HCRYPTKEY()
        # The requested key size goes in the upper 16 bits of the CryptGenKey flags
        keysize_flags = keysize << 16
        # Generate a key-pair that is exportable
        winproxy.CryptGenKey(ctx, AT_KEYEXCHANGE, CRYPT_EXPORTABLE | keysize_flags, key)
        # It does NOT destroy the key-pair from the container,
        # It only release the key handle
        # https://msdn.microsoft.com/en-us/library/windows/desktop/aa379918(v=vs.85).aspx
        winproxy.CryptDestroyKey(key)

    # Description of the key-container that will be used to generate the certificate
    KeyProvInfo = CRYPT_KEY_PROV_INFO()
    KeyProvInfo.pwszContainerName = PFW_TMP_KEY_CONTAINER
    KeyProvInfo.pwszProvName = None
    KeyProvInfo.dwProvType = PROV_RSA_FULL
    KeyProvInfo.dwFlags = 0
    KeyProvInfo.cProvParam = 0
    KeyProvInfo.rgProvParam = None
    #KeyProvInfo.dwKeySpec = AT_SIGNATURE
    KeyProvInfo.dwKeySpec = AT_KEYEXCHANGE

    crypt_algo = CRYPT_ALGORITHM_IDENTIFIER()
    crypt_algo.pszObjId = szOID_RSA_SHA256RSA

    certif_name = "CN={0}".format(common_name)
    # Generate a self-signed certificate based on the given key-container and signature algorithm
    certif = gencrypt.generate_selfsigned_certificate(certif_name, key_info=KeyProvInfo, signature_algo=crypt_algo)
    # Add the newly created certificate to our TMP cert-store
    cert_store.add_certificate(certif)
    # Generate a pfx from the TMP cert-store
    # NOTE(review): this echoes the PFX password on stdout - acceptable for a demo only
    print("Password is <{0}>".format(pfxpassword))
    pfx = gencrypt.generate_pfx(cert_store, pfxpassword)
    if outname is None:
        outname = common_name.lower()

    # Dump the certif (public key) and pfx (public + private keys)
    with open(outname + ".cer", "wb") as f:
        # The encoded certif only contains the public key
        f.write(certif.encoded)
    with open(outname + ".pfx", "wb") as f:
        f.write(pfx)
    print(certif)

    # Destroy the TMP key container
    prov = HCRYPTPROV()
    winproxy.CryptAcquireContextW(prov, PFW_TMP_KEY_CONTAINER, None, PROV_RSA_FULL, CRYPT_DELETEKEYSET)


# Openssl commands to check the certif/pfx
## Read certificate info (.cer)
### openssl x509 -inform der -in {certif} -text -noout
## Read pfx info (ask to another password to encrypt Private key before print/export)
### openssl pkcs12 -info -in {pfx} -nokeys
## Read pfx info !!!! PRINT PRIVATE KEY !!!!
### openssl pkcs12 -info -in {pfx} -nodes
## Read ASN1 data
### openssl asn1parse -inform DER -in {file}

parser = argparse.ArgumentParser(prog=__file__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers(description='valid subcommands')

cryptparse = subparsers.add_parser('crypt')
cryptparse.set_defaults(func=crypt)
cryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to encrypt')
cryptparse.add_argument('dst', type=argparse.FileType('wb'), help='The encrypted file')
cryptparse.add_argument('certs', type=str, nargs='+', help='List of certfile used to encrypt the src')

decryptparse = subparsers.add_parser('decrypt')
decryptparse.set_defaults(func=decrypt)
decryptparse.add_argument('src', type=argparse.FileType('rb'), help='File to decrypt')
decryptparse.add_argument('pfxfile', type=argparse.FileType('rb'), help='PFX file to use')
decryptparse.add_argument('--password', help='Password of the PFX')
decryptparse.add_argument('--outfile', default=None, help='The outputfile default is print')

signparse = subparsers.add_parser('sign')
signparse.set_defaults(func=sign)
signparse.add_argument('src', type=argparse.FileType('rb'), help='File to sign')
signparse.add_argument('pfxfile', type=argparse.FileType('rb'), help='PFX file to use')
signparse.add_argument('--password', help='Password of the PFX')
signparse.add_argument('--outfile', default=None, help='The outputfile default is print')

genkeysparse = subparsers.add_parser('genkey', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
genkeysparse.set_defaults(func=genkeys)
genkeysparse.add_argument('common_name', nargs='?', metavar='CommonName', default='DEFAULT', help='the common name of the certificate')
genkeysparse.add_argument('outname', nargs='?',help='The filename base for the generated files')
genkeysparse.add_argument('--pfxpassword', nargs='?', help='Password to protect the PFX')
genkeysparse.add_argument('--keysize', nargs='?', type=lambda x: int(x, 0), default=2048, help='The size of the RSA key')

res = parser.parse_args()
if not hasattr(res, "func"):
    # On Python 3 the sub-command is optional by default: without it 'func'
    # is never set, so report a usage error instead of an AttributeError
    parser.error("a subcommand is required (crypt, decrypt, sign or genkey)")
# Every parsed option is forwarded to the handler as a keyword argument
res.func(**res.__dict__)
Output:
(cmd λ) python crypto\encryption_demo.py genkey YOLOCERTIF mykey --pfxpassword MYPASSWORD
<CertificatContext "YOLOCERTIF" serial="1b a4 3e 17 f7 ed ec ab 4f f8 11 46 48 e9 29 25">
(cmd λ) ls
mykey.cer mykey.pfx
(cmd λ) echo|set /p="my secret message" > message.txt
(cmd λ) python crypto\encryption_demo.py crypt message.txt message.crypt mykey.cer
Encryption done. Result:
bytearray(b'0\x82\x01\x19\x06\t*\x86H\x86\xf7\r\x01\x07\x03\xa0\x82\x01\n0\x82\x01\x06\x02\x01\x001\x81\xc30\x81
\xc0\x02\x01\x000)0\x151\x130\x11\x06\x03U\x04\x03\x13\nYOLOCERTIF\x02\x10\x1b\xa4>\x17\xf7\xed\xec\xabO\xf8\x11
FH\xe9)%0\r\x06\t*\x86H\x86\xf7\r\x01\x01\x01\x05\x00\x04\x81\x80V\x89)\xf5\xaaM\x99cEA\x17^\xa2D~\x94\xe3\xf2\x1f
\x05Y\xc2\xbb\xb2\xbbYBpU6\x870\xce\xe7\xd2M{\xbb\xb9K\xa0\xf5\xe5\x93\xca\xedF\x80.x\xdc\xf2\x0c\xa6UO\x01\r\xaf
\xd0Z\xd9\xabnzR\xd4j=\xca\xc2RG\xcd\x11u\x82\x7f\x8c\xd8t\xb9\xf9\xe8%\xfal\xaaHPj;\xecKk]\t%\xfd\x91\xcc\xe0lWf
\xc6\x12x\x1am\xc8\x01t\xac\xa6\xf3#\x02\xd4J \x8eZ\xbb\x10W\xe1 0;\x06\t*\x86H\x86\xf7\r\x01\x07\x010\x14\x06\x08*
\x86H\x86\xf7\r\x03\x07\x04\x08\x14F\x04\xad\xed9\xed<\x80\x18\x80]6\xccTV\xbc\xb8*\x84QY!~\xb3\n\x1aV\xd4\rf\xd1n:')
(cmd λ) python crypto\encryption_demo.py decrypt --password BADPASS message.crypt mykey.pfx
Traceback (most recent call last):
File "..\samples\encryption_demo.py", line 103, in <module>
res.func(**res.__dict__)
File "..\samples\encryption_demo.py", line 26, in decrypt
pfx = crypto.import_pfx(pfxfile.read(), password)
File "c:\users\hakril\documents\work\pythonforwindows\windows\crypto\certificate.py", line 153, in import_pfx
cert_store = winproxy.PFXImportCertStore(pfx, password, flags)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 1065, in PFXImportCertStore
return PFXImportCertStore.ctypes_function(pPFX, szPassword, dwFlags)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 148, in perform_call
return self._cprototyped(*args)
File "c:\users\hakril\documents\work\pythonforwindows\windows\winproxy.py", line 69, in kernel32_error_check
raise WinproxyError(func_name)
windows.winproxy.error.WinproxyError: PFXImportCertStore: [Error 86] The specified network password is not correct.
(cmd λ) python crypto\encryption_demo.py decrypt --password MYPASSWORD message.crypt mykey.pfx
Result = <my secret message>
18.17.2. Certificate demo¶
import hashlib
import base64

import windows.crypto

# PEM certificate: one base64 row per line, as the decoding below splits on "\n"
windowscert = b"""-----BEGIN CERTIFICATE-----
MIIFBDCCA+ygAwIBAgITMwAAAQZuwyXEMckYDgAAAAABBjANBgkqhkiG9w0BAQsF
ADCBhDELMAkGA1UEBhMCVVMxEzARBgNVBAgTCldhc2hpbmd0b24xEDAOBgNVBAcT
B1JlZG1vbmQxHjAcBgNVBAoTFU1pY3Jvc29mdCBDb3Jwb3JhdGlvbjEuMCwGA1UE
AxMlTWljcm9zb2Z0IFdpbmRvd3MgUHJvZHVjdGlvbiBQQ0EgMjAxMTAeFw0xNjEw
MTEyMDM5MzFaFw0xODAxMTEyMDM5MzFaMHAxCzAJBgNVBAYTAlVTMRMwEQYDVQQI
EwpXYXNoaW5ndG9uMRAwDgYDVQQHEwdSZWRtb25kMR4wHAYDVQQKExVNaWNyb3Nv
ZnQgQ29ycG9yYXRpb24xGjAYBgNVBAMTEU1pY3Jvc29mdCBXaW5kb3dzMIIBIjAN
BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyWcaCYghNInk3ecpyu2uZ7LCV9QS
7GWYr41ufTkcL66ewHxlAoWjmkKG6W2Bp9BYYQok10iDeDGACE9Vjr6m4Jdh+YuN
RLxMnHC8JTGzk96CzmdBPAuUWdAcHNmTkIWQF6AXzsbBWsekQejvDBygAOCuIYh4
sBgNa5cjTxQc7Iyp9c7RxBmThV5BNFTOnSN6D9N8zU+ENgIZuyHxGvqzRdrhU4G4
Cg/h1CkI4TgeZQZCeUNPnWV6DMuvPCiqGEia5phOJZyENKND0Sx6eQZrYnuz1gMn
YaEnO+ggegtt4pWpqg8Ch0jNrkL1fb3Kzz7E34/K9dcTgaOymfF6qUKabQIDAQAB
o4IBgDCCAXwwHwYDVR0lBBgwFgYKKwYBBAGCNwoDBgYIKwYBBQUHAwMwHQYDVR0O
BBYEFBEciVg/vsVmKtr/hmHt7KM6g8lSMFIGA1UdEQRLMEmkRzBFMQ0wCwYDVQQL
EwRNT1BSMTQwMgYDVQQFEysyMjk4NzkrMTQ3NDQ5YmUtMTVhOC00ZWJhLTkzZjMt
ZDExMGE1YzQ1NTUyMB8GA1UdIwQYMBaAFKkpAjmOFsSXeM2Q+Z5PmuF8Va9TMFQG
A1UdHwRNMEswSaBHoEWGQ2h0dHA6Ly93d3cubWljcm9zb2Z0LmNvbS9wa2lvcHMv
Y3JsL01pY1dpblByb1BDQTIwMTFfMjAxMS0xMC0xOS5jcmwwYQYIKwYBBQUHAQEE
VTBTMFEGCCsGAQUFBzAChkVodHRwOi8vd3d3Lm1pY3Jvc29mdC5jb20vcGtpb3Bz
L2NlcnRzL01pY1dpblByb1BDQTIwMTFfMjAxMS0xMC0xOS5jcnQwDAYDVR0TAQH/
BAIwADANBgkqhkiG9w0BAQsFAAOCAQEAvYC1iawgKoxXAotQXaN0lj1J5VX01/un
7JybZF4sPMG4acoFT85Ao5U6TK5ATPB7yPUulAivp8908DwTGqN+Ju6iH+UkvAb+
a/WcHVEMxQXK5eOFNE6yekUArBGbMNWlTFrpwklmVTnL9R+4aApTEe6ITT1KLDio
5uFw98n5Sqgh+In073czyiTG7MVhBexbOfhgnciXoufeyhwy1pYgjouSqSQZs4bj
cUwQTwGlS2Gd5a+3nblhjn+QhSszIo1K5n1udLPFWtn29BuGlSrtTXPv5OCfNtLO
l2ec6CyjDQc6HcQBNCsbJVq6qGtQbYNE+ih+KhIU4tO5jf25xthf2g==
-----END CERTIFICATE-----"""

# Drop the BEGIN/END lines and decode the base64 rows in between
raw_cert = base64.decodebytes(b"".join(windowscert.split(b"\n")[1:-1]))
cert = windows.crypto.Certificate.from_buffer(raw_cert)
print("Analysing certificate: {0}".format(cert))
print(" * name: <{0}>".format(cert.name))
print(" * issuer: <{0}>".format(cert.issuer))
print(" * raw_serial: <{0}>".format(cert.raw_serial))
print(" * serial: <{0}>".format(cert.serial))
print(" * encoded start: <{0!r}>".format(cert.encoded[:20]))
print("")

chains = cert.chains
print("This certificate has {0} certificate chain(s)".format(len(chains)))
for i, chain in enumerate(chains):
    print("Chain {0}:".format(i))
    for ccert in chain:
        print(" {0}:".format(ccert))
        print(" * issuer: <{0}>".format(ccert.issuer))
print("")

# 'ccert' is the last certificate listed above (last element of the last chain)
cert_to_verif = ccert
print("Looking for <{0}> in trusted certificates".format(cert_to_verif.name))
root_store = windows.crypto.CertificateStore.from_system_store("Root")
# This is not the correct way verify the validity of a certificate chain.
# I would say that if the goal is to verify the signature of the certificate: use wintrust.
# (or maybe CertVerifyCertificateChainPolicy : https://msdn.microsoft.com/en-us/library/windows/desktop/aa377163(v=vs.85).aspx)
matchs = [c for c in root_store.certs if c == cert_to_verif]
print("matches = {0}".format(matchs))
if matchs:
    print("Found it !")
else:
    print("Not found :(")

## Extract certificates of a PE file
## This code is not a fixed API and the current state of my tests
print("")
print("== PE Analysis ==")
TARGET_FILE = r"C:\windows\system32\ntdll.dll"
# Hash the file inside a 'with' block so the handle is closed right away
with open(TARGET_FILE, "rb") as target_file:
    target_sha1 = hashlib.sha1(target_file.read()).hexdigest()
print("Target sha1 = <{0}>".format(target_sha1))

cryptobj = windows.crypto.CryptObject(TARGET_FILE)
print("Analysing {0}".format(cryptobj))
print("File has {0} signer(s):".format(cryptobj.crypt_msg.nb_signer))
for i, signer in enumerate(cryptobj.crypt_msg.signers):
    print("Signer {0}:".format(i))
    print(" * Issuer: {0!r}".format(signer.Issuer.data))
    print(" * HashAlgorithme: {0}".format(signer.HashAlgorithm.pszObjId))
    # Look up the signer certificate by issuer and serial number
    cert = cryptobj.cert_store.find(signer.Issuer, signer.SerialNumber)
    print(" * Certificate: {0}".format(cert))
print("")

print("File embdeds {0} certificate(s):".format(cryptobj.crypt_msg.nb_cert))
for i, certificate in enumerate(cryptobj.crypt_msg.certs):
    print(" * {0}) {1}".format(i, certificate))
Output
(cmd) python crypto\certificate.py Analysing certificate: <Certificate "Microsoft Windows" serial="33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06"> * name: <Microsoft Windows> * issuer: <Microsoft Windows Production PCA 2011> * raw_serial: <[51, 0, 0, 1, 6, 110, 195, 37, 196, 49, 201, 24, 14, 0, 0, 0, 0, 1, 6]> * serial: <33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06> * encoded start: <bytearray(b'0\x82\x05\x040\x82\x03\xec\xa0\x03\x02\x01\x02\x02\x133\x00\x00\x01\x06')> This certificate has 1 certificate chain(s) Chain 0: <Certificate "Microsoft Windows" serial="33 00 00 01 06 6e c3 25 c4 31 c9 18 0e 00 00 00 00 01 06">: * issuer: <Microsoft Windows Production PCA 2011> <Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 00 08">: * issuer: <Microsoft Root Certificate Authority 2010> <Certificate "Microsoft Root Certificate Authority 2010" serial="28 cc 3a 25 bf ba 44 ac 44 9a 9b 58 6b 43 39 aa">: * issuer: <Microsoft Root Certificate Authority 2010> Looking for <Microsoft Root Certificate Authority 2010> in trusted certificates matches = [] Not found :( == PE Analysis == Target sha1 = <5f7905b56952e28316edcbce0206d7bec34de2a1> Analysing <CryptObject "C:\windows\system32\ntdll.dll" content_type=CERT_QUERY_CONTENT_PKCS7_SIGNED_EMBED(0xa)> File has 1 signer(s): Signer 0: * Issuer: bytearray(b'0\x81\x841\x0b0\t\x06\x03U\x04\x06\x13\x02US1\x130\x11\x06\x03U\x04\x08\x13\nWashington1\x100\x0e\x06\x03U\x04\x07\x13\x07Redmond1\x1e0\x1c\x06\x03U\x04\n\x13\x15Microsoft Corporation1.0,\x06\x03U\x04\x03\x13%Microsoft Windows Production PCA 2011') * HashAlgorithme: b'2.16.840.1.101.3.4.2.1' * Certificate: <Certificate "Microsoft Windows" serial="33 00 00 04 5c 3d 56 72 66 6c b7 54 17 00 00 00 00 04 5c"> File embdeds 2 certificate(s): * 0) <Certificate "Microsoft Windows" serial="33 00 00 04 5c 3d 56 72 66 6c b7 54 17 00 00 00 00 04 5c"> * 1) <Certificate "Microsoft Windows Production PCA 2011" serial="61 07 76 56 00 00 00 00 
00 08">
18.18. windows.alpc¶
18.18.1. simple alpc communication¶
# Simple ALPC sample: a server process creates a port, accepts one connection
# and answers one request sent by a client running in the main process.
import multiprocessing
import time

import windows.alpc
from windows.generated_def import LPC_CONNECTION_REQUEST, LPC_REQUEST

PORT_NAME = r"\RPC Control\PythonForWindowsPORT"


def alpc_server():
    """Create the ALPC port, accept one connection and reply to one request.

    Raises:
        ValueError: if the first message is not a connection request or the
            second one is not a request.
    """
    server = windows.alpc.AlpcServer(PORT_NAME)  # Create the ALPC Port
    print("[SERV] PORT <{0}> CREATED".format(PORT_NAME))
    msg = server.recv()  # Wait for a message
    print("[SERV] Message type = {0:#x}".format(msg.type))
    print("[SERV] Received data: <{0}>".format(msg.data.decode()))
    # Check that message is a connection request.
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`.
    if msg.type & 0xfff != LPC_CONNECTION_REQUEST:
        raise ValueError("Expected a connection request, got type {0:#x}".format(msg.type))
    print("[SERV] Connection request")
    server.accept_connection(msg)
    msg = server.recv()  # Wait for a real message
    print("")
    print("[SERV] Received message: <{0}>".format(msg.data.decode()))
    print("[SERV] Message type = {0:#x}".format(msg.type))
    if msg.type & 0xfff != LPC_REQUEST:
        raise ValueError("Expected a request, got type {0:#x}".format(msg.type))
    # We can reply in two ways:
    #    - Send the same message with modified data
    #    - Recreate a Message and copy the MessageId
    msg.data = u"REQUEST '{0}' DONE".format(msg.data.decode()).encode()
    server.send(msg)


def alpc_client():
    """Connect to the server port, send one message and print the response."""
    print("Client pid = {0}".format(windows.current_process.pid))
    # Creating an 'AlpcClient' with a port name will connect to the port with an empty message
    client = windows.alpc.AlpcClient(PORT_NAME)
    print("[CLIENT] Connected: {0}".format(client))
    # Send a message / wait for the response
    response = client.send_receive(b"Hello world !")
    print("[CLIENT] Response: <{0}>".format(response.data.decode()))
    # You can also send message without waiting for a response with 'client.send'


if __name__ == "__main__":
    proc = multiprocessing.Process(target=alpc_server, args=())
    proc.start()
    try:
        time.sleep(1)  # Leave the server some time to create its port
        alpc_client()
        print("BYE")
    finally:
        # Always stop the server process, even if the client failed:
        # a non-daemon child still waiting for a message would keep the script alive.
        proc.terminate()
        proc.join()
Output
(cmd) python alpc\simple_alpc.py [SERV] PORT <\RPC Control\PythonForWindowsPORT> CREATED Client pid = 15840 [SERV] Message type = 0x300a [SERV] Received data: <> [SERV] Connection request [CLIENT] Connected: <windows.alpc.AlpcClient object at 0x06919290> [SERV] Received message: <Hello world !> [SERV] Message type = 0x3001 [CLIENT] Response: <REQUEST 'Hello world !' DONE> BYE
18.18.2. advanced alpc communication¶
# Advanced ALPC sample: connection message, port context, and messages
# carrying a handle attribute or a view attribute.
import sys
import multiprocessing
import time
import windows.alpc
from windows.generated_def import LPC_CONNECTION_REQUEST, LPC_REQUEST
import windows.generated_def as gdef

import ctypes
import tempfile

PORT_NAME = r"\RPC Control\PythonForWindowsPORT_2"
PORT_CONTEXT = 0x11223344


def full_alpc_server():
    """Serve the ALPC port: accept one connection, then describe and answer every request.

    The loop never returns by itself; the parent process terminates the server.

    Raises:
        ValueError: if the first received message is not a connection request.
    """
    print("server pid = {0}".format(windows.current_process.pid))
    server = windows.alpc.AlpcServer(PORT_NAME)
    print("[SERV] PORT <{0}> CREATED".format(PORT_NAME))
    msg = server.recv()
    print("[SERV] == Message received ==")
    if msg.type & 0xfff == LPC_CONNECTION_REQUEST:
        print(" * ALPC connection request: <{0}>".format(msg.data.decode()))
        msg.data = b"Connection message response"
        server.accept_connection(msg, port_context=PORT_CONTEXT)
    else:
        raise ValueError("Expected connection")

    while True:
        msg = server.recv()
        print("[SERV] == Message received ==")
        # print(" * Data: {0}".format(msg.data))
        # print("[SERV] RECV Message type = {0:#x}".format(msg.type))
        # print("[SERV] RECV Message Valid ATTRS = {0:#x}".format(msg.attributes.ValidAttributes))
        # print("[SERV] RECV Message ATTRS = {0:#x}".format(msg.attributes.AllocatedAttributes))
        if msg.type & 0xfff == LPC_REQUEST:
            print(" * ALPC request: <{0}>".format(msg.data.decode()))
            print(" * view_is_valid <{0}>".format(msg.view_is_valid))
            if msg.view_is_valid:
                print(" * message view attribute:")
                windows.utils.print_ctypes_struct(msg.view_attribute, " - VIEW", hexa=True)
                view_data = windows.current_process.read_string(msg.view_attribute.ViewBase)
                print(" * Reading view content: <{0}>".format(view_data))
                # Needed in Win7 - TODO: why is there a different behavior ?
                msg.attributes.ValidAttributes -= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
            print(" * security_is_valid <{0}>".format(msg.security_is_valid))

            print(" * handle_is_valid <{0}>".format(msg.handle_is_valid))
            if msg.handle_is_valid:
                if msg.handle_attribute.Handle:
                    print(" * message handle attribute:")
                    windows.utils.print_ctypes_struct(msg.handle_attribute, " - HANDLE", hexa=True)
                    if msg.handle_attribute.ObjectType == 1:
                        f = windows.utils.create_file_from_handle(msg.handle_attribute.Handle)
                        print(" - File: {0}".format(f))
                        print(" - content: <{0}>".format(f.read()))
                    else:
                        print(" - unknow object type == {0}".format(msg.handle_attribute.ObjectType))
                # Only subtract the flag when it is known to be set (handle_is_valid)
                msg.attributes.ValidAttributes -= gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE

            print(" * context_is_valid <{0}>".format(msg.context_is_valid))
            if msg.context_is_valid:
                print(" * message context attribute:")
                windows.utils.print_ctypes_struct(msg.context_attribute, " - CTX", hexa=True)

            if msg.attributes.ValidAttributes & gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE:
                print(" * message token attribute:")
                token_struct = msg.attributes.get_attribute(gdef.ALPC_MESSAGE_TOKEN_ATTRIBUTE)
                windows.utils.print_ctypes_struct(token_struct, " - TOKEN", hexa=True)

            # We can reply in two ways:
            #    - Send the same message with modified data
            #    - Recreate a Message and copy the MessageId
            msg.data = "REQUEST '{0}' DONE".format(msg.data.decode()).encode()
            sys.stdout.flush()
            server.send(msg)
        else:
            # Other message types are only reported; the server keeps running
            print("Unexpected message type <{0}>".format(msg.type & 0xfff))


def send_message_with_handle(client):
    """Send a request whose handle attribute refers to a temporary file.

    Args:
        client: a connected ``windows.alpc.AlpcClient``.
    """
    print("")
    print("[Client] == Sending a message with a handle ==")

    # Craft a file with some data.
    # The context manager closes the file once the server has answered.
    with tempfile.NamedTemporaryFile() as f:
        f.write(b"Tempfile data <3")
        f.seek(0)

        # New message with a Handle
        msg = windows.alpc.AlpcMessage()
        msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_HANDLE_ATTRIBUTE
        msg.handle_attribute.Flags = gdef.ALPC_HANDLEFLG_DUPLICATE_SAME_ACCESS
        msg.handle_attribute.Handle = windows.utils.get_handle_from_file(f)
        msg.handle_attribute.ObjectType = 0
        msg.handle_attribute.DesiredAccess = 0
        msg.data = b"some message with a file"
        client.send_receive(msg)


def send_message_with_view(client):
    """Send a request whose view attribute describes a mapped port section.

    Args:
        client: a connected ``windows.alpc.AlpcClient``.
    """
    print("")
    print("[Client] == Sending a message with a view ==")

    # Create View
    section = client.create_port_section(0, 0, 0x4000)
    view = client.map_section(section[0], 0x4000)

    # New message with a View
    msg = windows.alpc.AlpcMessage(0x2000)
    msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
    msg.view_attribute.Flags = 0
    msg.view_attribute.ViewBase = view.ViewBase
    msg.view_attribute.SectionHandle = view.SectionHandle
    msg.view_attribute.ViewSize = 0x4000
    msg.data = b"some message with a view"
    windows.current_process.write_memory(view.ViewBase, b"The content of the view :)\x00")
    client.send_receive(msg)


def alpc_client():
    """Connect with a custom connection message, then send the three sample requests."""
    print("Client pid = {0}".format(windows.current_process.pid))

    client = windows.alpc.AlpcClient()
    # You can create a non-connected AlpcClient and send a custom
    # 'AlpcMessage' for complex alpc port connection.
    connect_message = windows.alpc.AlpcMessage()
    connect_message.data = b"Connection request client message"
    print("[CLIENT] == Connecting to port ==")
    connect_response = client.connect_to_port(PORT_NAME, connect_message)
    print("[CLIENT] Connected with response: <{0}>".format(connect_response.data.decode()))

    # AlpcClient send/recv/send_receive methods accept both string or
    # AlpcMessage for complex message.
    print("")
    print("[CLIENT] == Sending a message ==")
    msg = windows.alpc.AlpcMessage()
    msg.data = b"Complex Message 1"
    print(" * Sending Message <{0}>".format(msg.data.decode()))
    response = client.send_receive(msg)
    print("[CLIENT] Server response: <{0}>".format(response.data.decode()))
    print("[CLIENT] RESP Message Valid ATTRS = {0}".format(response.valid_attributes))

    send_message_with_handle(client)
    send_message_with_view(client)
    sys.stdout.flush()


if __name__ == "__main__":
    proc = multiprocessing.Process(target=full_alpc_server, args=())
    proc.start()
    try:
        time.sleep(0.5)  # Leave the server some time to create its port
        alpc_client()
        time.sleep(0.5)  # Leave the server some time to print its last messages
        print("BYE")
    finally:
        # The server loops forever: always stop it, even if the client failed
        proc.terminate()
        proc.join()
Output
(cmd) python alpc\advanced_alpc.py server pid = 2300 [SERV] PORT <\RPC Control\PythonForWindowsPORT_2> CREATED Client pid = 14848 [CLIENT] == Connecting to port == [SERV] == Message received == * ALPC connection request: <Connection request client message> [CLIENT] Connected with response: <Connection message response> [CLIENT] == Sending a message == * Sending Message <Complex Message 1> [SERV] == Message received == * ALPC request: <Complex Message 1> * view_is_valid <False> * security_is_valid <False> * handle_is_valid <False> * context_is_valid <True> * message context attribute: - CTX.PortContext -> 0x11223344 - CTX.MessageContext -> None - CTX.Sequence -> 0x1L - CTX.MessageId -> 0x0L - CTX.CallbackId -> 0x0L * message token attribute: - TOKEN.TokenId -> 0x4101ef8eL - TOKEN.AuthenticationId -> 0x54177L - TOKEN.ModifiedId -> 0x4077ab89L [CLIENT] Server response: <REQUEST 'Complex Message 1' DONE> [CLIENT] RESP Message Valid ATTRS = [ALPC_MESSAGE_CONTEXT_ATTRIBUTE(0x20000000L)] [Client] == Sending a message with a handle == [SERV] == Message received == * ALPC request: <some message with a file> * view_is_valid <False> * security_is_valid <False> * handle_is_valid <True> * message handle attribute: - HANDLE.Flags -> 0x0L - HANDLE.Handle -> 0x2cc - HANDLE.ObjectType -> 0x1L - HANDLE.DesiredAccess -> 0x13019fL - File: <open file '<fdopen>', mode 'r' at 0x049ECA18> - content: <Tempfile data <3> * context_is_valid <True> * message context attribute: - CTX.PortContext -> 0x11223344 - CTX.MessageContext -> None - CTX.Sequence -> 0x2L - CTX.MessageId -> 0x0L - CTX.CallbackId -> 0x0L * message token attribute: - TOKEN.TokenId -> 0x4101ef8eL - TOKEN.AuthenticationId -> 0x54177L - TOKEN.ModifiedId -> 0x4077ab89L [Client] == Sending a message with a view == [SERV] == Message received == * ALPC request: <some message with a view> * view_is_valid <True> * message view attribute: - VIEW.Flags -> 0x0L - VIEW.SectionHandle -> None - VIEW.ViewBase -> 0x4780000 - VIEW.ViewSize -> 
0x4000 * Reading view content: <The content of the view :)> * security_is_valid <False> * handle_is_valid <False> * context_is_valid <True> * message context attribute: - CTX.PortContext -> 0x11223344 - CTX.MessageContext -> None - CTX.Sequence -> 0x3L - CTX.MessageId -> 0x0L - CTX.CallbackId -> 0x0L * message token attribute: - TOKEN.TokenId -> 0x4101ef8eL - TOKEN.AuthenticationId -> 0x54177L - TOKEN.ModifiedId -> 0x4077ab89L [SERV] == Message received == Unexpected message type <12> [SERV] == Message received == Unexpected message type <5> [SERV] == Message received == Unexpected message type <12> [SERV] == Message received == Unexpected message type <12> BYE
18.19. windows.rpc¶
18.19.1. Manual UAC¶
# Manual UAC sample: marshal the parameters with NDR and ask the UAC RPC
# interface to launch an elevated process.
import argparse
import sys

import windows.rpc
import windows.generated_def as gdef
from windows.rpc import ndr


# NDR Descriptions
class NDRPoint(ndr.NdrStructure):
    MEMBERS = [ndr.NdrLong, ndr.NdrLong]


class NdrUACStartupInfo(ndr.NdrStructure):
    # Same order as the "Startup Info" tuple packed below
    MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),  # Title
               ndr.NdrLong,  # dwX
               ndr.NdrLong,  # dwY
               ndr.NdrLong,  # dwXSize
               ndr.NdrLong,  # dwYSize
               ndr.NdrLong,  # dwXCountChars
               ndr.NdrLong,  # dwYCountChars
               ndr.NdrLong,  # dwFillAttribute
               ndr.NdrLong,  # dwFlags
               ndr.NdrLong,  # wShowWindow
               NDRPoint]     # Point structure


class RAiLaunchAdminProcessParameters(ndr.NdrParameters):
    # Same order as the parameter list packed below
    MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),  # Application Path
               ndr.NdrUniquePTR(ndr.NdrWString),  # Commandline
               ndr.NdrLong,                       # UAC-Request Flag
               ndr.NdrLong,                       # dwCreationFlags
               ndr.NdrWString,                    # StartDirectory
               ndr.NdrWString,                    # Station
               NdrUACStartupInfo,                 # Startup Info
               ndr.NdrLong,                       # Window-Handle
               ndr.NdrLong]                       # UAC Timeout


class NdrProcessInformation(ndr.NdrParameters):
    # Unpacked below as (ph, th, pid, tid)
    MEMBERS = [ndr.NdrLong] * 4


def auto_int(value):
    """Convert a command-line value to int, accepting ``17`` as well as ``0x11``.

    A named function (rather than a lambda) lets argparse report
    "invalid auto_int value" instead of "invalid <lambda> value".
    """
    return int(value, 0)


# Parsing args
parser = argparse.ArgumentParser(prog=__file__)
parser.add_argument('--target', default=sys.executable, help='Executable to launch')
parser.add_argument('--cmdline', default="", help='The commandline for the process')
parser.add_argument('--uacflags', type=auto_int, default=0x11,
                    help='UAC-Request flags (decimal or 0x-prefixed)')
parser.add_argument('--creationflags', type=auto_int, default=gdef.CREATE_UNICODE_ENVIRONMENT,
                    help='dwCreationFlags of the new process (decimal or 0x-prefixed)')
params = parser.parse_args()
print(params)

# Connecting to RPC Interface.
UAC_UIID = "201ef99a-7fa0-444c-9399-19ba84f12a1a"
client = windows.rpc.find_alpc_endpoint_and_connect(UAC_UIID)
iid = client.bind(UAC_UIID)

# Marshalling parameters.
parameters = RAiLaunchAdminProcessParameters.pack([
    params.target,  # Application Path
    params.cmdline,  # Commandline
    params.uacflags,  # UAC-Request Flag
    params.creationflags,  # dwCreationFlags
    "",  # StartDirectory
    "WinSta0\\Default",  # Station
    # Startup Info
    (None,  # Title
     0,  # dwX
     0,  # dwY
     0,  # dwXSize
     0,  # dwYSize
     0,  # dwXCountChars
     0,  # dwYCountChars
     0,  # dwFillAttribute
     0,  # dwFlags
     5,  # wShowWindow
     # Point structure: Use MonitorFromPoint to setup StartupInfo.hStdOutput
     (0, 0)),
    0,  # Window-Handle to know if UAC can steal focus
    0xffffffff])  # UAC Timeout

result = client.call(iid, 0, parameters)
stream = ndr.NdrStream(result)
# The response holds the process information first, then the return value
ph, th, pid, tid = NdrProcessInformation.unpack(stream)
return_value = ndr.NdrLong.unpack(stream)
print("Return value = {0:#x}".format(return_value))
target = windows.winobject.process.WinProcess(handle=ph)
print("Created process is {0}".format(target))
print(" * bitness is {0}".format(target.bitness))
print(" * integrity: {0}".format(target.token.integrity))
print(" * elevated: {0}".format(target.token.is_elevated))
Output:
(cmd λ) python rpc\uac.py Namespace(cmdline='', creationflags=CREATE_UNICODE_ENVIRONMENT(0x400L), target='C:\\Python27\\python.exe', uacflags=17) # UAC pop - asking to execute python.exe | Clicking Yes Return value = 0x6 Created process is <WinProcess "python.exe" pid 19304 at 0x455f7d0> * bitness is 32 * integrity: SECURITY_MANDATORY_HIGH_RID(0x3000L) * elevated: True # The new python.exe in another window >>> windows.current_process.token.integrity SECURITY_MANDATORY_HIGH_RID(0x3000L) >>> windows.current_process.token.is_elevated True
18.19.2. Manual LsarEnumeratePrivileges¶
# Manual LsarEnumeratePrivileges sample: open an LSA policy handle over RPC
# and list the (luid, name) of the privileges returned by the server.
import windows.rpc
from windows.rpc import ndr


class PLSAPR_OBJECT_ATTRIBUTES(ndr.NdrStructure):
    MEMBERS = [ndr.NdrLong,
               ndr.NdrUniquePTR(ndr.NdrWString),
               ndr.NdrUniquePTR(ndr.NdrLong),  # We dont care of the subtype as we will pass None
               ndr.NdrLong,
               ndr.NdrUniquePTR(ndr.NdrLong),  # We dont care of the subtype as we will pass None
               ndr.NdrUniquePTR(ndr.NdrLong)]  # We dont care of the subtype as we will pass None


## From: RPCVIEW
# long Proc44_LsarOpenPolicy2(
#   [in][unique][string] wchar_t* arg_0,
#   [in]struct Struct_364_t* arg_1,
#   [in]long arg_2,
#   [out][context_handle] void** arg_3);

# This function has a [out][context_handle] meaning it return a context_handle
# Context handle are represented by 5 NdrLong where the first one is always 0
# PythonForWindows represent context_handle using NdrContextHandle
class LsarOpenPolicy2Parameter(ndr.NdrParameters):
    MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
               PLSAPR_OBJECT_ATTRIBUTES,
               ndr.NdrLong]


## From: RPCVIEW
# long Proc2_LsarEnumeratePrivileges(
#   [in][context_handle] void* arg_0,
#   [in][out]long *arg_1,
#   [out]struct Struct_110_t* arg_2,
#   [in]long arg_3);

# This function has a [in][context_handle] meaning it expect a context_handle
# We can pass the NdrContextHandle returned by Proc44_LsarOpenPolicy2
class LsarEnumeratePrivilegesParameter(ndr.NdrParameters):
    MEMBERS = [ndr.NdrContextHandle,
               ndr.NdrLong,
               ndr.NdrLong]


class LSAPR_POLICY_PRIVILEGE_DEF(object):
    @classmethod
    def unpack(cls, stream):
        """Read one array element from `stream` and return ``(ptr, luid)``.

        The element is read as: short, short, long, hyper. Only the second
        short (returned as `ptr`) and the hyper (returned as `luid`) are used;
        the other two values are read to advance the stream.
        """
        # NOTE(review): the two shorts presumably are the length fields of the
        # privilege name and the long its pointer - confirm against the
        # LSAPR_POLICY_PRIVILEGE_DEF layout.
        ndr.NdrShort.unpack(stream)  # value unused
        ptr = ndr.NdrShort.unpack(stream)
        ndr.NdrLong.unpack(stream)  # value unused
        luid = ndr.NdrHyper.unpack(stream)
        return ptr, luid


class LSAPR_PRIVILEGE_ENUM_BUFFER(object):
    @classmethod
    def unpack(cls, stream):
        """Read the response from `stream` and return a list of ``(luid, name)``.

        Raises:
            ValueError: if the two array sizes found in the stream differ.
        """
        ndr.NdrLong.unpack(stream)  # first long: value unused
        array_size = ndr.NdrLong.unpack(stream)
        ndr.NdrLong.unpack(stream)  # array pointer: value unused
        # Unpack pointed array
        pointed_array_size = ndr.NdrLong.unpack(stream)
        # Explicit raise instead of `assert`: asserts are stripped under `python -O`
        if array_size != pointed_array_size:
            raise ValueError("Inconsistent array sizes: {0} != {1}".format(array_size, pointed_array_size))
        luids = []
        # unpack each elements LSAPR_POLICY_PRIVILEGE_DEF
        for _ in range(pointed_array_size):
            ptr, luid = LSAPR_POLICY_PRIVILEGE_DEF.unpack(stream)
            if ptr:
                luids.append(luid)
        # unpack pointed strings: one per element kept above, in the same order
        return [(luid, ndr.NdrWcharConformantVaryingArrays.unpack(stream)) for luid in luids]


# Actual code
## LSASS alpc endpoints is fixed, no need for the epmapper
client = windows.rpc.RPCClient(r"\RPC Control\lsasspirpc")
## Bind to the desired interface
iid = client.bind('12345778-1234-abcd-ef00-0123456789ab', version=(0, 0))

## Craft parameters and call 'LsarOpenPolicy2'
params = LsarOpenPolicy2Parameter.pack([None, (0, None, None, 0, None, None), 0x20000000])
res = client.call(iid, 44, params)
## Unpack the resulting handle
handle = ndr.NdrContextHandle.unpack(ndr.NdrStream(res))

# As context_handle have 4 NdrLong of effective data
# We can represent them as GUID
# NdrContextHandle is just a wrapper packing/unpacking GUID and taking
# care of the leading NdrLong(0) in the actual ndr representation of context_handle
print("Context Handle is: {0}\n".format(handle))

## Craft parameters and call 'LsarEnumeratePrivileges'
enum_params = LsarEnumeratePrivilegesParameter.pack([handle, 0, 10000])
res = client.call(iid, 2, enum_params)
print("Privileges:")
## Unpack the resulting 'LSAPR_PRIVILEGE_ENUM_BUFFER'
privileges = LSAPR_PRIVILEGE_ENUM_BUFFER.unpack(ndr.NdrStream(res))
for priv in privileges:
    print(priv)
Output
(cmd) python rpc\lsass.py (2, u'SeCreateTokenPrivilege') (3, u'SeAssignPrimaryTokenPrivilege') (4, u'SeLockMemoryPrivilege') (5, u'SeIncreaseQuotaPrivilege') (6, u'SeMachineAccountPrivilege') (7, u'SeTcbPrivilege') (8, u'SeSecurityPrivilege') (9, u'SeTakeOwnershipPrivilege') (10, u'SeLoadDriverPrivilege') (11, u'SeSystemProfilePrivilege') (12, u'SeSystemtimePrivilege') (13, u'SeProfileSingleProcessPrivilege') (14, u'SeIncreaseBasePriorityPrivilege') (15, u'SeCreatePagefilePrivilege') (16, u'SeCreatePermanentPrivilege') (17, u'SeBackupPrivilege') (18, u'SeRestorePrivilege') (19, u'SeShutdownPrivilege') (20, u'SeDebugPrivilege') (21, u'SeAuditPrivilege') (22, u'SeSystemEnvironmentPrivilege') (23, u'SeChangeNotifyPrivilege') (24, u'SeRemoteShutdownPrivilege') (25, u'SeUndockPrivilege') (26, u'SeSyncAgentPrivilege') (27, u'SeEnableDelegationPrivilege') (28, u'SeManageVolumePrivilege') (29, u'SeImpersonatePrivilege') (30, u'SeCreateGlobalPrivilege') (31, u'SeTrustedCredManAccessPrivilege') (32, u'SeRelabelPrivilege') (33, u'SeIncreaseWorkingSetPrivilege') (34, u'SeTimeZonePrivilege') (35, u'SeCreateSymbolicLinkPrivilege') (36, u'SeDelegateSessionUserImpersonatePrivilege')
18.20. windows.pipe¶
18.20.1. Communication with an injected process¶
# Pipe sample: inject Python code in a child process and receive an object
# (the address of a file mapping) that the injected code sends over a named pipe.
import windows
import windows.test
import windows.pipe

p = windows.test.pop_proc_32()
print("Child is {0}".format(p))
PIPE_NAME = "PFW_Pipe"

# Code executed in the child: map 'tst.txt' and send the mapping address on the pipe.
# '{pipe}' is replaced by the pipe name before injection.
rcode = """
import windows
import windows.pipe

f = open('tst.txt', "w+")
fh = windows.utils.get_handle_from_file(f)
hm = windows.winproxy.CreateFileMappingA(fh, dwMaximumSizeLow=0x1000, lpName=None)
addr = windows.winproxy.MapViewOfFile(hm, dwNumberOfBytesToMap=0x1000)

windows.pipe.send_object("{pipe}", addr)
"""

try:
    with windows.pipe.create(PIPE_NAME) as server_pipe:
        print("Created pipe is {0}".format(server_pipe))
        p.execute_python(rcode.format(pipe=PIPE_NAME))
        print("Receiving object from injected process")
        addr = server_pipe.recv()

    print("Remote Address = {0:#x}".format(addr))
    print("Querying memory in target at <{0:#x}>".format(addr))
    print(" * {0}".format(p.query_memory(addr)))
    print("Querying mapped file in target at <{0:#x}>".format(addr))
    print(" * {0}".format(p.get_mapped_filename(addr)))
finally:
    # Always terminate the child, even if the injection or a query failed
    p.exit()
Output
(cmd) python pipe\child_send_object.py Child is <WinProcess "notepad.exe" pid 4316 at 0x672fcf0> Created pipe is <PipeConnection name="\\.\pipe\PFW_Pipe" server=True> Receiving object from injected process Remote Address = 0x97a0000 Querying memory in target at <0x97a0000> * <MEMORY_BASIC_INFORMATION32 BaseAddress=0x97a0000 RegionSize=0x001000 State=MEM_COMMIT(0x1000L) Type=MEM_MAPPED(0x40000L) Protect=PAGE_READWRITE(0x4L)> Querying mapped file in target at <0x97a0000> * \Device\HarddiskVolume2\Users\hakril\Documents\projets\PythonForWindows\samples\tst.txt
18.21. windows.security¶
18.21.1. Security Descriptor¶
# Security descriptor sample: parse an SDDL string and display its owner,
# its group and every ACE of its DACL.
import windows.security

SDDL = "O:BAG:AND:(A;OI;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)(D;CIIO;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)"

sd = windows.security.SecurityDescriptor.from_string(SDDL)
print("Security descriptor is: {0}".format(sd))

# Owner SID, then the result of its lookup
print("Owner: {0}".format(sd.owner))
owner_lookup = windows.utils.lookup_sid(sd.owner)
print(" - lookup: {0}".format(owner_lookup))

# Group SID, then the result of its lookup
print("Group: {0}".format(sd.group))
group_lookup = windows.utils.lookup_sid(sd.group)
print(" - lookup: {0}".format(group_lookup))

dacl = sd.dacl
print("Dacl: {0}".format(dacl))

# One paragraph per ACE: raw and decoded flags, raw and decoded mask, then the SID
for index, ace in enumerate(dacl):
    print("")
    ace_title = " ACE [{0}]: {1}".format(index, ace)
    print(ace_title)
    print(" - Header-AceType: {0}".format(ace.Header.AceType))
    print(" - Header-AceFlags: {0}".format(ace.Header.AceFlags))
    print(" - Header-flags: {0}".format(ace.Header.flags))
    print(" - Mask: {0}".format(ace.Mask))
    print(" - mask: {0}".format(ace.mask))
    print(" - Sid: {0}".format(ace.sid))
Output
(cmd) python security\security_descriptor.py Security descriptor is: O:BAG:AND:(A;OI;CCDCLCSWRPWPRCWDWOGA;;;S-1-0-0)(D;CIIO;CCDCLCSWRPWPRCWDWOGA;;;S-1-0-0) Owner: S-1-5-32-544 - lookup: ('BUILTIN', 'Administrators') Group: S-1-5-7 - lookup: ('NT AUTHORITY', 'ANONYMOUS LOGON') Dacl: <Acl count=2> ACE [0]: <AccessAllowedACE mask=269353023> - Header-AceType: ACCESS_ALLOWED_ACE_TYPE(0x0) - Header-AceFlags: 1 - Header-flags: [OBJECT_INHERIT_ACE(0x1)] - Mask: 269353023 - mask: [1, 2, 4, 8, 16, 32, READ_CONTROL(0x20000), WRITE_DAC(0x40000), WRITE_OWNER(0x80000), GENERIC_ALL(0x10000000)] - Sid: S-1-0-0 ACE [1]: <AccessDeniedACE mask=269353023> - Header-AceType: ACCESS_DENIED_ACE_TYPE(0x1) - Header-AceFlags: 10 - Header-flags: [CONTAINER_INHERIT_ACE(0x2), INHERIT_ONLY_ACE(0x8)] - Mask: 269353023 - mask: [1, 2, 4, 8, 16, 32, READ_CONTROL(0x20000), WRITE_DAC(0x40000), WRITE_OWNER(0x80000), GENERIC_ALL(0x10000000)] - Sid: S-1-0-0
18.21.2. Query SACL¶
# Query SACL sample: read the security descriptor of a file with and without
# its SACL, before and after enabling SeSecurityPrivilege.
import sys

import windows.security

TARGET = r"C:\windows\notepad.exe"
# On WIN10 (at least) notepad.exe has a AuditACE

if not windows.current_process.token.elevated:
    # Only a warning: the non-privileged queries below are still demonstrated
    print("This sample should be run as admin to demonstration SACL access")

print("")
print("[NO-PRIV] Querying <{0}> SecurityDescriptor without SACL".format(TARGET))
sd = windows.security.SecurityDescriptor.from_filename(TARGET)
print("sacl = {0}".format(sd.sacl))

print("")
print("[NO-PRIV] Querying <{0}> SecurityDescriptor with SACL".format(TARGET))
try:
    sd = windows.security.SecurityDescriptor.from_filename(TARGET, query_sacl=True)
    print("sacl = {0}".format(sd.sacl))
except WindowsError as e:
    # Failure is reported, not fatal: the privilege is enabled below
    print(e)

print("")
print("Enabling <SeSecurityPrivilege>")
try:
    windows.current_process.token.enable_privilege("SeSecurityPrivilege")
except ValueError as e:
    print("[ERROR] {0}".format(e))
    # sys.exit rather than the builtin exit(), which is only provided by the
    # `site` module for interactive use
    sys.exit(1)

print("")
print("[PRIV] Querying <{0}> SecurityDescriptor with SACL".format(TARGET))
sd = windows.security.SecurityDescriptor.from_filename(TARGET, query_sacl=True)
print("sacl = {0}".format(sd.sacl))
print(list(sd.sacl))
Output
(cmd) python security\query_sacl.py This sample should be run as admin to demonstration SACL access [NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor without SACL sacl = <Acl count=0> [NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL None: [Error 1314] A required privilege is not held by the client. Enabling <SeSecurityPrivilege> [ERROR] <Token TokenId=0xd6db5cc Type=TokenPrimary(0x1L)> has no privilege <SeSecurityPrivilege> (cmd-admin) python security\query_sacl.py [NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor without SACL sacl = <Acl count=0> [NO-PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL None: [Error 1314] A required privilege is not held by the client. Enabling <SeSecurityPrivilege> [PRIV] Querying <C:\windows\notepad.exe> SecurityDescriptor with SACL sacl = <Acl count=1> [<SystemAuditACE mask=852246>]
18.22. ETW (Event Tracing for Windows)¶
18.22.1. Trace processing¶
import ctypes
import struct
import windows
import windows.generated_def as gdef

makeg = gdef.GUID.from_string

# This sample records the ETW events of provider CBB61B6D-A2CF-471A-9A58-A4CD5C08FFBA
# related to the UAC (service AppInfo)
# The ETW session is called MY_UAC_MONITOR
# It then triggers the UAC and displays the retrieved events afterward


def show(event):
    """Print one recorded event; callback given to ``my_trace.process``.

    Returns 0 for every event.
    """
    print("{0:#x}: {1}".format(event.EventHeader.TimeStamp, event))
    print(" guid: {0}".format(event.guid))
    print(" id: {0}".format(event.id))
    print(" opcode: {0}".format(event.opcode))
    print(" level: {0}".format(event.level))
    # NUL characters are removed to keep the displayed data readable
    print(" data: {0!r}".format(event.user_data.replace("\x00", "")))
    return 0


session_name = "MY_UAC_MONITOR"
logfile_name = "uac.trace"

print("Recording UAC event in file <{0}> using session named <{1}>".format(logfile_name, session_name))

my_trace = windows.system.etw.open_trace(session_name, logfile=logfile_name)
my_trace.stop(soft=True)  # Stop previous trace with this name if exists
my_trace.start()
try:
    my_trace.enable("CBB61B6D-A2CF-471A-9A58-A4CD5C08FFBA", 0xff, 0xff)
    # Trigger UAC
    windows.winproxy.ShellExecuteA(None, "runas", "mmc.exe", "BAD_MMC_FILENAME", None, 5)
finally:
    # Always stop the session, even if enabling the provider or triggering the UAC failed
    my_trace.stop()

my_trace.process(show)  #: Process the events registered in the trace (and logfile)
Output
(cmd) python etw\uac_trace.py Recording UAC event in file <uac.trace> using session named <MY_UAC_MONITOR> 0x1d65bb1febaceea: <EventRecord provider="68FDD900-4A3E-11D1-84F4-0000F80464E3" id=0> guid: 68FDD900-4A3E-11D1-84F4-0000F80464E3 id: 0 opcode: 0 level: 0 data: '\x01\n\x01\x05\xbbG\x04-D\xd5\xff\xb1[\xd6\x01Zb\x02\t\x01\x04\xf0\x0c\t\x06\xc4\xff\xff\xff@tzres.dll,-302\n\x05\x03@tzres.dll,-301\x03\x05\x02\xc4\xff\xff\xff\xc0\x92\x1c\xd2D[\xd6\x01\x80\x96\x98\xea\xce\xba\xfe\xb1[\xd6\x01\x01MY_UAC_MONITORC:\\Users\\hakril\\Documents\\projets\\PythonForWindows\\samples\\uac.trace' [...] 0x1d65bb1fec4c011: <EventRecord provider="DEB74A23-5444-3F3B-924B-0E653973F55A" id=11> guid: DEB74A23-5444-3F3B-924B-0E653973F55A id: 11 opcode: 0 level: 0 data: '\x9e\x06\x04\x19\x14\x04\x08\x04\xff\xff\xff\xffWinSta0\\DefaultC:\\Windows\\System32\\mmc.exe"C:\\Windows\\System32\\mmc.exe" BAD_MMC_FILENAMEC:\\Users\\hakril\\Documents\\projets\\PythonForWindows\\samples' 0x1d65bb1fec4e48b: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=10> guid: C0B508D3-5459-339F-A213-889C238CA5B1 id: 10 opcode: 0 level: 0 data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe"C:\\WINDOWS\\SysWOW64\\mmc.exe" BAD_MMC_FILENAME` ' 0x1d65bb1fec7c8eb: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=13> guid: C0B508D3-5459-339F-A213-889C238CA5B1 id: 13 opcode: 0 level: 0 data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe' 0x1d65bb1fec7c8f0: <EventRecord provider="C0B508D3-5459-339F-A213-889C238CA5B1" id=14> guid: C0B508D3-5459-339F-A213-889C238CA5B1 id: 14 opcode: 0 level: 0 data: '"C:\\WINDOWS\\SysWOW64\\mmc.exe" BAD_MMC_FILENAME' [...] 
0x1d65bb1feca018d: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=11> guid: 172FF31C-2D80-31A6-FCA8-EB000D380666 id: 11 opcode: 0 level: 0 data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe' 0x1d65bb1feca030d: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=25> guid: 172FF31C-2D80-31A6-FCA8-EB000D380666 id: 25 opcode: 0 level: 0 data: 'C:\\WINDOWS\\SysWOW64\\mmc.exe\x08\x02TRUEFALSE\x10' 0x1d65bb1fecfcdc0: <EventRecord provider="172FF31C-2D80-31A6-FCA8-EB000D380666" id=27> guid: 172FF31C-2D80-31A6-FCA8-EB000D380666 id: 27 opcode: 0 level: 0 data: '\x05TRUETRUE' [...]
18.22.2. Enumeration¶
# ETW enumeration sample: list some ETW sessions, then look for provider
# instances (and their process) registered to one of these sessions.
import itertools

import windows

MAX_MATCHES = 3  # Stop after this many provider/session matches


def iter_session_providers(etw, logger_id):
    """Yield ``(provider, instance)`` for the instances registered to a session.

    Args:
        etw: the ETW manager whose ``providers`` are enumerated.
        logger_id: the session id compared with each ``session.LoggerId``.

    An instance is yielded at most once, and only if its ``Pid`` is non-zero.
    """
    for provider in etw.providers:
        for instance in provider.instances:
            if not instance.Pid:
                continue
            if any(session.LoggerId == logger_id for session in instance.sessions):
                yield provider, instance


etwmgr = windows.system.etw
print("ETW Manager is: {0}".format(etwmgr))
print("")
print("Listing some ETW sessions:")

sessions = etwmgr.sessions
for sess in sessions[:2]:
    print(" * {0}".format(sess))
    print(" * name: {0}".format(sess.name))
    print(" * guid: {0}".format(sess.guid))
    print(" * id: {0}".format(sess.id))
    print(" * logfile: {0}".format(sess.logfile))

# The second listed session is the target of the provider search
sess = sessions[1]
target_id = sess.id

print("")
print("Looking for providers for: {0}".format(sess))
matches = itertools.islice(iter_session_providers(etwmgr, target_id), MAX_MATCHES)
for provider, instance in matches:
    # `None` instead of an IndexError if no process has this pid anymore
    proc = next((p for p in windows.system.processes if p.pid == instance.Pid), None)
    print("Found a provider/session for target:")
    print(" * Provider: {0}".format(provider))
    print(" * Instance: {0}".format(instance))
    print(" * Process: {0}".format(proc))
Output
(cmd) python etw\etw_enumeration.py ETW Manager is: <windows.winobject.event_trace.EtwManager object at 0x03AFBF70> Listing some ETW sessions: * <EventTraceProperties name="AppModel" guid=A922A8BE-2450-438E-9520-FBCDFB46B0BD> * name: AppModel * guid: A922A8BE-2450-438E-9520-FBCDFB46B0BD * id: 4 * logfile: * <EventTraceProperties name="LwtNetLog" guid=603BA31E-EC5A-4CDE-BE87-ED0A16C3B170> * name: LwtNetLog * guid: 603BA31E-EC5A-4CDE-BE87-ED0A16C3B170 * id: 14 * logfile: C:\WINDOWS\System32\LogFiles\WMI\LwtNetLog.etl Looking for providers for: <EventTraceProperties name="LwtNetLog" guid=603BA31E-EC5A-4CDE-BE87-ED0A16C3B170> Found a provider/session for target: * Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE"> * Instance: <TraceProviderInstanceInfo Pid=5256 EnableCount=1> * Process: <WinProcess "RuntimeBroker.exe" pid 5256 at 0x54c39d0> Found a provider/session for target: * Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE"> * Instance: <TraceProviderInstanceInfo Pid=10768 EnableCount=1> * Process: <WinProcess "chrome.exe" pid 10768 at 0x54c3930> Found a provider/session for target: * Provider: <TraceProvider for "43D1A55C-76D6-4F7E-995C-64C711E5CAFE"> * Instance: <TraceProviderInstanceInfo Pid=10236 EnableCount=1> * Process: <WinProcess "YourPhone.exe" pid 10236 at 0x54c37d0>