Dream back to the pure land, repel ten thousand intrusions!!!
When real capability is on the line, trust no off-the-shelf ops tooling — only the scripts you rolled yourself.
Before getting to the tools, a short side story.
A while back I read a post where a researcher counter-attacked macOS, so I audited my own machine. I wasn't expecting much, but the check turned up two processes with active network connections.
The PowerChime process
This is the Mac's charging-chime process. A charging-chime process with outbound connections — even if only to a local IPv6 address. It could be collecting statistics via charging frequency, and I can't rule out that it transmits other data as well.
Addresses beginning with fe80 are link-local, but I can't rule out a local proxy relaying the traffic onward.
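As a sanity check on the classification above, Python's standard library can confirm that an address like this one is link-local (a quick sketch, not from the original post):

```python
import ipaddress

# fe80::/10 is the IPv6 link-local range; traffic to it never leaves the local segment
addr = ipaddress.ip_address("fe80:8::aede:48ff:fe33:4455")
print(addr.is_link_local)  # True
print(addr.is_global)      # False
```

Link-local, however, says nothing about what a local process does with the data afterwards — hence the proxying caveat.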
```
Process name: PowerChime
User: mac
Network connections:
  Local: ipv6:62300  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:52115  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:63501  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:51850  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:60860  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:55092  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:54878  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:54829  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:60721  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
  Local: ipv6:64410  Remote: fe80:8::aede:48ff:fe33:4455:49178  State: ESTABLISHED
```
The corespeechd process
This is the speech-recognition daemon. I never enabled any speech-recognition features, yet it had quietly started itself in the background.
```
Process name: corespeechd
User: mac
Network connections:
  Local: ipv6:49742  Remote: fe80:8::aede:48ff:fe33:4455:49162  State: ESTABLISHED
```
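The self-audit above — finding processes with established connections — can be reproduced with a few lines of psutil (assumed installed; on macOS, enumerating all connections typically requires root):

```python
import psutil

def established_connections():
    """Return (process name, local addr, remote addr) for every ESTABLISHED inet connection."""
    found = []
    for conn in psutil.net_connections(kind='inet'):
        if conn.status == psutil.CONN_ESTABLISHED and conn.pid:
            try:
                name = psutil.Process(conn.pid).name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue  # process exited or we lack permission to inspect it
            found.append((name, conn.laddr, conn.raddr))
    return found

for name, laddr, raddr in established_connections():
    print(f"{name}: {laddr} -> {raddr} ESTABLISHED")
```

Run it periodically and diff the output to spot processes that shouldn't be talking to anything.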
Commands to disable IPv6:
```
networksetup -setv6off Wi-Fi
networksetup -setv6off Ethernet
```
PowerChime:
```
defaults write com.apple.PowerChime ChimeOnNoHardware -bool true
sudo killall PowerChime
```
corespeechd (very stubborn — it survived every kill, so in the end I removed the binary outright and made it deaf and mute):
```
# 1. Reboot your Mac; hold Command (⌘) + R until the Apple logo appears to enter Recovery Mode.
csrutil disable

# 2. Reboot, then rename the daemon so it can no longer run:
sudo mv /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd.bak

# 3. Reboot into Recovery Mode again (Command (⌘) + R) and re-enable SIP:
csrutil enable

# 4. Close the Recovery Mode terminal, then choose "Restart" from the Apple menu.
```
1. Passive intranet traffic probe with DingTalk alerting
The script listens on a set of custom ports and alerts whenever anything connects, turning intranet scans into DingTalk notifications.
It errors out if a port is already occupied, so run it on as clean a system as possible.
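To avoid those bind errors, you can pre-check which candidate ports are free before starting the probe. `port_is_free` is a hypothetical stdlib-only helper, not part of the script below:

```python
import socket

def port_is_free(port, host='0.0.0.0'):
    """Try to bind the TCP port; if the bind succeeds, nothing else is holding it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
            return True
        except OSError:
            return False
```

Feed only the ports that pass this check into `ports.txt` and the probe will start cleanly.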
Common ports:
```
21 22 80 105 135 139 443 445 1433 1521 2181 2379 3000 3306 3389 4443 4848
5432 5672 6379 6443 7001 7002 7003 7077 7848 8009 8080 8081 8161 8181 8200
8443 8848 8983 9000 9001 9042 9080 9090 9200 9300 9848 9990 9999 10250
11211 15672 19999 27017 50000 50070 61616
```
```python
import socket
import threading
import requests
import os

# Load the DingTalk robot webhook token from token.txt
def load_dingtalk_token(filename):
    if not os.path.isfile(filename):
        print(f"File {filename} does not exist")
        return None
    with open(filename, 'r') as file:
        token = file.read().strip()
    return token

# Send an alert via the DingTalk robot
def send_dingtalk_alert(token, message):
    webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
    headers = {'Content-Type': 'application/json'}
    data = {'msgtype': 'text', 'text': {'content': message}}
    response = requests.post(webhook_url, json=data, headers=headers)
    if response.status_code != 200:
        print(f"Failed to send DingTalk message: {response.text}")

# Handle a client connection
def handle_client(client_socket, local_ip, local_port, token):
    remote_ip, remote_port = client_socket.getpeername()
    hostname = socket.gethostname()
    # Receive the request data
    request_data = b""
    while True:
        data = client_socket.recv(1024)
        if not data:
            break
        request_data += data
    request_text = request_data.decode('utf-8', errors='replace')
    message = f"Local intranet IP: {local_ip}\n" \
              f"Hostname: {hostname}\n" \
              f"Connected port: {local_port}\n" \
              f"Remote IP: {remote_ip}\n" \
              f"Remote port: {remote_port}\n" \
              f"Request data:\n{request_text}"
    print(message)
    send_dingtalk_alert(token, message)
    client_socket.close()

# Start a listener on one port
def start_server(port, token):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('0.0.0.0', port))
    server.listen(5)
    local_ip = socket.gethostbyname(socket.gethostname())

    print(f"Server started, listening on port: {port}")
    while True:
        client_socket, addr = server.accept()
        client_handler = threading.Thread(
            target=handle_client,
            args=(client_socket, local_ip, port, token)
        )
        client_handler.start()

# Load the port list from a file (one port per line)
def load_ports_from_file(filename):
    if not os.path.isfile(filename):
        print(f"File {filename} does not exist")
        return []
    with open(filename, 'r') as file:
        ports = [int(line.strip()) for line in file if line.strip().isdigit()]
    return ports

if __name__ == "__main__":
    # Load the DingTalk token
    token = load_dingtalk_token('token.txt')
    if not token:
        print("Failed to load DingTalk token")
    else:
        # Load the port list
        ports = load_ports_from_file('ports.txt')
        if not ports:
            print("No ports to listen on")
        else:
            for port in ports:
                server_thread = threading.Thread(target=start_server, args=(port, token))
                server_thread.start()
```
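To verify an alert fires end to end, connect to one of the monitored ports from another host — for example with a throwaway client like this (`send_probe` is a hypothetical helper, not part of the probe itself):

```python
import socket

def send_probe(host, port, payload=b'probe-test', timeout=3):
    """Connect to a monitored port, send a marker payload, then disconnect."""
    with socket.create_connection((host, port), timeout=timeout) as s:
        s.sendall(payload)
```

Calling `send_probe` against the probe host should push a DingTalk message containing `probe-test` within seconds.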
2. Monitoring a server for new files with a given extension under given directories, with DingTalk alerts
Self-explanatory; no walkthrough needed.
```python
import os
import time
import socket
import requests
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

def load_from_file(filename):
    if not os.path.isfile(filename):
        print(f"File {filename} does not exist")
        return None
    with open(filename, 'r') as file:
        return [line.strip() for line in file.readlines()]

def send_dingtalk_alert(token, message):
    webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'
    headers = {'Content-Type': 'application/json'}
    data = {'msgtype': 'text', 'text': {'content': message}}
    response = requests.post(webhook_url, json=data, headers=headers)
    if response.status_code != 200:
        print(f"Failed to send DingTalk message: {response.text}")

def get_local_info():
    hostname = socket.gethostname()
    local_ip = socket.gethostbyname(hostname)
    return local_ip, hostname

def get_process_info(path):
    # Find which process currently has the file open
    for pid in psutil.pids():
        try:
            p = psutil.Process(pid)
            for file in p.open_files():
                if file.path == path:
                    return p.name(), pid
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return None, None

class DirectoryEventHandler(FileSystemEventHandler):
    def __init__(self, file_extension, token, local_ip, hostname):
        self.file_extension = file_extension
        self.token = token
        self.local_ip = local_ip
        self.hostname = hostname

    def on_created(self, event):
        if not event.is_directory and event.src_path.endswith(self.file_extension):
            process_name, pid = get_process_info(event.src_path)
            directory = os.path.dirname(event.src_path)
            message = (
                f"New file detected: {event.src_path}\n"
                f"Directory: {directory}\n"
                f"Local intranet IP: {self.local_ip}\n"
                f"Hostname: {self.hostname}\n"
                f"Process name: {process_name if process_name else 'unknown'}\n"
                f"Process ID: {pid if pid else 'unknown'}"
            )
            print(message)
            send_dingtalk_alert(self.token, message)

if __name__ == "__main__":
    token = load_from_file('token.txt')
    if not token:
        print("Failed to load DingTalk token")
        exit(1)

    directories_to_watch = load_from_file('directories.txt')
    if not directories_to_watch:
        print("Failed to load watch directories")
        exit(1)

    local_ip, hostname = get_local_info()
    file_extension = '.jsp'  # change to the file extension you want to monitor

    event_handler = DirectoryEventHandler(file_extension, token[0], local_ip, hostname)
    observer = Observer()

    for directory in directories_to_watch:
        observer.schedule(event_handler, directory, recursive=True)
        print(f"Watching directory: {directory}, file extension: {file_extension}")

    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
        print("Monitoring stopped")
    observer.join()
```
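If watchdog can't be installed on the target, the same detection can be approximated with stdlib polling. A rougher sketch — the helper name is mine, and a polling loop can miss files created and deleted between polls:

```python
import os

def new_files_with_ext(directory, extension, seen):
    """Return paths with the given extension not yet in `seen`; updates `seen` in place."""
    found = []
    for root, _, files in os.walk(directory):
        for name in files:
            if name.endswith(extension):
                path = os.path.join(root, name)
                if path not in seen:
                    seen.add(path)
                    found.append(path)
    return found
```

Call it in a `while True: ... time.sleep(n)` loop and fire the same DingTalk alert for every path it returns.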
3. A simple Linux incident-response Python script
```python
import os
import subprocess
import json

def run_command(command):
    """Run a system command and return its output."""
    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        return result.stdout.strip()
    except Exception as e:
        return str(e)

def collect_system_info():
    """Collect basic system information."""
    info = {
        'hostname': run_command('hostname'),
        'uptime': run_command('uptime'),
        'users': run_command('who'),
        'last_logins': run_command('last -n 10'),
        'disk_usage': run_command('df -h'),
        'memory_usage': run_command('free -h'),
        'cpu_info': run_command('lscpu'),
        'network_info': run_command('ifconfig -a'),
    }
    return info

def check_open_ports():
    """List listening ports."""
    open_ports = run_command('netstat -tuln')
    return open_ports

def list_running_processes():
    """List currently running processes."""
    processes = run_command('ps aux')
    return processes

def search_suspicious_files():
    """Search for suspicious files (e.g. modified within the last day)."""
    suspicious_files = run_command('find / -type f -mtime -1 2>/dev/null')
    return suspicious_files

def main():
    # Collect system information
    system_info = collect_system_info()
    with open('system_info.json', 'w') as f:
        json.dump(system_info, f, indent=4)

    # Check open ports
    open_ports = check_open_ports()
    with open('open_ports.txt', 'w') as f:
        f.write(open_ports)

    # List running processes
    running_processes = list_running_processes()
    with open('running_processes.txt', 'w') as f:
        f.write(running_processes)

    # Search for suspicious files
    suspicious_files = search_suspicious_files()
    with open('suspicious_files.txt', 'w') as f:
        f.write(suspicious_files)

    print("Incident-response data collected; saved to files in the current directory.")

if __name__ == "__main__":
    main()
```
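The raw `netstat -tuln` dump can be post-processed into structured records for diffing or alerting. A sketch, assuming the common Linux netstat layout (the helper name is mine):

```python
import re

def parse_netstat_listeners(netstat_output):
    """Extract (protocol, local address) pairs from `netstat -tuln` output."""
    listeners = []
    for line in netstat_output.splitlines():
        # proto, Recv-Q, Send-Q, local address are the first four columns
        m = re.match(r'\s*(tcp6?|udp6?)\s+\d+\s+\d+\s+(\S+)', line)
        if m:
            listeners.append((m.group(1), m.group(2)))
    return listeners
```

Comparing two snapshots of this list quickly surfaces listeners that appeared between collections.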
4. A simple Windows incident-response Python script
```python
import os
import subprocess
import json

def run_command(command):
    """Run a system command and return its output."""
    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        return result.stdout.strip()
    except Exception as e:
        return str(e)

def collect_system_info():
    """Collect basic system information."""
    info = {
        'hostname': run_command('hostname'),
        'systeminfo': run_command('systeminfo'),
        'uptime': run_command('net statistics workstation'),
        'users': run_command('query user'),
        'last_logins': run_command('quser'),
        'disk_usage': run_command('wmic logicaldisk get size,freespace,caption'),
        'memory_usage': run_command('systeminfo | findstr /C:"Total Physical Memory" /C:"Available Physical Memory"'),
        'cpu_info': run_command('wmic cpu get name,NumberOfCores,NumberOfLogicalProcessors'),
        'network_info': run_command('ipconfig /all'),
    }
    return info

def check_open_ports():
    """List open ports."""
    open_ports = run_command('netstat -an')
    return open_ports

def list_running_processes():
    """List currently running processes."""
    processes = run_command('tasklist')
    return processes

def search_suspicious_files():
    """Search for suspicious files (e.g. recently modified files)."""
    suspicious_files = run_command('dir /s /b /a-d /o-d')
    return suspicious_files

def main():
    # Collect system information
    system_info = collect_system_info()
    with open('system_info.json', 'w') as f:
        json.dump(system_info, f, indent=4)

    # Check open ports
    open_ports = check_open_ports()
    with open('open_ports.txt', 'w') as f:
        f.write(open_ports)

    # List running processes
    running_processes = list_running_processes()
    with open('running_processes.txt', 'w') as f:
        f.write(running_processes)

    # Search for suspicious files
    suspicious_files = search_suspicious_files()
    with open('suspicious_files.txt', 'w') as f:
        f.write(suspicious_files)

    print("Incident-response data collected; saved to files in the current directory.")

if __name__ == "__main__":
    main()
```
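As with the Linux variant, the raw command output can be post-processed. For example the `wmic logicaldisk` dump — a sketch assuming wmic's default whitespace-aligned, alphabetically ordered columns (Caption, FreeSpace, Size); the helper name is mine:

```python
def parse_wmic_disks(wmic_output):
    """Parse `wmic logicaldisk get size,freespace,caption` output into dicts."""
    disks = []
    lines = [l for l in wmic_output.splitlines() if l.strip()]
    for line in lines[1:]:  # first non-empty line is the header row
        parts = line.split()
        if len(parts) == 3:
            caption, free, size = parts
            disks.append({'drive': caption, 'free_bytes': int(free), 'size_bytes': int(size)})
    return disks
```

A disk that is suddenly near-full is often the first hint of a log bomb or an attacker staging data.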
5. Linux log analysis script
Adjust the log paths for your environment.
```python
import os
import re
import glob
import gzip
import json
import inquirer
from datetime import datetime, timedelta
import argparse

# Log file paths
LOG_FILES = {
    "system": ["/var/log/syslog", "/var/log/auth.log", "/var/log/kern.log"],
    "container": ["/var/lib/docker/containers/*/*.log"],
    "java": ["/path/to/java/application/logs/*.log"],  # change to the real path
    "network": ["/var/log/ufw.log", "/var/log/iptables.log"],
    "middleware": ["/var/log/nginx/access.log", "/var/log/nginx/error.log",
                   "/var/log/apache2/access.log", "/var/log/apache2/error.log",
                   "/var/log/mysql/error.log"],
}

DEFAULT_KEYWORDS = ["error", "fail", "denied", "segfault", "panic", "exception", "timeout"]

def read_log_file(file_path):
    with (gzip.open(file_path, 'rt') if file_path.endswith('.gz') else open(file_path, 'r')) as f:
        return f.readlines()

def glob_files(pattern):
    # glob handles no-match cleanly, unlike shelling out to `ls`
    return glob.glob(pattern)

def parse_log_line(line, date_format="%b %d %H:%M:%S"):
    try:
        # Syslog timestamps span the first three whitespace-separated fields and omit the year
        timestamp_str = ' '.join(line.split()[:3])
        timestamp = datetime.strptime(timestamp_str, date_format).replace(year=datetime.now().year)
        return timestamp, line
    except Exception:
        return None, line

def filter_logs_by_time(logs, time_delta):
    now = datetime.now()
    return [log for log in logs if (timestamp := parse_log_line(log)[0]) and now - timestamp <= time_delta]

def search_logs(logs, keywords):
    results = {}
    for keyword in keywords:
        pattern = re.compile(keyword, re.IGNORECASE)
        matching_lines = [line for line in logs if pattern.search(line)]
        if matching_lines:
            results[keyword] = matching_lines
    return results

def save_results(results, output_file):
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=4)

def interactive_input():
    questions = [
        inquirer.Checkbox('categories', message="Select log categories to analyze", choices=list(LOG_FILES.keys())),
        inquirer.Text('time_delta', message="Time range to analyze (hours)", default="24"),
        inquirer.Text('keywords', message="Keywords to search (comma-separated)", default=",".join(DEFAULT_KEYWORDS)),
    ]
    answers = inquirer.prompt(questions)
    return answers['categories'], timedelta(hours=int(answers['time_delta'])), answers['keywords'].split(',')

def main():
    parser = argparse.ArgumentParser(description="Log collection and analysis script")
    parser.add_argument('--interactive', action='store_true', help="Enable interactive mode")
    args = parser.parse_args()

    if args.interactive:
        selected_categories, time_delta, keywords = interactive_input()
    else:
        selected_categories, time_delta, keywords = list(LOG_FILES.keys()), timedelta(hours=24), DEFAULT_KEYWORDS

    all_results = {}
    for category in selected_categories:
        for pattern in LOG_FILES[category]:
            for log_file in glob_files(pattern):
                if os.path.exists(log_file):
                    logs = read_log_file(log_file)
                    filtered_logs = filter_logs_by_time(logs, time_delta)
                    results = search_logs(filtered_logs, keywords)
                    if results:
                        all_results.setdefault(category, {})[log_file] = results

    output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    save_results(all_results, output_file)
    print(f"Log analysis results saved to {output_file}")

if __name__ == "__main__":
    main()
```
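The time-window filter at the heart of the script reduces to this check, factored out here as a hypothetical standalone helper so it can be tested deterministically:

```python
from datetime import datetime, timedelta

def within_window(timestamp, window_hours, now=None):
    """True if `timestamp` falls inside the last `window_hours` hours (None never qualifies)."""
    now = now or datetime.now()
    return timestamp is not None and now - timestamp <= timedelta(hours=window_hours)
```

Passing `now` explicitly makes the window reproducible — useful when re-running an analysis against an incident that happened days ago.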
6. Windows log analysis script
Adjust the paths for your environment.
```python
import win32evtlog
import inquirer
import argparse
import json
from datetime import datetime, timedelta

# Default log sources
LOG_SOURCES = {
    "System": "System",
    "Application": "Application",
    "Security": "Security",
}

DEFAULT_KEYWORDS = ["error", "fail", "denied", "exception", "timeout"]

def read_event_log(source, time_delta):
    server = 'localhost'
    hand = win32evtlog.OpenEventLog(server, source)
    flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ

    events = []
    time_threshold = datetime.now() - time_delta

    try:
        while True:
            records = win32evtlog.ReadEventLog(hand, flags, 0)
            if not records:
                break
            for record in records:
                # TimeGenerated is a pywintypes datetime; normalize to a plain datetime
                event_time = datetime.fromtimestamp(record.TimeGenerated.timestamp())
                if event_time < time_threshold:
                    # Reading backwards, so everything older can be skipped
                    return events
                events.append((event_time, record))
    finally:
        win32evtlog.CloseEventLog(hand)
    return events

def search_logs(events, keywords):
    results = {}
    for keyword in keywords:
        keyword_lower = keyword.lower()
        # StringInserts can be None, so guard before searching it
        matching_events = [
            event for event in events
            if event[1].StringInserts
            and any(keyword_lower in str(s).lower() for s in event[1].StringInserts)
        ]
        if matching_events:
            results[keyword] = matching_events
    return results

def save_results(results, output_file):
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=4, default=str)

def interactive_input():
    questions = [
        inquirer.Checkbox('sources', message="Select log sources to analyze", choices=list(LOG_SOURCES.keys())),
        inquirer.Text('time_delta', message="Time range to analyze (hours)", default="24"),
        inquirer.Text('keywords', message="Keywords to search (comma-separated)", default=",".join(DEFAULT_KEYWORDS)),
    ]
    answers = inquirer.prompt(questions)
    selected_sources = [LOG_SOURCES[source] for source in answers['sources']]
    time_delta = timedelta(hours=int(answers['time_delta']))
    keywords = answers['keywords'].split(',')
    return selected_sources, time_delta, keywords

def main():
    parser = argparse.ArgumentParser(description="Windows log collection and analysis script")
    parser.add_argument('--interactive', action='store_true', help="Enable interactive mode")
    args = parser.parse_args()

    if args.interactive:
        selected_sources, time_delta, keywords = interactive_input()
    else:
        selected_sources = list(LOG_SOURCES.values())
        time_delta = timedelta(hours=24)
        keywords = DEFAULT_KEYWORDS

    all_results = {}
    for source in selected_sources:
        events = read_event_log(source, time_delta)
        results = search_logs(events, keywords)
        if results:
            all_results[source] = results

    output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    save_results(all_results, output_file)
    print(f"Log analysis results saved to {output_file}")

if __name__ == "__main__":
    main()
```
7. Memory-shell triage script
```python
import psutil
import subprocess
import os
import json

# Check for suspicious processes
def check_suspicious_processes():
    suspicious_processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        cmdline = ' '.join(proc.info['cmdline'] or [])
        if any(keyword in cmdline for keyword in ['java', 'tomcat', 'nacos']):
            suspicious_processes.append(proc.info)
    return suspicious_processes

# Analyze network connections
def analyze_network_connections():
    network_connections = []
    for conn in psutil.net_connections(kind='inet'):
        if conn.pid is None or not conn.laddr:
            continue
        if conn.laddr.port in range(8000, 9000) or conn.status == 'LISTEN':
            try:
                proc = psutil.Process(conn.pid)
                if any(keyword in ' '.join(proc.cmdline()) for keyword in ['java', 'tomcat', 'nacos']):
                    network_connections.append({
                        'pid': conn.pid,
                        'name': proc.name(),
                        'cmdline': proc.cmdline(),
                        'local_address': conn.laddr,
                        'remote_address': conn.raddr,
                        'status': conn.status
                    })
            except psutil.NoSuchProcess:
                continue
    return network_connections

# Check the filesystem for suspicious files
def check_suspicious_files():
    suspicious_files = []
    search_paths = ['/var/www/html', '/usr/share/nginx/html']
    keywords = ['Behinder', 'AntSword', 'Godzilla']
    for path in search_paths:
        for root, _, files in os.walk(path):
            for file in files:
                if file.endswith(('.jsp', '.php', '.aspx')):
                    file_path = os.path.join(root, file)
                    with open(file_path, 'r', errors='ignore') as f:
                        content = f.read()
                    if any(keyword in content for keyword in keywords):
                        suspicious_files.append(file_path)
    return suspicious_files

# Get Java process IDs
def get_java_pids():
    java_pids = []
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        if 'java' in ' '.join(proc.info['cmdline'] or []):
            java_pids.append(proc.info['pid'])
    return java_pids

# Dump the heap of each Java process for memory-shell inspection
def check_memory_malware(pids):
    heap_dumps = []
    for pid in pids:
        dump_file = f'/tmp/heapdump_{pid}.hprof'
        cmd = ['jmap', '-dump:live,format=b,file=' + dump_file, str(pid)]
        try:
            subprocess.run(cmd, check=True)
            heap_dumps.append(dump_file)
        except subprocess.CalledProcessError:
            print(f"Heap dump failed for PID: {pid}")
    return heap_dumps

# Analyze a heap dump
def analyze_heap_dump(dump_file):
    analysis_result = {}
    # Assumes the Eclipse MAT command-line tool is available for analysis
    mat_cmd = [
        'java', '-jar', 'org.eclipse.mat.cli-1.11.0.jar',  # replace with the actual Eclipse MAT CLI path
        '-consolelog',
        '-heapdump', dump_file,
        '-query', 'find_leaks',  # use MAT's built-in find_leaks query
        '-format', 'JSON',
        '-output', dump_file + '.json'
    ]
    try:
        subprocess.run(mat_cmd, check=True)
        with open(dump_file + '.json') as f:
            analysis_result = json.load(f)
    except subprocess.CalledProcessError:
        print(f"Heap analysis failed: {dump_file}")
    except FileNotFoundError:
        print(f"Analysis result file not found: {dump_file}.json")
    return analysis_result

# Main
def main():
    results = {
        'suspicious_processes': check_suspicious_processes(),
        'network_connections': analyze_network_connections(),
        'suspicious_files': check_suspicious_files(),
    }
    # NOTE: the original post is truncated here; the calls below complete the obvious flow
    heap_dumps = check_memory_malware(get_java_pids())
    results['heap_dump_analysis'] = {dump: analyze_heap_dump(dump) for dump in heap_dumps}
    with open('memshell_scan_results.json', 'w') as f:
        json.dump(results, f, indent=4, default=str)
    print("Scan complete; results saved to memshell_scan_results.json")

if __name__ == "__main__":
    main()
```
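Before reaching for heavyweight MAT analysis, a cheap first pass is grepping the heap dump (or its `strings` output) for known memory-shell indicators. The indicator list below is illustrative, not exhaustive, and the helper is mine:

```python
MEMSHELL_INDICATORS = [
    'javax.servlet.Filter',      # filter-type memory shells register here
    'ClassLoader.defineClass',   # classic in-memory class injection
    'Behinder', 'Godzilla',      # common webshell frameworks
]

def find_indicators(text, indicators=MEMSHELL_INDICATORS):
    """Return every indicator substring present in the given text, in list order."""
    return [i for i in indicators if i in text]
```

Only dumps that hit at least one indicator need the expensive full analysis.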
8. DNS request triage script for Cobalt Strike and similar C2 frameworks
Based on DNS request frequency:
```python
from scapy.all import *
import time

# Per-source-IP DNS request timestamps
dns_requests = {}

# Callback: capture and parse DNS packets
def dns_monitor_callback(packet):
    if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0:  # DNS queries only
        dns_query = packet.getlayer(DNS).qd.qname.decode('utf-8')
        src_ip = packet[IP].src
        current_time = time.time()

        # Record each source IP's request times
        if src_ip not in dns_requests:
            dns_requests[src_ip] = []
        dns_requests[src_ip].append(current_time)

        # Check the request frequency over the recent window
        dns_requests[src_ip] = [t for t in dns_requests[src_ip] if current_time - t < 60]  # keep the last 60 seconds
        if len(dns_requests[src_ip]) > 20:  # more than 20 requests in 60 seconds triggers an alert
            print(f"[ALERT] High DNS request frequency from {src_ip}: {len(dns_requests[src_ip])} requests in the last minute")

# Capture DNS traffic with scapy
def start_dns_monitor():
    print("Starting DNS traffic monitor...")
    sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)

if __name__ == "__main__":
    start_dns_monitor()
```
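The sliding-window logic can be factored out and unit-tested on its own (the names and signature here are mine, not the script's):

```python
def is_high_frequency(timestamps, now, window=60, threshold=20):
    """True if more than `threshold` request timestamps fall within the last `window` seconds."""
    recent = [t for t in timestamps if now - t < window]
    return len(recent) > threshold
```

Tune `window` and `threshold` to the beacon intervals you expect; DNS-channel Cobalt Strike beacons tend to produce far more queries per minute than interactive browsing.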
Based on DNS response packet size:
```python
from scapy.all import *

# Callback: capture and parse DNS packets
def dns_monitor_callback(packet):
    if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 1:  # DNS responses only
        dns_response = packet.getlayer(DNS).an
        if dns_response:
            response_size = len(dns_response)
            if response_size > 512:  # responses over 512 bytes are unusual and may indicate tunneling
                src_ip = packet[IP].src
                print(f"[ALERT] Large DNS response detected from {src_ip}: {response_size} bytes")

# Capture DNS traffic with scapy
def start_dns_monitor():
    print("Starting DNS traffic monitor...")
    sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)

if __name__ == "__main__":
    start_dns_monitor()
```
Detecting signature strings:
```python
import re
from scapy.all import *

# Signature strings / regex patterns
cobalt_strike_patterns = [
    re.compile(r'^[a-zA-Z0-9]{16,}\.'),  # long random-looking leading label
    re.compile(r'\..*\..*\..*')          # multi-level subdomains
]

# Callback: capture and parse DNS packets
def dns_monitor_callback(packet):
    if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0:  # DNS queries only
        dns_query = packet.getlayer(DNS).qd.qname.decode('utf-8')
        src_ip = packet[IP].src
        for pattern in cobalt_strike_patterns:
            # search, not match: query names never start with a dot,
            # so an anchored match would never fire for the second pattern
            if pattern.search(dns_query):
                print(f"[ALERT] Potential Cobalt Strike pattern detected: {dns_query} from {src_ip}")

# Capture DNS traffic with scapy
def start_dns_monitor():
    print("Starting DNS traffic monitor...")
    sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)

if __name__ == "__main__":
    start_dns_monitor()
```
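The two signature patterns can be exercised offline on sample query names (a standalone sketch using `search`, since query names never begin with a dot and an anchored `match` would miss the multi-level pattern):

```python
import re

patterns = [
    re.compile(r'^[a-zA-Z0-9]{16,}\.'),  # long random-looking leading label (DGA-style)
    re.compile(r'\..*\..*\..*'),         # deeply nested subdomains
]

def looks_suspicious(qname):
    """True if any C2-style pattern appears in the query name."""
    return any(p.search(qname) for p in patterns)

print(looks_suspicious("aaaabbbbccccdddd.example.com."))  # True: 16-char leading label
print(looks_suspicious("example.com."))                   # False
```

Note that the multi-level pattern fires on any name containing three or more dots — which includes most fully qualified names with a trailing dot — so expect noise and treat hits as leads, not verdicts.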
That's all for now. The beauty of hand-rolled scripts is that anywhere with a Python environment you can stand one up on the intranet, and it keeps monitoring even if the host can't reach the internet. You also don't have to worry about the RCE vulnerabilities that may surface at any time in security appliances themselves.