From 8cc40147984e9105183a6646d86c69f7219aedd6 Mon Sep 17 00:00:00 2001 From: ltlly Date: Thu, 10 Apr 2025 21:53:46 +0800 Subject: [PATCH 1/2] Add debugger-related mpc function, including functionality to retrieve all registers, call stack, byte values, strings, and more. --- src/ida_pro_mcp/mcp-plugin.py | 410 ++++++++++++++++++++++++++++++---- 1 file changed, 370 insertions(+), 40 deletions(-) diff --git a/src/ida_pro_mcp/mcp-plugin.py b/src/ida_pro_mcp/mcp-plugin.py index 29e790c..7b88de8 100644 --- a/src/ida_pro_mcp/mcp-plugin.py +++ b/src/ida_pro_mcp/mcp-plugin.py @@ -249,6 +249,10 @@ def _run_server(self): import ida_xref import ida_entry import idautils +import ida_idd +import ida_dbg +import ida_name + class IDAError(Exception): def __init__(self, message: str): @@ -487,6 +491,133 @@ def create_demangled_to_ea_map(): if demangled: DEMANGLED_TO_EA[demangled] = ea + +@jsonrpc +@idaread +def get_all_regs() -> list[dict[str, str]]: + """Get all registers and their values,this function is only available in the debugger""" + res = [] + dbg = ida_idd.get_dbg() + for tidx in range(ida_dbg.get_thread_qty()): + tid = ida_dbg.getn_thread(tidx) + regs = [] + regvals = ida_dbg.get_reg_vals(tid) + for ridx, rv in enumerate(regvals): + rinfo = dbg.regs(ridx) + rval = rv.pyval(rinfo.dtype) + if isinstance(rval, int): + rval = "0x%x" % rval + if isinstance(rval, bytes): + rval = "".join(["%02x" % b for b in rval]) + regs.append( + { + "reg": rinfo.name, + "value": rval, + } + ) + res.append( + { + "thread_id": tid, + "regs": regs, + } + ) + return res + + +import os + + +@jsonrpc +@idaread +def get_call_stack() -> list[dict[str, str]]: + """Get the current call stack""" + callstack = [] + try: + tid = ida_dbg.get_current_thread() + trace = ida_idd.call_stack_t() + + if not ida_dbg.collect_stack_trace(tid, trace): + return [] + for frame in trace: + frame_info = {"address": hex(frame.callea)} + try: + mi = ida_idd.modinfo_t() + if ida_dbg.get_module_info(frame.callea, mi): + frame_info["module"] = os.path.basename(mi.name) + else: + frame_info["module"] = "" + + name = ( + ida_name.get_nice_colored_name( + frame.callea, + ida_name.GNCN_NOCOLOR + | ida_name.GNCN_NOLABEL + | ida_name.GNCN_NOSEG + | ida_name.GNCN_PREFDBG, + ) + or "" + ) + frame_info["symbol"] = name + + except Exception as e: + frame_info["module"] = "" + frame_info["symbol"] = str(e) + + callstack.append(frame_info) + + except Exception as e: + pass + return callstack + + +@jsonrpc +@idaread +def get_wide_byte_rpc( + address: Annotated[str, "Address to get 1 byte value from"], +) -> int: + """Get 1 byte value at the specified address""" + ea = parse_address(address) + return ida_bytes.get_wide_byte(ea) + + +@jsonrpc +@idaread +def get_wide_word_rpc( + address: Annotated[str, "Address to get 2 bytes value from"], +) -> int: + """Get 2 bytes value at the specified address""" + ea = parse_address(address) + return ida_bytes.get_wide_word(ea) + + +@jsonrpc +@idaread +def get_wide_dword_rpc( + address: Annotated[str, "Address to get 4 bytes value from"], +) -> int: + """Get 4 bytes value at the specified address""" + ea = parse_address(address) + return ida_bytes.get_wide_dword(ea) + + +@jsonrpc +@idaread +def get_qword_rpc(address: Annotated[str, "Address to get 8 bytes value from"]) -> int: + """Get 8 bytes value at the specified address""" + ea = parse_address(address) + return ida_bytes.get_qword(ea) + + +@jsonrpc +@idaread +def get_str_by_addr(address: Annotated[str, "Address to get string from"]): + """Get string at the specified address""" + try: + return idaapi.get_strlit_contents(parse_address(address),-1,0).decode("utf-8") + except Exception as e: + return "Error:" + str(e) + + @jsonrpc @idaread def get_function_by_name( @@ -505,26 +636,158 @@ def get_function_by_name( raise IDAError(f"No function found with name {name}") return get_function(function_address) + +@jsonrpc +@idaread +def search_functions_by_name( + name: Annotated[str, "Name of the function to search for"], +) -> list[Function]: + """Search for functions by name""" + results = [] + for ea in idautils.Functions(): + func_name = ida_funcs.get_func_name(ea) + if name.lower() in func_name.lower(): + results.append(get_function(ea)) + return results + + +import ida_ida + + +@jsonrpc +@idaread +def list_all_breakpoints(): + """ + List all breakpoints in the program. + """ + ea = ida_ida.inf_get_min_ea() + end_ea = ida_ida.inf_get_max_ea() + bkpts = [] + while ea <= end_ea: + bpt = ida_dbg.bpt_t() + if ida_dbg.get_bpt(ea, bpt): + bkpts.append( + { + "ea": hex(bpt.ea), + "type": bpt.type, + "enabled": bpt.flags & ida_dbg.BPT_ENABLED, + "condition": bpt.condition if bpt.condition else None, + } + ) + ea = ida_bytes.next_head(ea, end_ea) + return bkpts + + +@jsonrpc +@idaread +def idadbg_start_process() -> str: + """Start the debugger""" + ret = idaapi.start_process("", "", "") + if ret == 1: + return "Debugger started" + return "Failed to start debugger" + + +@jsonrpc +@idaread +def idadbg_exit_process() -> str: + """Exit the debugger""" + ret = idaapi.exit_process() + if ret == 1: + return "Debugger exited" + return "Failed to exit debugger" + + +@jsonrpc +@idaread +def idadbg_continue_process() -> str: + """Continue the debugger""" + ret = idaapi.continue_process() + if ret == 1: + return "Debugger continued" + return "Failed to continue debugger" + + +@jsonrpc +@idaread +def idadbg_run_to( + address: Annotated[str, "Run the debugger to the specified address"], +) -> str: + """Run the debugger to the specified address""" + ea = parse_address(address) + ret = idaapi.run_to(ea) + if ret == 1: + return f"Debugger run to {hex(ea)}" + return f"Failed to run to address {hex(ea)}" + + +@jsonrpc +@idaread +def idadbg_add_bpt( + address: Annotated[str, "Set a breakpoint at the specified address"], +) -> str: + """Set a breakpoint at the specified address""" + ea = parse_address(address) + ret = idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT) + if ret == 1: + return f"Breakpoint set at {hex(ea)}" + bpts = list_all_breakpoints() + for bpt in bpts: + if bpt["ea"] == hex(ea): + return f"Breakpoint already exists at {hex(ea)}" + return f"Failed to set breakpoint at address {hex(ea)}" + + +@jsonrpc +@idaread +def idadbg_del_bpt( + address: Annotated[str, "del a breakpoint at the specified address"], +) -> str: + """del a breakpoint at the specified address""" + ea = parse_address(address) + ret = idaapi.del_bpt(ea) + if ret == 1: + return f"Breakpoint deleted at {hex(ea)}" + return f"Failed to delete breakpoint at address {hex(ea)}" + + +@jsonrpc +@idaread +def idadbg_enable_bpt( + address: Annotated[str, "Enable or disable a breakpoint at the specified address"], + enable: Annotated[bool, "Enable or disable a breakpoint"], +) -> str: + """Enable or disable a breakpoint at the specified address""" + ea = parse_address(address) + ret = idaapi.enable_bpt(ea, enable) + if ret == 1: + return f"Breakpoint {'enabled' if enable else 'disabled'} at {hex(ea)}" + return f"Failed to {'' if enable else 'disable '}breakpoint at address {hex(ea)}" + + @jsonrpc @idaread def get_function_by_address( - address: Annotated[str, "Address of the function to get"] + address: Annotated[str, "Address of the function to get"], ) -> Function: """Get a function by its address""" return get_function(parse_address(address)) + @jsonrpc @idaread def get_current_address() -> str: """Get the address currently selected by the user""" return hex(idaapi.get_screen_ea()) + @jsonrpc @idaread def get_current_function() -> Optional[Function]: """Get the function currently selected by the user""" return get_function(idaapi.get_screen_ea()) + class ConvertedNumber(TypedDict): decimal: str hexadecimal: str @@ -532,6 +795,7 @@ class ConvertedNumber(TypedDict): ascii: Optional[str] binary: str + @jsonrpc def convert_number( text: Annotated[str, "Textual representation of the number to convert"], @@ -573,15 +837,18 @@ def convert_number( "hexadecimal": hex(value), "bytes": bytes.hex(" "), "ascii": ascii, - "binary": bin(value) + "binary": bin(value), } + T = TypeVar("T") + class Page(TypedDict, Generic[T]): data: list[T] next_offset: Optional[int] + def paginate(data: list[T], offset: int, count: int) -> Page[T]: if count == 0: count = len(data) @@ -589,26 +856,31 @@ def paginate(data: list[T], offset: int, count: int) -> Page[T]: if next_offset >= len(data): next_offset = None return { - "data": data[offset:offset+count], + "data": data[offset : offset + count], "next_offset": next_offset, } + @jsonrpc @idaread def list_functions( offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"], + count: Annotated[ + int, "Number of functions to list (100 is a good default, 0 means remainder)" + ], ) -> Page[Function]: """List all functions in the database (paginated)""" functions = [get_function(address) for address in idautils.Functions()] return paginate(functions, offset, count) + class String(TypedDict): address: str length: int type: str string: str + def get_strings() -> list[String]: strings = [] for item in idautils.Strings(): @@ -616,32 +888,43 @@ def get_strings() -> list[String]: try: string = str(item) if string: - strings.append({ - "address": hex(item.ea), - "length": item.length, - "type": string_type, - "string": string - }) + strings.append( + { + "address": hex(item.ea), + "length": item.length, + "type": string_type, + "string": string, + } + ) except: continue return strings + @jsonrpc @idaread def list_strings( offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], + count: Annotated[ + int, "Number of strings to list (100 is a good default, 0 means remainder)" + ], ) -> Page[String]: """List all strings in the database (paginated)""" strings = get_strings() return paginate(strings, offset, count) + @jsonrpc @idaread def search_strings( - pattern_str: Annotated[str, "The regular expression to match((The generated regular expression includes case by default))"], - offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], + pattern_str: Annotated[ + str, + "The regular expression to match((The generated regular expression includes case by default))", + ], + offset: Annotated[int, "Offset to start listing from (start at 0)"], + count: Annotated[ + int, "Number of strings to list (100 is a good default, 0 means remainder)" + ], ) -> Page[String]: """Search for strings that satisfy a regular expression""" strings = get_strings() @@ -650,17 +933,22 @@ def search_strings( except Exception as e: raise ValueError(f"Regular expression syntax error, reason is {e}") try: - matched_strings = [s for s in strings if s["string"] and re.search(pattern, s["string"])] + matched_strings = [ + s for s in strings if s["string"] and re.search(pattern, s["string"]) + ] except Exception as e: raise ValueError(f"The regular match failed, reason is {e}") return paginate(matched_strings, offset, count) + @jsonrpc @idaread def search_strings( pattern: Annotated[str, "Substring to search for in strings"], offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], + count: Annotated[ + int, "Number of strings to list (100 is a good default, 0 means remainder)" + ], ) -> Page[String]: """Search for strings containing the given pattern (case-insensitive)""" strings = get_strings() @@ -668,12 +956,13 @@ def search_strings( return paginate(matched_strings, offset, count) - def decompile_checked(address: int) -> ida_hexrays.cfunc_t: if not ida_hexrays.init_hexrays_plugin(): raise IDAError("Hex-Rays decompiler is not available") error = ida_hexrays.hexrays_failure_t() - cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func(address, error, ida_hexrays.DECOMP_WARNINGS) + cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func( + address, error, ida_hexrays.DECOMP_WARNINGS + ) if not cfunc: message = f"Decompilation failed at {hex(address)}" if error.str: @@ -683,10 +972,11 @@ def decompile_checked(address: int) -> ida_hexrays.cfunc_t: raise IDAError(message) return cfunc + @jsonrpc @idaread def decompile_function( - address: Annotated[str, "Address of the function to decompile"] + address: Annotated[str, "Address of the function to decompile"], ) -> str: """Decompile a function at the given address""" address = parse_address(address) @@ -716,10 +1006,11 @@ def decompile_function( return pseudocode + @jsonrpc @idaread def disassemble_function( - start_address: Annotated[str, "Address of the function to disassemble"] + start_address: Annotated[str, "Address of the function to disassemble"], ) -> str: """Get assembly code (address: instruction; comment) for a function""" start = parse_address(start_address) @@ -743,27 +1034,32 @@ def disassemble_function( disassembly += f"; {comment}" return disassembly + class Xref(TypedDict): address: str type: str function: Optional[Function] + @jsonrpc @idaread def get_xrefs_to( - address: Annotated[str, "Address to get cross references to"] + address: Annotated[str, "Address to get cross references to"], ) -> list[Xref]: """Get all cross references to the given address""" xrefs = [] xref: ida_xref.xrefblk_t for xref in idautils.XrefsTo(parse_address(address)): - xrefs.append({ - "address": hex(xref.frm), - "type": "code" if xref.iscode else "data", - "function": get_function(xref.frm, raise_error=False), - }) + xrefs.append( + { + "address": hex(xref.frm), + "type": "code" if xref.iscode else "data", + "function": get_function(xref.frm, raise_error=False), + } + ) return xrefs + @jsonrpc @idaread def get_entry_points() -> list[Function]: @@ -777,11 +1073,12 @@ def get_entry_points() -> list[Function]: result.append(func) return result + @jsonrpc @idawrite def set_comment( address: Annotated[str, "Address in the function to set the comment for"], - comment: Annotated[str, "Comment text"] + comment: Annotated[str, "Comment text"], ): """Set a comment for a given address in the function disassembly and pseudocode""" address = parse_address(address) @@ -824,6 +1121,7 @@ def set_comment( cfunc.save_user_cmts() print(f"Failed to set decompiler comment at {hex(address)}") + def refresh_decompiler_widget(): widget = ida_kernwin.get_current_widget() if widget is not None: @@ -831,32 +1129,41 @@ def refresh_decompiler_widget(): if vu is not None: vu.refresh_ctext() + def refresh_decompiler_ctext(function_address: int): error = ida_hexrays.hexrays_failure_t() - cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func(function_address, error, ida_hexrays.DECOMP_WARNINGS) + cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func( + function_address, error, ida_hexrays.DECOMP_WARNINGS + ) if cfunc: cfunc.refresh_func_ctext() + @jsonrpc @idawrite def rename_local_variable( function_address: Annotated[str, "Address of the function containing the variable"], old_name: Annotated[str, "Current name of the variable"], - new_name: Annotated[str, "New name for the variable (empty for a default name)"] + new_name: Annotated[str, "New name for the variable (empty for a default name)"], ): """Rename a local variable in a function""" func = idaapi.get_func(parse_address(function_address)) if not func: raise IDAError(f"No function found at address {function_address}") if not ida_hexrays.rename_lvar(func.start_ea, old_name, new_name): - raise IDAError(f"Failed to rename local variable {old_name} in function {hex(func.start_ea)}") + raise IDAError( + f"Failed to rename local variable {old_name} in function {hex(func.start_ea)}" + ) refresh_decompiler_ctext(func.start_ea) + @jsonrpc @idawrite def rename_global_variable( old_name: Annotated[str, "Current name of the global variable"], - new_name: Annotated[str, "New name for the global variable (empty for a default name)"] + new_name: Annotated[ + str, "New name for the global variable (empty for a default name)" + ], ): """Rename a global variable""" ea = idaapi.get_name_ea(idaapi.BADADDR, old_name) @@ -864,11 +1171,12 @@ def rename_global_variable( raise IDAError(f"Failed to rename global variable {old_name} to {new_name}") refresh_decompiler_ctext(ea) + @jsonrpc @idawrite def set_global_variable_type( variable_name: Annotated[str, "Name of the global variable"], - new_type: Annotated[str, "New type for the variable"] + new_type: Annotated[str, "New type for the variable"], ): """Set a global variable's type""" ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name) @@ -878,11 +1186,12 @@ def set_global_variable_type( if not ida_typeinf.apply_tinfo(ea, tif, ida_typeinf.PT_SIL): raise IDAError(f"Failed to apply type") + @jsonrpc @idawrite def rename_function( function_address: Annotated[str, "Address of the function to rename"], - new_name: Annotated[str, "New name for the function (empty for a default name)"] + new_name: Annotated[str, "New name for the function (empty for a default name)"], ): """Rename a function""" func = idaapi.get_func(parse_address(function_address)) @@ -892,11 +1201,12 @@ def rename_function( raise IDAError(f"Failed to rename function {hex(func.start_ea)} to {new_name}") refresh_decompiler_ctext(func.start_ea) + @jsonrpc @idawrite def set_function_prototype( function_address: Annotated[str, "Address of the function"], - prototype: Annotated[str, "New function prototype"] + prototype: Annotated[str, "New function prototype"], ) -> str: """Set a function's prototype""" func = idaapi.get_func(parse_address(function_address)) @@ -912,6 +1222,7 @@ def set_function_prototype( except Exception as e: raise IDAError(f"Failed to parse prototype string: {prototype}") + class my_modifier_t(ida_hexrays.user_lvar_modifier_t): def __init__(self, var_name: str, new_type: ida_typeinf.tinfo_t): ida_hexrays.user_lvar_modifier_t.__init__(self) @@ -926,19 +1237,27 @@ def modify_lvars(self, lvars): return True return False + # NOTE: This is extremely hacky, but necessary to get errors out of IDA def parse_decls_ctypes(decls: str, hti_flags: int) -> tuple[int, str]: if sys.platform == "win32": import ctypes + assert isinstance(decls, str), "decls must be a string" assert isinstance(hti_flags, int), "hti_flags must be an int" c_decls = decls.encode("utf-8") c_til = None ida_dll = ctypes.CDLL("ida") - ida_dll.parse_decls.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int] + ida_dll.parse_decls.argtypes = [ + ctypes.c_void_p, + ctypes.c_char_p, + ctypes.c_void_p, + ctypes.c_int, + ] ida_dll.parse_decls.restype = ctypes.c_int messages = [] + @ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p) def magic_printer(fmt: bytes, arg1: bytes): if fmt.count(b"%") == 1 and b"%s" in fmt: @@ -957,10 +1276,14 @@ def magic_printer(fmt: bytes, arg1: bytes): messages = [] return errors, messages + @jsonrpc @idawrite def declare_c_type( - c_declaration: Annotated[str, "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };"], + c_declaration: Annotated[ + str, + "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };", + ], ): """Create or update a local type from a C declaration""" # PT_SIL: Suppress warning dialogs (although it seems unnecessary here) @@ -971,15 +1294,18 @@ def declare_c_type( pretty_messages = "\n".join(messages) if errors > 0: - raise IDAError(f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{pretty_messages}") + raise IDAError( + f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{pretty_messages}" + ) return f"success\n\nInfo:\n{pretty_messages}" + @jsonrpc @idawrite def set_local_variable_type( function_address: Annotated[str, "Address of the function containing the variable"], variable_name: Annotated[str, "Name of the variable"], - new_type: Annotated[str, "New type for the variable"] + new_type: Annotated[str, "New type for the variable"], ): """Set a local variable's type""" try: @@ -989,7 +1315,7 @@ def set_local_variable_type( try: new_tif = ida_typeinf.tinfo_t() # parse_decl requires semicolon for the type - ida_typeinf.parse_decl(new_tif, None, new_type+";", ida_typeinf.PT_SIL) + ida_typeinf.parse_decl(new_tif, None, new_type + ";", ida_typeinf.PT_SIL) except Exception: raise IDAError(f"Failed to parse type: {new_type}") func = idaapi.get_func(parse_address(function_address)) @@ -1002,6 +1328,7 @@ def set_local_variable_type( raise IDAError(f"Failed to modify local variable: {variable_name}") refresh_decompiler_ctext(func.start_ea) + class MCP(idaapi.plugin_t): flags = idaapi.PLUGIN_KEEP comment = "MCP Plugin" @@ -1014,7 +1341,9 @@ def init(self): hotkey = MCP.wanted_hotkey.replace("-", "+") if sys.platform == "darwin": hotkey = hotkey.replace("Alt", "Option") - print(f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({hotkey}) to start the server") + print( + f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({hotkey}) to start the server" + ) return idaapi.PLUGIN_KEEP def run(self, args): @@ -1023,5 +1352,6 @@ def run(self, args): def term(self): self.server.stop() + def PLUGIN_ENTRY(): return MCP() From caf0492ceeaa597af49bbe9ff9d0c15ed7c25298 Mon Sep 17 00:00:00 2001 From: Duncan Ogilvie Date: Fri, 18 Apr 2025 13:49:01 +0200 Subject: [PATCH 2/2] Address review comments --- src/ida_pro_mcp/mcp-plugin.py | 547 +++++++++++++--------------------- 1 file changed, 201 insertions(+), 346 deletions(-) diff --git a/src/ida_pro_mcp/mcp-plugin.py b/src/ida_pro_mcp/mcp-plugin.py index 7b88de8..bf7b3d2 100644 --- a/src/ida_pro_mcp/mcp-plugin.py +++ b/src/ida_pro_mcp/mcp-plugin.py @@ -1,3 +1,4 @@ +import os import sys if sys.version_info < (3, 11): @@ -252,7 +253,7 @@ def _run_server(self): import ida_idd import ida_dbg import ida_name - +import ida_ida class IDAError(Exception): def __init__(self, message: str): @@ -491,133 +492,6 @@ def create_demangled_to_ea_map(): if demangled: DEMANGLED_TO_EA[demangled] = ea - -@jsonrpc -@idaread -def get_all_regs() -> list[dict[str, str]]: - """Get all registers and their values,this function is only available in the debugger""" - res = [] - dbg = ida_idd.get_dbg() - for tidx in range(ida_dbg.get_thread_qty()): - tid = ida_dbg.getn_thread(tidx) - regs = [] - regvals = ida_dbg.get_reg_vals(tid) - for ridx, rv in enumerate(regvals): - rinfo = dbg.regs(ridx) - rval = rv.pyval(rinfo.dtype) - if isinstance(rval, int): - rval = "0x%x" % rval - if isinstance(rval, bytes): - rval = "".join(["%02x" % b for b in rval]) - regs.append( - { - "reg": rinfo.name, - "value": rval, - } - ) - res.append( - { - "thread_id": tid, - "regs": regs, - } - ) - return res - - -import os - - -@jsonrpc -@idaread -def get_call_stack() -> list[dict[str, str]]: - """Get the current call stack""" - callstack = [] - try: - tid = ida_dbg.get_current_thread() - trace = ida_idd.call_stack_t() - - if not ida_dbg.collect_stack_trace(tid, trace): - return [] - for frame in trace: - frame_info = {"address": hex(frame.callea)} - try: - mi = ida_idd.modinfo_t() - if ida_dbg.get_module_info(frame.callea, mi): - frame_info["module"] = os.path.basename(mi.name) - else: - frame_info["module"] = "" - - name = ( - ida_name.get_nice_colored_name( - frame.callea, - ida_name.GNCN_NOCOLOR - | ida_name.GNCN_NOLABEL - | ida_name.GNCN_NOSEG - | ida_name.GNCN_PREFDBG, - ) - or "" - ) - frame_info["symbol"] = name - - except Exception as e: - frame_info["module"] = "" - frame_info["symbol"] = str(e) - - callstack.append(frame_info) - - except Exception as e: - pass - return callstack - - -@jsonrpc -@idaread -def get_wide_byte_rpc( - address: Annotated[str, "Address to get 1 byte value from"], -) -> int: - """Get 1 byte value at the specified address""" - ea = parse_address(address) - return ida_bytes.get_wide_byte(ea) - - -@jsonrpc -@idaread -def get_wide_word_rpc( - address: Annotated[str, "Address to get 2 bytes value from"], -) -> int: - """Get 2 bytes value at the specified address""" - ea = parse_address(address) - return ida_bytes.get_wide_word(ea) - - -@jsonrpc -@idaread -def get_wide_dword_rpc( - address: Annotated[str, "Address to get 4 bytes value from"], -) -> int: - """Get 4 bytes value at the specified address""" - ea = parse_address(address) - return ida_bytes.get_wide_dword(ea) - - -@jsonrpc -@idaread -def get_qword_rpc(address: Annotated[str, "Address to get 8 bytes value from"]) -> int: - """Get 8 bytes value at the specified address""" - ea = parse_address(address) - return ida_bytes.get_qword(ea) - - -@jsonrpc -@idaread -def get_str_by_addr(address: Annotated[str, "Address to get string from"]): - """Get string at the specified address""" - try: - return idaapi.get_strlit_contents(parse_address(address),-1,0).decode("utf-8") - except Exception as e: - return "Error:" + str(e) - - @jsonrpc @idaread def get_function_by_name( @@ -636,135 +510,6 @@ def get_function_by_name( raise IDAError(f"No function found with name {name}") return get_function(function_address) - -@jsonrpc -@idaread -def search_functions_by_name( - name: Annotated[str, "Name of the function to search for"], -) -> list[Function]: - """Search for functions by name""" - results = [] - for ea in idautils.Functions(): - func_name = ida_funcs.get_func_name(ea) - if name.lower() in func_name.lower(): - results.append(get_function(ea)) - return results - - -import ida_ida - - -@jsonrpc -@idaread -def list_all_breakpoints(): - """ - List all breakpoints in the program. - """ - ea = ida_ida.inf_get_min_ea() - end_ea = ida_ida.inf_get_max_ea() - bkpts = [] - while ea <= end_ea: - bpt = ida_dbg.bpt_t() - if ida_dbg.get_bpt(ea, bpt): - bkpts.append( - { - "ea": hex(bpt.ea), - "type": bpt.type, - "enabled": bpt.flags & ida_dbg.BPT_ENABLED, - "condition": bpt.condition if bpt.condition else None, - } - ) - ea = ida_bytes.next_head(ea, end_ea) - return bkpts - - -@jsonrpc -@idaread -def idadbg_start_process() -> str: - """Start the debugger""" - ret = idaapi.start_process("", "", "") - if ret == 1: - return "Debugger started" - return "Failed to start debugger" - - -@jsonrpc -@idaread -def idadbg_exit_process() -> str: - """Exit the debugger""" - ret = idaapi.exit_process() - if ret == 1: - return "Debugger exited" - return "Failed to exit debugger" - - -@jsonrpc -@idaread -def idadbg_continue_process() -> str: - """Continue the debugger""" - ret = idaapi.continue_process() - if ret == 1: - return "Debugger continued" - return "Failed to continue debugger" - - -@jsonrpc -@idaread -def idadbg_run_to( - address: Annotated[str, "Run the debugger to the specified address"], -) -> str: - """Run the debugger to the specified address""" - ea = parse_address(address) - ret = idaapi.run_to(ea) - if ret == 1: - return f"Debugger run to {hex(ea)}" - return f"Failed to run to address {hex(ea)}" - - -@jsonrpc -@idaread -def idadbg_add_bpt( - address: Annotated[str, "Set a breakpoint at the specified address"], -) -> str: - """Set a breakpoint at the specified address""" - ea = parse_address(address) - ret = idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT) - if ret == 1: - return f"Breakpoint set at {hex(ea)}" - bpts = list_all_breakpoints() - for bpt in bpts: - if bpt["ea"] == hex(ea): - return f"Breakpoint already exists at {hex(ea)}" - return f"Failed to set breakpoint at address {hex(ea)}" - - -@jsonrpc -@idaread -def idadbg_del_bpt( - address: Annotated[str, "del a breakpoint at the specified address"], -) -> str: - """del a breakpoint at the specified address""" - ea = parse_address(address) - ret = idaapi.del_bpt(ea) - if ret == 1: - return f"Breakpoint deleted at {hex(ea)}" - return f"Failed to delete breakpoint at address {hex(ea)}" - - -@jsonrpc -@idaread -def idadbg_enable_bpt( - address: Annotated[str, "Enable or disable a breakpoint at the specified address"], - enable: Annotated[bool, "Enable or disable a breakpoint"], -) -> str: - """Enable or disable a breakpoint at the specified address""" - ea = parse_address(address) - ret = idaapi.enable_bpt(ea, enable) - if ret == 1: - return f"Breakpoint {'enabled' if enable else 'disabled'} at {hex(ea)}" - return f"Failed to {'' if enable else 'disable '}breakpoint at address {hex(ea)}" - - @jsonrpc @idaread def get_function_by_address( @@ -773,21 +518,18 @@ def get_function_by_address( """Get a function by its address""" return get_function(parse_address(address)) - @jsonrpc @idaread def get_current_address() -> str: """Get the address currently selected by the user""" return hex(idaapi.get_screen_ea()) - @jsonrpc @idaread def get_current_function() -> Optional[Function]: """Get the function currently selected by the user""" return get_function(idaapi.get_screen_ea()) - class ConvertedNumber(TypedDict): decimal: str hexadecimal: str @@ -795,7 +537,6 @@ class ConvertedNumber(TypedDict): ascii: Optional[str] binary: str - @jsonrpc def convert_number( text: Annotated[str, "Textual representation of the number to convert"], @@ -840,15 +581,12 @@ def convert_number( "binary": bin(value), } - T = TypeVar("T") - class Page(TypedDict, Generic[T]): data: list[T] next_offset: Optional[int] - def paginate(data: list[T], offset: int, count: int) -> Page[T]: if count == 0: count = len(data) @@ -856,31 +594,26 @@ def paginate(data: list[T], offset: int, count: int) -> Page[T]: if next_offset >= len(data): next_offset = None return { - "data": data[offset : offset + count], + "data": data[offset:offset + count], "next_offset": next_offset, } - @jsonrpc @idaread def list_functions( offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[ - int, "Number of functions to list (100 is a good default, 0 means remainder)" - ], + count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"], ) -> Page[Function]: """List all functions in the database (paginated)""" functions = [get_function(address) for address in idautils.Functions()] return paginate(functions, offset, count) - class String(TypedDict): address: str length: int type: str string: str - def get_strings() -> list[String]: strings = [] for item in idautils.Strings(): @@ -888,43 +621,32 @@ def get_strings() -> list[String]: try: string = str(item) if string: - strings.append( - { - "address": hex(item.ea), - "length": item.length, - "type": string_type, - "string": string, - } - ) + strings.append({ + "address": hex(item.ea), + "length": item.length, + "type": string_type, + "string": string, + }) except: continue return strings - @jsonrpc @idaread def list_strings( offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[ - int, "Number of strings to list (100 is a good default, 0 means remainder)" - ], + count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], ) -> Page[String]: """List all strings in the database (paginated)""" strings = get_strings() return paginate(strings, offset, count) - @jsonrpc @idaread def search_strings( - pattern_str: Annotated[ - str, - "The regular expression to match((The generated regular expression includes case by default))", - ], + pattern_str: Annotated[str, "The regular expression to match((The generated regular expression includes case by default))"], offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[ - int, "Number of strings to list (100 is a good default, 0 means remainder)" - ], + count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], ) -> Page[String]: """Search for strings that satisfy a regular expression""" strings = get_strings() @@ -933,36 +655,28 @@ def search_strings( except Exception as e: raise ValueError(f"Regular expression syntax error, reason is {e}") try: - matched_strings = [ - s for s in strings if s["string"] and re.search(pattern, s["string"]) - ] + matched_strings = [s for s in strings if s["string"] and re.search(pattern, s["string"])] except Exception as e: raise ValueError(f"The regular match failed, reason is {e}") return paginate(matched_strings, offset, count) - @jsonrpc @idaread def search_strings( pattern: Annotated[str, "Substring to search for in strings"], offset: Annotated[int, "Offset to start listing from (start at 0)"], - count: Annotated[ - int, "Number of strings to list (100 is a good default, 0 means remainder)" - ], + count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"], ) -> Page[String]: """Search for strings containing the given pattern (case-insensitive)""" strings = get_strings() matched_strings = [s for s in strings if pattern.lower() in s["string"].lower()] return paginate(matched_strings, offset, count) - def decompile_checked(address: int) -> ida_hexrays.cfunc_t: if not ida_hexrays.init_hexrays_plugin(): raise IDAError("Hex-Rays decompiler is not available") error = ida_hexrays.hexrays_failure_t() - cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func( - address, error, ida_hexrays.DECOMP_WARNINGS - ) + cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func(address, error, ida_hexrays.DECOMP_WARNINGS) if not cfunc: message = f"Decompilation failed at {hex(address)}" if error.str: @@ -972,7 +686,6 @@ def decompile_checked(address: int) -> ida_hexrays.cfunc_t: raise IDAError(message) return cfunc - @jsonrpc @idaread def decompile_function( @@ -1006,7 +719,6 @@ def decompile_function( return pseudocode - @jsonrpc @idaread def disassemble_function( @@ -1034,13 +746,11 @@ def disassemble_function( disassembly += f"; {comment}" return disassembly - class Xref(TypedDict): address: str type: str function: Optional[Function] - @jsonrpc @idaread def get_xrefs_to( @@ -1050,16 +760,13 @@ def get_xrefs_to( xrefs = [] xref: ida_xref.xrefblk_t for xref in idautils.XrefsTo(parse_address(address)): - xrefs.append( - { - "address": hex(xref.frm), - "type": "code" if xref.iscode else "data", - "function": get_function(xref.frm, raise_error=False), - } - ) + xrefs.append({ + "address": hex(xref.frm), + "type": "code" if xref.iscode else "data", + "function": get_function(xref.frm, raise_error=False), + }) return xrefs - @jsonrpc @idaread def get_entry_points() -> list[Function]: @@ -1073,7 +780,6 @@ def get_entry_points() -> list[Function]: result.append(func) return result - @jsonrpc @idawrite def set_comment( @@ -1121,7 +827,6 @@ def set_comment( cfunc.save_user_cmts() print(f"Failed to set decompiler comment at {hex(address)}") - def refresh_decompiler_widget(): widget = ida_kernwin.get_current_widget() if widget is not None: @@ -1129,16 +834,12 @@ def refresh_decompiler_widget(): if vu is not None: vu.refresh_ctext() - def refresh_decompiler_ctext(function_address: int): error = ida_hexrays.hexrays_failure_t() - cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func( - function_address, error, ida_hexrays.DECOMP_WARNINGS - ) + cfunc: ida_hexrays.cfunc_t = ida_hexrays.decompile_func(function_address, error, ida_hexrays.DECOMP_WARNINGS) if cfunc: cfunc.refresh_func_ctext() - @jsonrpc @idawrite def rename_local_variable( @@ -1151,19 +852,14 @@ def rename_local_variable( if not func: raise IDAError(f"No function found at address {function_address}") if not ida_hexrays.rename_lvar(func.start_ea, old_name, new_name): - raise IDAError( - f"Failed to rename local variable {old_name} in function {hex(func.start_ea)}" - ) + raise IDAError(f"Failed to rename local variable {old_name} in function {hex(func.start_ea)}") refresh_decompiler_ctext(func.start_ea) - @jsonrpc @idawrite def rename_global_variable( old_name: Annotated[str, "Current name of the global variable"], - new_name: Annotated[ - str, "New name for the global variable (empty for a default name)" - ], + new_name: Annotated[str, "New name for the global variable (empty for a default name)"], ): """Rename a global variable""" ea = idaapi.get_name_ea(idaapi.BADADDR, old_name) @@ -1171,7 +867,6 @@ def rename_global_variable( raise IDAError(f"Failed to rename global variable {old_name} to {new_name}") refresh_decompiler_ctext(ea) - @jsonrpc @idawrite def set_global_variable_type( @@ -1186,7 +881,6 @@ def set_global_variable_type( if not ida_typeinf.apply_tinfo(ea, tif, ida_typeinf.PT_SIL): raise IDAError(f"Failed to apply type") - @jsonrpc @idawrite def rename_function( @@ -1201,7 +895,6 @@ def rename_function( raise IDAError(f"Failed to rename function {hex(func.start_ea)} to {new_name}") refresh_decompiler_ctext(func.start_ea) - @jsonrpc @idawrite def set_function_prototype( @@ -1222,7 +915,6 @@ def set_function_prototype( except Exception as e: raise IDAError(f"Failed to parse prototype string: {prototype}") - class my_modifier_t(ida_hexrays.user_lvar_modifier_t): def __init__(self, var_name: str, new_type: ida_typeinf.tinfo_t): ida_hexrays.user_lvar_modifier_t.__init__(self) @@ -1237,7 +929,6 @@ def modify_lvars(self, lvars): return True return False - # NOTE: This is extremely hacky, but necessary to get errors out of IDA def parse_decls_ctypes(decls: str, hti_flags: int) -> tuple[int, str]: if sys.platform == "win32": @@ -1276,14 +967,10 @@ def magic_printer(fmt: bytes, arg1: bytes): messages = [] return errors, messages - @jsonrpc @idawrite def declare_c_type( - c_declaration: Annotated[ - str, - "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };", - ], + c_declaration: Annotated[str, "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };"], ): """Create or update a local type from a C declaration""" # PT_SIL: Suppress warning dialogs (although it seems unnecessary here) @@ -1294,12 +981,9 @@ def declare_c_type( pretty_messages = "\n".join(messages) if errors > 0: - raise IDAError( - f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{pretty_messages}" - ) + raise IDAError(f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{pretty_messages}") return f"success\n\nInfo:\n{pretty_messages}" - @jsonrpc @idawrite def set_local_variable_type( @@ -1328,6 +1012,180 @@ def set_local_variable_type( raise IDAError(f"Failed to modify local variable: {variable_name}") refresh_decompiler_ctext(func.start_ea) +@jsonrpc +@idaread +def dbg_get_registers() -> list[dict[str, str]]: + """Get all registers and their values. This function is only available when debugging.""" + result = [] + dbg = ida_idd.get_dbg() + # TODO: raise an exception when not debugging? + for thread_index in range(ida_dbg.get_thread_qty()): + tid = ida_dbg.getn_thread(thread_index) + regs = [] + regvals = ida_dbg.get_reg_vals(tid) + for reg_index, rv in enumerate(regvals): + reg_info = dbg.regs(reg_index) + reg_value = rv.pyval(reg_info.dtype) + if isinstance(reg_value, int): + reg_value = hex(reg_value) + if isinstance(reg_value, bytes): + reg_value = reg_value.hex(" ") + regs.append({ + "name": reg_info.name, + "value": reg_value, + }) + result.append({ + "thread_id": tid, + "registers": regs, + }) + return result + +@jsonrpc +@idaread +def dbg_get_call_stack() -> list[dict[str, str]]: + """Get the current call stack.""" + callstack = [] + try: + tid = ida_dbg.get_current_thread() + trace = ida_idd.call_stack_t() + + if not ida_dbg.collect_stack_trace(tid, trace): + return [] + for frame in trace: + frame_info = { + "address": hex(frame.callea), + } + try: + module_info = ida_idd.modinfo_t() + if ida_dbg.get_module_info(frame.callea, module_info): + frame_info["module"] = os.path.basename(module_info.name) + else: + frame_info["module"] = "" + + name = ( + ida_name.get_nice_colored_name( + frame.callea, + ida_name.GNCN_NOCOLOR + | ida_name.GNCN_NOLABEL + | ida_name.GNCN_NOSEG + | ida_name.GNCN_PREFDBG, + ) + or "" + ) + frame_info["symbol"] = name + + except Exception as e: + frame_info["module"] = "" + frame_info["symbol"] = str(e) + + callstack.append(frame_info) + + except Exception as e: + pass + return callstack + +@jsonrpc +@idaread +def dbg_list_breakpoints(): + """ + List all breakpoints in the program. + """ + ea = ida_ida.inf_get_min_ea() + end_ea = ida_ida.inf_get_max_ea() + bkpts = [] + while ea <= end_ea: + bpt = ida_dbg.bpt_t() + if ida_dbg.get_bpt(ea, bpt): + bkpts.append( + { + "ea": hex(bpt.ea), + "type": bpt.type, + "enabled": bpt.flags & ida_dbg.BPT_ENABLED, + "condition": bpt.condition if bpt.condition else None, + } + ) + ea = ida_bytes.next_head(ea, end_ea) + return bkpts + +@jsonrpc +@idaread +def dbg_start_process() -> str: + """Start the debugger""" + ret = idaapi.start_process("", "", "") + if ret == 1: + return "Debugger started" + return "Failed to start debugger" + +@jsonrpc +@idaread +def dbg_exit_process() -> str: + """Exit the debugger""" + ret = idaapi.exit_process() + if ret == 1: + return "Debugger exited" + return "Failed to exit debugger" + +@jsonrpc +@idaread +def dbg_continue_process() -> str: + """Continue the debugger""" + ret = idaapi.continue_process() + if ret == 1: + return "Debugger continued" + return "Failed to continue debugger" + +@jsonrpc +@idaread +def dbg_run_to( + address: Annotated[str, "Run the debugger to the specified address"], +) -> str: + """Run the debugger to the specified address""" + ea = parse_address(address) + ret = idaapi.run_to(ea) + if ret == 1: + return f"Debugger run to {hex(ea)}" + return f"Failed to run to address {hex(ea)}" + +@jsonrpc +@idaread +def dbg_set_breakpoint( + address: Annotated[str, "Set a breakpoint at the specified address"], +) -> str: + """Set a breakpoint at the specified address""" + ea = parse_address(address) + ret = idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT) + if ret == 1: + return f"Breakpoint set at {hex(ea)}" + bpts = dbg_list_breakpoints() + for bpt in bpts: + if bpt["ea"] == hex(ea): + return f"Breakpoint already exists at {hex(ea)}" + return f"Failed to set breakpoint at address {hex(ea)}" + +@jsonrpc +@idaread +def dbg_delete_breakpoint( + address: Annotated[str, "del a breakpoint at the specified address"], +) -> str: + """del a breakpoint at the specified address""" + ea = parse_address(address) + ret = idaapi.del_bpt(ea) + if ret == 1: + return f"Breakpoint deleted at {hex(ea)}" + return f"Failed to delete breakpoint at address {hex(ea)}" + +@jsonrpc +@idaread +def dbg_enable_breakpoint( + address: Annotated[str, "Enable or disable a breakpoint at the specified address"], + enable: Annotated[bool, "Enable or disable a breakpoint"], +) -> str: + """Enable or disable a breakpoint at the specified address""" + ea = parse_address(address) + ret = idaapi.enable_bpt(ea, enable) + if ret == 1: + return f"Breakpoint {'enabled' if enable else 'disabled'} at {hex(ea)}" + return f"Failed to {'' if enable else 'disable '}breakpoint at address {hex(ea)}" class MCP(idaapi.plugin_t): flags = idaapi.PLUGIN_KEEP @@ -1341,9 +1199,7 @@ def init(self): hotkey = MCP.wanted_hotkey.replace("-", "+") if sys.platform == "darwin": hotkey = hotkey.replace("Alt", "Option") - print( - f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({hotkey}) to start the server" - ) + print(f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({hotkey}) to start the server") return idaapi.PLUGIN_KEEP def run(self, args): @@ -1352,6 +1208,5 @@ def run(self, args): def term(self): self.server.stop() - def PLUGIN_ENTRY(): return MCP()