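This is best read alongside the angr source code and the angr Symbion paper.
angr Symbion is a remarkable technique: it lets you skip the complicated environment initialization that normally precedes symbolic analysis. Through a debugging interface such as gdbserver, execution can alternate between the concrete environment and the symbolic one. For usage details, see the official documentation. Because the technique is so interesting and could be a great help to automated vulnerability analysis in the IoT and embedded space, I decided to dig into its implementation details.
Let's start with an official demo: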
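import angr
import avatar2
from angr_targets import AvatarGDBConcreteTarget

# GDB_SERVER_IP, GDB_SERVER_PORT and binary_x64 are placeholders defined elsewhere in the demo.
# Instantiating the ConcreteTarget
avatar_gdb = AvatarGDBConcreteTarget(avatar2.archs.x86.X86_64,
                                     GDB_SERVER_IP, GDB_SERVER_PORT)
# Creating the Project
p = angr.Project(binary_x64, concrete_target=avatar_gdb,
                 use_sim_procedures=True)
# Getting an entry_state
entry_state = p.factory.entry_state()
# Forget about these options as for now, will explain later.
entry_state.options.add(angr.options.SYMBION_SYNC_CLE)
entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)
# Create the simulation manager that the exploration technique attaches to
simgr = p.factory.simgr(entry_state)
# Use Symbion!
simgr.use_technique(angr.exploration_techniques.Symbion(find=[0x85b853]))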
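As the demo shows, the first step is to initialize a ConcreteTarget. This initialization relies on a library called Avatar (avatar2), which according to its official description is a dynamic analysis framework for embedded device firmware. Symbion evidently uses it as the interface for interacting with the concrete state, so let's look at avatar2 first.
avatar2 is an orchestration framework that can connect to several IoT devices and emulation environments at the same time. Architecturally, it consists of the following parts:
1. Targets
2. Protocols
3. Endpoints (like an emulator or a physical device)
In other words, avatar2 drives a set of targets that interact with endpoints, which makes targets a key part of avatar2. The following target types are available: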
• GDBTarget
• QemuTarget
• JLinkTarget
• OpenOCDTarget
• PandaTarget
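Taking GDBTarget as an example, it talks to a gdbserver and thereby controls a remote execution environment. Beyond controlling execution, avatar2 can even transfer state from one target to another: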
from avatar2 import *
sample = 'firmware.bin'
openocd_conf = 'nucleo-l152re.cfg'
# Create avatar instance with custom output directory
avatar = Avatar(output_directory='/tmp/myavatar')
# Add first target
qemu = avatar.add_target(QemuTarget,
gdb_executable="arm-none-eabi-gdb",
firmware=sample, cpu_model="cortex-m3",
executable="targets/qemu/arm-softmmu/qemu-system-")
# Add the second target
nucleo = avatar.add_target(OpenOCDTarget,
gdb_executable="arm-none-eabi-gdb",
openocd_script=openocd_conf)
# Set up custom gdb ports to avoid collisions
qemu.gdb_port = 1234
nucleo.gdb_port = 1235
# Specify first memory range
rom = avatar.add_memory_range(0x08000000, 0x1000000, name='rom',
file=sample)
# Specify second memory range
ram = avatar.add_memory_range(0x20000000, 0x14000, name='ram')
# Initialize Targets
avatar.init_targets()
# Execute on the nucleo up to a specific address
nucleo.set_breakpoint(0x800B570)
nucleo.cont()
nucleo.wait()
# Transfer the state over to qemu
avatar.transfer_state(nucleo, qemu, sync_regs=True, synced_ranges=[ram])
# Continue execution on qemu
qemu.cont()
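To make the idea of a state transfer more tangible, here is a minimal sketch in plain Python. It does not use avatar2 at all: `ToyTarget` and this `transfer_state` function are hypothetical stand-ins I made up for illustration, and they only mimic the idea of copying registers and selected memory ranges from one target to another.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTarget:
    """Hypothetical stand-in for a target: registers plus sparse memory."""
    name: str
    regs: dict = field(default_factory=dict)
    mem: dict = field(default_factory=dict)  # address -> byte value

    def read_memory(self, addr, size):
        # Unwritten bytes read back as zero in this toy model.
        return bytes(self.mem.get(addr + i, 0) for i in range(size))

    def write_memory(self, addr, data):
        for i, b in enumerate(data):
            self.mem[addr + i] = b


def transfer_state(src, dst, sync_regs=True, synced_ranges=()):
    """Copy registers and the listed (start, size) ranges from src to dst."""
    if sync_regs:
        dst.regs.update(src.regs)
    for start, size in synced_ranges:
        dst.write_memory(start, src.read_memory(start, size))


# The "device" has run up to a breakpoint and holds some RAM contents.
device = ToyTarget("nucleo", regs={"pc": 0x800B570, "sp": 0x20013FF0})
device.write_memory(0x20000000, b"\xde\xad\xbe\xef")
emulator = ToyTarget("qemu")

transfer_state(device, emulator, synced_ranges=[(0x20000000, 4)])
print(hex(emulator.regs["pc"]), emulator.read_memory(0x20000000, 4).hex())
```

After the transfer the emulator can continue from the same program counter with the same RAM contents, which is the effect the avatar2 example above relies on.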
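Symbion uses the GDBTarget and implements a set of methods on top of it, such as read_memory, write_memory, read_register and write_register. Most of these are wrappers around the corresponding avatar2 APIs, and with some knowledge of gdb and gdbserver it is not hard to see how they work. In the official Symbion demo, the next step is to create the Project with avatar_gdb and add angr.options.SYMBION_SYNC_CLE and angr.options.SYMBION_KEEP_STUBS_ON_SYNC to the resulting state. The official documentation explains both options in detail; they mainly control how the concrete state and the abstract state are synchronized. The actual conversion of the concrete state into an abstract state is driven by simgr.use_technique(angr.exploration_techniques.Symbion(find=[0x85b853])). To convert between the two kinds of state, angr implements the SimEngineConcrete engine, which contains the following method: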
def process_successors(self, successors, extra_stop_points=None, memory_concretize=None,
                       register_concretize=None, timeout=0, *args, **kwargs):
    new_state = self.state
    # setup the concrete process and resume the execution
    self.to_engine(new_state, extra_stop_points, memory_concretize, register_concretize, timeout)
    # sync angr with the current state of the concrete process using
    # the state plugin
    new_state.concrete.sync()
    successors.engine = "SimEngineConcrete"
    successors.sort = "SimEngineConcrete"
    successors.add_successor(new_state, new_state.ip, new_state.solver.true, new_state.unicorn.jumpkind)
    successors.description = "Concrete Successors"
    successors.processed = True
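Here, to_engine drives avatar2 to run the program in the target environment up to the specified address (the breakpoint), and then sync is called to synchronize the two states. While debugging I found that angr has a state plugin named Concrete: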
class Concrete(SimStatePlugin):
    def __init__(self, segment_registers_initialized=False, segment_registers_callback_initialized=False,
                 whitelist=None, fs_register_bp=None, already_sync_objects_addresses=None,
                 ):
        super().__init__()
        self.segment_registers_initialized = segment_registers_initialized
        self.segment_registers_callback_initialized = segment_registers_callback_initialized
        if not whitelist:
            self.whitelist = []
        else:
            self.whitelist = whitelist
        self.synchronize_cle = False
        self.stubs_on_sync = False
        self.fs_register_bp = fs_register_bp
        if not already_sync_objects_addresses:
            self.already_sync_objects_addresses = []
        else:
            self.already_sync_objects_addresses = already_sync_objects_addresses
    .......
This plugin's sync method brings the remote target environment into the local state. It consists of the following parts:
Registers:
def _sync_registers(self, register_names, target):
    for register_name in register_names:
        try:
            reg_value = target.read_register(register_name)
            setattr(self.state.regs, register_name, reg_value)
            l.debug("Register: %s value: %x ", register_name, self.state.solver.eval(getattr(self.state.regs,
                                                                                              register_name),
                                                                                      cast_to=int))
        except SimConcreteRegisterError as exc:
            l.debug("Can't set register %s reason: %s, if this register is not used "
                    "this message can be ignored", register_name, exc)
This essentially reads each value from the remote target and stores it in the local state.
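The same pattern can be sketched without angr. In the snippet below, `ToyTarget`, `RegisterReadError` and `sync_registers` are hypothetical names of my own; the only thing carried over from the real code is the shape of the loop: read each register from the target, store it locally, and tolerate registers the target cannot provide.

```python
class RegisterReadError(Exception):
    """Hypothetical stand-in for the error raised on an unreadable register."""


class ToyTarget:
    """Hypothetical concrete target that only knows a few registers."""

    def __init__(self, regs):
        self._regs = regs

    def read_register(self, name):
        try:
            return self._regs[name]
        except KeyError:
            raise RegisterReadError(f"unknown register {name}") from None


def sync_registers(local_regs, register_names, target):
    """Copy every readable register into local_regs; return the names skipped."""
    skipped = []
    for name in register_names:
        try:
            local_regs[name] = target.read_register(name)
        except RegisterReadError:
            # Mirrors the original: an unreadable register is noted, not fatal.
            skipped.append(name)
    return skipped


local = {"rax": 0, "rip": 0}
target = ToyTarget({"rax": 0x2A, "rip": 0x401000})
skipped = sync_registers(local, ["rax", "rip", "cc_op"], target)
```

Skipping instead of failing matters because the local register set can contain entries that the debugger simply does not expose.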
Memory:
def _sync_cle(self, target):
    def _check_mapping_name(cle_mapping_name, concrete_mapping_name):
        if cle_mapping_name == concrete_mapping_name:
            return True
        else:
            # removing version and extension information from the library name
            cle_mapping_name = re.findall(r"[\w']+", cle_mapping_name)
            concrete_mapping_name = re.findall(r"[\w']+", concrete_mapping_name)
            return cle_mapping_name[0] == concrete_mapping_name[0]
    l.debug("Synchronizing CLE backend with the concrete process memory mapping")
    try:
        vmmap = target.get_mappings()
    except NotImplementedError:
        l.critical("Can't synchronize CLE backend using the ConcreteTarget provided.")
        self.synchronize_cle = False  # so, deactivate this feature
        l.debug("CLE synchronization has been deactivated")
        return
    for mapped_object in self.state.project.loader.all_elf_objects:
        binary_name = os.path.basename(mapped_object.binary)
        # this object has already been sync, skip it.
        if binary_name in self.already_sync_objects_addresses:
            continue
        for mmap in vmmap:
            if _check_mapping_name(binary_name, mmap.name):
                l.debug("Match! %s -> %s", mmap.name, binary_name)
                # let's make sure that we have the header at this address to confirm that it is the
                # base address.
                # That's not a perfect solution, but should work most of the time.
                result = target.read_memory(mmap.start_address, 0x10)
                if self.state.project.loader.main_object.check_magic_compatibility(io.BytesIO(result)):
                    if mapped_object.mapped_base == mmap.start_address:
                        # We already have the correct address for this memory mapping
                        l.debug("Object %s is already rebased correctly at 0x%x", binary_name,
                                mapped_object.mapped_base)
                        self.already_sync_objects_addresses.append(mmap.name)
                        break  # object has been synchronized, move to the next one!
                    # rebase the object if the CLE address doesn't match the real one,
                    # this can happen with PIE binaries and libraries.
                    l.debug("Remapping object %s mapped at address 0x%x at address 0x%x", binary_name,
                            mapped_object.mapped_base, mmap.start_address)
                    old_mapped_base = mapped_object.mapped_base
                    mapped_object.mapped_base = mmap.start_address  # Rebase now!
                    # TODO re-write this horrible thing
                    mapped_object.sections._rebase(abs(mmap.start_address - old_mapped_base))  # fix sections
                    mapped_object.segments._rebase(abs(mmap.start_address - old_mapped_base))  # fix segments
                    self.already_sync_objects_addresses.append(mmap.name)
                    break  # object has been synchronized, move to the next one!
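The memory part decides how to synchronize the local environment by comparing the binary's file name against the names of the memory mappings in the vmmap obtained from the remote environment. See the diagram: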
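The name comparison is easy to try in isolation. The sketch below reuses the same regular expression as `_check_mapping_name`, while `find_new_base` and the sample memory map are hypothetical and only illustrate the "match the name, then confirm the header" idea.

```python
import re


def check_mapping_name(cle_name, concrete_name):
    """Exact match, otherwise compare only the first word-like token of each name."""
    if cle_name == concrete_name:
        return True
    return re.findall(r"[\w']+", cle_name)[0] == re.findall(r"[\w']+", concrete_name)[0]


def find_new_base(binary_name, vmmap, is_header):
    """Return the start of the first mapping that matches by name and begins with a file header."""
    for name, start, head in vmmap:
        if check_mapping_name(binary_name, name) and is_header(head):
            return start
    return None


# Hypothetical memory map: (name, start address, first bytes of the mapping).
vmmap = [
    ("libc-2.27.so", 0x7FFFF7A0D000, b"\x7fELF"),
    ("libc-2.27.so", 0x7FFFF7BF4000, b"\x00\x00\x00\x00"),
]
new_base = find_new_base("libc.so.6", vmmap, lambda head: head.startswith(b"\x7fELF"))
print(hex(new_base))
```

Because only the first token is compared, `libc.so.6` and `libc-2.27.so` both reduce to `libc` and are treated as the same object, which is how version and extension differences are ignored. The header check then picks the mapping that actually holds the start of the file.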
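SimProc:
SimProcedures play a very important role in angr. They have to be re-hooked at this point so that they can be used again after the synchronization.
These three parts (registers, memory, SimProcedures) are the core of what Symbion does. Together they rebuild, locally, the runtime state of a target such as an embedded device, so that users can analyze and verify specific functionality more easily. So far we know how the remote state is set up and how it is synchronized into angr's local abstract environment, but not how an angr state itself is built. Let's look at that next.
An angr Project has the notion of a SimOS. We normally do not need to specify it when creating a Project, because angr picks one automatically based on the type of file being loaded. If the file type cannot be identified, you may have to specify it by hand. angr implements the following SimOS classes: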
register_simos('linux', SimLinux)
register_simos('windows', SimWindows)
register_simos('cgc', SimCGC)
register_simos('javavm', SimJavaVM)
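Each of these OS classes implements functions along the lines of entry_state and blank_state to help build the initial abstract environment. These functions are generally wrappers around state_blank, which initializes a blank state. At the very beginning, state_blank builds the corresponding mapping information from the segment attributes of the ELF file (or of another platform's file format; Linux is used as the example here):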
permission_map = { }
for obj in self.project.loader.all_objects:
    for seg in obj.segments:
        perms = 0
        # bit values based off of protection bit values from sys/mman.h
        if seg.is_readable:
            perms |= 1  # PROT_READ
        if seg.is_writable:
            perms |= 2  # PROT_WRITE
        if seg.is_executable:
            perms |= 4  # PROT_EXEC
        permission_map[(seg.min_addr, seg.max_addr)] = perms
Then the stack permissions are set according to the stack's attributes:
if stack_end is None:
    stack_end = self.arch.initial_sp
......
if self.project.loader.main_object.execstack:
    stack_perms = 1 | 2 | 4  # RWX
else:
    stack_perms = 1 | 2  # RW
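(The figure above shows a 32-bit program; depending on the program's bitness, there should be a preset starting position for the stack.)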
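As a quick check of the permission arithmetic in the two snippets above, here is a small self-contained sketch. The segment tuples and helper names are hypothetical; only the bit values (1 for read, 2 for write, 4 for execute) come from the original code.

```python
PROT_READ, PROT_WRITE, PROT_EXEC = 1, 2, 4  # same bit values as in the snippets above


def segment_perms(readable, writable, executable):
    """Fold three boolean flags into one permission bitmask."""
    perms = 0
    if readable:
        perms |= PROT_READ
    if writable:
        perms |= PROT_WRITE
    if executable:
        perms |= PROT_EXEC
    return perms


def stack_perms(execstack):
    """RWX for an executable stack, RW otherwise."""
    return PROT_READ | PROT_WRITE | (PROT_EXEC if execstack else 0)


# Hypothetical segments: (min_addr, max_addr, readable, writable, executable)
segments = [
    (0x400000, 0x400FFF, True, False, True),   # code: r-x
    (0x601000, 0x601FFF, True, True, False),   # data: rw-
]
permission_map = {(lo, hi): segment_perms(r, w, x) for lo, hi, r, w, x in segments}
print(permission_map, stack_perms(False), stack_perms(True))
```

This also explains the `default_permissions=3` that shows up later in the SimState constructor: 3 is simply read plus write.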
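The SimState class is then initialized from this file information. Anyone who uses angr regularly will be familiar with this class: it represents the state of the program during symbolic execution.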
state = SimState(self.project, stack_end=stack_end, stack_size=stack_size, stack_perms=stack_perms, **kwargs)
The first stage of SimState initialization likewise just sets up various pieces of information, and is very straightforward:
# pylint: disable=not-callable
class SimState(PluginHub):
    """
    The SimState represents the state of a program, including its memory, registers, and so forth.
    :param angr.Project project: The project instance.
    :param archinfo.Arch|str arch: The architecture of the state.
    :ivar regs: A convenient view of the state's registers, where each register is a property
    :ivar mem: A convenient view of the state's memory, a :class:`angr.state_plugins.view.SimMemView`
    :ivar registers: The state's register file as a flat memory region
    :ivar memory: The state's memory as a flat memory region
    :ivar solver: The symbolic solver and variable manager for this state
    :ivar inspect: The breakpoint manager, a :class:`angr.state_plugins.inspect.SimInspector`
    :ivar log: Information about the state's history
    :ivar scratch: Information about the current execution step
    :ivar posix: MISNOMER: information about the operating system or environment model
    :ivar fs: The current state of the simulated filesystem
    :ivar libc: Information about the standard library we are emulating
    :ivar cgc: Information about the cgc environment
    :ivar uc_manager: Control of under-constrained symbolic execution
    :ivar unicorn: Control of the Unicorn Engine
    """
    # Type Annotations for default plugins to allow type inference
    solver: 'SimSolver'
    posix: 'SimSystemPosix'
    registers: 'MemoryMixin'
    regs: 'SimRegNameView'
    memory: 'MemoryMixin'
    callstack: 'CallStack'
    mem: "SimMemView"
    history: 'SimStateHistory'
    inspect: 'SimInspector'
    jni_references: "SimStateJNIReferences"
    scratch: "SimStateScratch"
    def __init__(
            self,
            project=None,
            arch=None,
            plugins=None,
            mode=None,
            options=None,
            add_options=None,
            remove_options=None,
            special_memory_filler=None,
            os_name=None,
            plugin_preset='default',
            cle_memory_backer=None,
            dict_memory_backer=None,
            permissions_map=None,
            default_permissions=3,
            stack_perms=None,
            stack_end=None,
            stack_size=None,
            regioned_memory_cls=None,
            **kwargs):
        if kwargs:
            l.warning("Unused keyword arguments passed to SimState: %s", " ".join(kwargs))
        super(SimState, self).__init__()
        self.project = project
        # Java & Java JNI
        self._is_java_project = self.project and self.project.is_java_project
        self._is_java_jni_project = self.project and self.project.is_java_jni_project
        # Arch
        if self._is_java_jni_project:
            self._arch = { "soot" : project.arch,
                           "vex" : project.simos.native_simos.arch }
            # This flag indicates whether the current ip is a native address or
            # a soot address descriptor.
            # Note: We cannot solely rely on the ip to make that decision,
            # because the registers (storing the ip) are part of the
            # plugins that are getting toggled (=> mutual dependence).
            self.ip_is_soot_addr = False
        else:
            self._arch = arch if arch is not None else project.arch.copy() if project is not None else None
            if type(self._arch) is str:
                self._arch = archinfo.arch_from_id(self._arch)
        # the options
        if options is None:
            if mode is None:
                l.warning("SimState defaulting to symbolic mode.")
                mode = "symbolic"
            options = o.modes[mode]
        if isinstance(options, (set, list)):
            options = SimStateOptions(options)
        if add_options is not None:
            options |= add_options
        if remove_options is not None:
            options -= remove_options
        self.options = options
        self.mode = mode
        self.supports_inspect = False
        # OS name
        self.os_name = os_name
        # This is used in static mode as we don't have any constraints there
        self._satisfiable = True
        self.uninitialized_access_handler = None
        self._special_memory_filler = special_memory_filler
        # this is a global condition, applied to all added constraints, memory reads, etc
        self._global_condition = None
        self.ip_constraints = []
        # plugins. lord help us
        if plugin_preset is not None:
            self.use_plugin_preset(plugin_preset)
        if plugins is not None:
            for n,p in plugins.items():
                self.register_plugin(n, p, inhibit_init=True)
        .......
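Since what gets created first is a blank state, there are necessarily no preset plugins, so under these defaults angr uses symbolic mode, the sym_memory plugin and the default register plugin. These are the core components of angr's abstract environment, and almost all of these plugins inherit from SimStatePlugin. The abstract memory design includes a default memory permission (3), a default stack region with its permissions, a default memory_backer chosen based on cle.Loader, a default empty set of symbolic addresses, and so on. The initialized result is then registered as the memory plugin: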
else:
    sim_memory_cls = self.plugin_preset.request_plugin('sym_memory')
    sim_memory = sim_memory_cls(cle_memory_backer=cle_memory_backer, dict_memory_backer=dict_memory_backer, memory_id='mem',
                                permissions_map=permissions_map, default_permissions=default_permissions,
                                stack_perms=stack_perms, stack_end=stack_end, stack_size=stack_size)
# Add memory plugin
if not self._is_java_jni_project:
    self.register_plugin('memory', sim_memory, inhibit_init=True)
Registers are initialized in much the same way as memory, because angr also uses the classic design of mapping registers onto an abstract memory region:
else:
    sim_registers_cls = self.plugin_preset.request_plugin('sym_memory')
    sim_registers = sim_registers_cls(memory_id="reg", endness=register_endness)
# Add registers plugin
if not self._is_java_jni_project:
    self.register_plugin('registers', sim_registers, inhibit_init=True)
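To show what "registers as a memory region" means in practice, here is a toy register file backed by a flat byte array. `FlatRegisterFile` is a hypothetical class, and the offsets in `LAYOUT` are illustrative assumptions rather than values taken from archinfo. The point is only that a register name resolves to an (offset, size) pair, so sub-registers naturally alias part of the same bytes.

```python
class FlatRegisterFile:
    """Toy register file: a byte array addressed by (offset, size), little-endian."""

    def __init__(self, size, layout):
        self._bytes = bytearray(size)
        self._layout = layout  # register name -> (offset, size in bytes)

    def write(self, name, value):
        offset, size = self._layout[name]
        self._bytes[offset:offset + size] = value.to_bytes(size, "little")

    def read(self, name):
        offset, size = self._layout[name]
        return int.from_bytes(self._bytes[offset:offset + size], "little")


# Illustrative layout only; these offsets are assumptions for the example.
LAYOUT = {"rax": (16, 8), "eax": (16, 4), "ax": (16, 2), "rsp": (48, 8)}

regs = FlatRegisterFile(64, LAYOUT)
regs.write("rax", 0x1122334455667788)
regs.write("ax", 0xBEEF)  # overwrites only the two lowest bytes of rax
print(hex(regs.read("rax")), hex(regs.read("eax")))
```

With this model, the same memory implementation can serve both `state.memory` and `state.registers`, which matches the fact that the snippet above requests the `sym_memory` plugin for the registers as well.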
Besides memory and registers, the other important parts of a state are things like input/output and environment variables. angr handles these with a posix plugin:
state.register_plugin('posix', SimSystemPosix(stdin=stdin, brk=actual_brk))
class SimSystemPosix(SimStatePlugin):
    """
    Data storage and interaction mechanisms for states with an environment conforming to posix.
    Available as ``state.posix``.
    """
    #__slots__ = [ 'maximum_symbolic_syscalls', 'files', 'max_length' ]
    # some posix constants
    SIG_BLOCK=0
    SIG_UNBLOCK=1
    SIG_SETMASK=2
    EPERM = 1 # /* Operation not permitted */
    ENOENT = 2 # /* No such file or directory */
    ESRCH = 3 # /* No such process */
    EINTR = 4 # /* Interrupted system call */
    EIO = 5 # /* I/O error */
    ENXIO = 6 # /* No such device or address */
    E2BIG = 7 # /* Argument list too long */
    ENOEXEC = 8 # /* Exec format error */
    EBADF = 9 # /* Bad file number */
    ECHILD = 10 # /* No child processes */
    EAGAIN = 11 # /* Try again */
    ENOMEM = 12 # /* Out of memory */
    EACCES = 13 # /* Permission denied */
    EFAULT = 14 # /* Bad address */
    ENOTBLK = 15 # /* Block device required */
    EBUSY = 16 # /* Device or resource busy */
    EEXIST = 17 # /* File exists */
    EXDEV = 18 # /* Cross-device link */
    ENODEV = 19 # /* No such device */
    ENOTDIR = 20 # /* Not a directory */
    EISDIR = 21 # /* Is a directory */
    EINVAL = 22 # /* Invalid argument */
    ENFILE = 23 # /* File table overflow */
    EMFILE = 24 # /* Too many open files */
    ENOTTY = 25 # /* Not a typewriter */
    ETXTBSY = 26 # /* Text file busy */
    EFBIG = 27 # /* File too large */
    ENOSPC = 28 # /* No space left on device */
    ESPIPE = 29 # /* Illegal seek */
    EROFS = 30 # /* Read-only file system */
    EMLINK = 31 # /* Too many links */
    EPIPE = 32 # /* Broken pipe */
    EDOM = 33 # /* Math argument out of domain of func */
    ERANGE = 34 # /* Math result not representable */
    def __init__(self,
                 stdin=None,
                 stdout=None,
                 stderr=None,
                 fd=None,
                 sockets=None,
                 socket_queue=None,
                 argv=None,
                 argc=None,
                 environ=None,
                 auxv=None,
                 tls_modules=None,
                 sigmask=None,
                 pid=None,
                 ppid=None,
                 uid=None,
                 gid=None,
                 brk=None):
        super().__init__()
        # some limits and constants
        self.sigmask_bits = 1024
        self.maximum_symbolic_syscalls = 255
        self.max_length = 2 ** 16
        self.argc = argc
        self.argv = argv
        self.environ = environ
        self.auxv = auxv
        self.tls_modules = tls_modules if tls_modules is not None else {}
        self.brk = brk if brk is not None else 0x1b00000
        self._sigmask = sigmask
        self.pid = 1337 if pid is None else pid
        self.ppid = 1336 if ppid is None else ppid
        self.uid = 1000 if uid is None else uid
        self.gid = 1000 if gid is None else gid
        self.dev_fs = None
        self.proc_fs = None
        self.autotmp_counter = 0
        self._closed_fds = []
        self.sockets = sockets if sockets is not None else {}
        self.socket_queue = socket_queue if socket_queue is not None else []
        if stdin is None:
            stdin = SimPacketsStream('stdin', write_mode=False, writable=False, ident='stdin')
        if stdout is None:
            stdout = SimPacketsStream('stdout', write_mode=True, writable=True, ident='stdout')
        if stderr is None:
            stderr = SimPacketsStream('stderr', write_mode=True, writable=True, ident='stderr')
        if fd is None:
            fd = {}
            tty = SimFileDescriptorDuplex(stdin, stdout)
            # the initial fd layout just looks like this:
            # lrwx------ 1 audrey audrey 64 Jan 17 14:21 0 -> /dev/pts/4
            # lrwx------ 1 audrey audrey 64 Jan 17 14:21 1 -> /dev/pts/4
            # lrwx------ 1 audrey audrey 64 Jan 17 14:21 2 -> /dev/pts/4
            # but we want to distinguish the streams. we compromise by having 0 and 1 go to the "tty"
            # and stderr goes to a special stderr file
            fd[0] = tty
            fd[1] = tty
            fd[2] = SimFileDescriptor(stderr, 0)
        self.fd = fd
        # these are the storage mechanisms!
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
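The fd layout set up at the end of `__init__` can be mimicked with a few toy classes. Everything below (`ToyStream`, `ToyDuplex`, `ToyFd`) is hypothetical and only loosely modeled on the classes named above; it is a sketch of the idea that fd 0 and fd 1 share one duplex "tty" object while fd 2 gets its own stream.

```python
class ToyStream:
    """Hypothetical byte stream with a name."""

    def __init__(self, name):
        self.name = name
        self.data = bytearray()


class ToyDuplex:
    """File descriptor that reads from one stream and writes to another."""

    def __init__(self, read_stream, write_stream):
        self.read_stream = read_stream
        self.write_stream = write_stream

    def write(self, data):
        self.write_stream.data += data

    def read(self, size):
        chunk = bytes(self.read_stream.data[:size])
        del self.read_stream.data[:size]  # consume what was read
        return chunk


class ToyFd:
    """Plain descriptor bound to a single stream."""

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.data += data


stdin, stdout, stderr = ToyStream("stdin"), ToyStream("stdout"), ToyStream("stderr")
tty = ToyDuplex(stdin, stdout)
fd = {0: tty, 1: tty, 2: ToyFd(stderr)}

stdin.data += b"hello\n"        # pretend the user typed something
line = fd[0].read(6)            # reading fd 0 consumes stdin
fd[1].write(b"echo: " + line)   # writing fd 1 lands in stdout
fd[2].write(b"warning\n")       # fd 2 stays separate
```

Sharing one object for fd 0 and fd 1 mirrors a real terminal, while keeping stdin, stdout and stderr as separate streams makes it easy to inspect each of them afterwards.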
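SimPacketsStream is an abstraction built on SimFileBase (similar to a Linux fd) that implements reads and writes for standard input, output and so on. Next, some default values are written into the new state, such as sp, and the ip register is set to the entry point. At that point the abstract environment has been created. The next step is allocating stack space: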
if not self._is_core and hasattr(state.memory, 'allocate_stack_pages'):
    state.memory.allocate_stack_pages(state.solver.eval(state.regs.sp) - 1, 0x20 * 0x1000)
Then a filesystem is registered for the state:
state.register_plugin('fs', SimFilesystem(files=fs, pathsep=pathsep, cwd=cwd, mountpoints=mounts))
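This filesystem is also highly abstract: it essentially implements mount mapping and simple storage for files. Then comes the stage of populating the abstract state with information such as args, env and argc: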
filename = self.project.filename or 'dummy_filename'
if args is None:
    args = [filename]
if env is None:
    env = {}
# Prepare argc
if argc is None:
    argc = claripy.BVV(len(args), 32)
elif type(argc) is int:  # pylint: disable=unidiomatic-typecheck
    argc = claripy.BVV(argc, 32)
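StringTableSpec (simply a structure for storing strings) is used to store this information, although the strings are added as claripy.BVV-based values. Pointers such as the env pointer are also set in the state to initialize the execution environment. The rest of entry_state presumably fills in further content on top of this basic abstract environment (I have not stepped through this part in a debugger; feel free to do so if you are interested). To sum up, the defining feature of Symbion is dynamic switching between the abstract environment and the concrete execution environment, which reduces the dependence on emulated environments and emulated execution data during vulnerability discovery or program analysis. The process can be summarized in three steps: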
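1. Initialize the concrete execution environment, for example the remote environment provided by gdbserver. Typically the program is stopped at its entry point when it first starts executing.
2. When the program reaches the point where symbolic execution is needed, synchronize the register and memory contents into the symbolic execution engine to restore the state.
3. When the symbolic execution engine reaches the target location, collect the variable constraints and solve them, then write the resulting values into the concrete environment so that the concrete environment reaches the same location.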
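The three steps above can be sketched as a toy loop in plain Python. Nothing here uses angr: `ToyProcess` is a hypothetical concrete process, and a brute-force search over one byte stands in for real constraint solving. It is only meant to show the order of operations: run concretely, sync, solve, write back, resume.

```python
def check(value, key):
    """The branch condition that guards the target location."""
    return (value * 7 + 3) % 256 == key


class ToyProcess:
    """Hypothetical concrete process: an init phase followed by a guarded branch."""

    def __init__(self):
        self.memory = {"key": 0, "input": 0}
        self.pc = "entry"

    def run_until(self, breakpoint):
        if self.pc == "entry":
            # Stands in for the complex environment setup we do not want to model.
            self.memory["key"] = sum(b"firmware-secret") % 256
            self.pc = "check"
        if self.pc == "check" and breakpoint != "check":
            self.pc = "win" if check(self.memory["input"], self.memory["key"]) else "fail"
        return self.pc


process = ToyProcess()

# Step 1: run concretely up to the point of interest.
process.run_until("check")

# Step 2: sync the concrete memory into a local state.
state = dict(process.memory)

# Step 3: find an input that reaches the target (brute force replaces the solver),
# write it back into the concrete process and let it continue.
solution = next(v for v in range(256) if check(v, state["key"]))
process.memory["input"] = solution
final_pc = process.run_until("exit")
print(solution, final_pc)
```

The value of `key` is never modeled by hand: it is simply whatever the concrete run produced, which is exactly the kind of work Symbion lets the analysis skip.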
If anything here is off, corrections are welcome.