长亭百川云 - 文章详情

HW蓝队高级值守金手指

金色钱江

53

2024-07-13

梦回净土抵万侵!!!

在实力面前,不相信任何运维工具,只相信自己手搓的脚本。

在放工具之前来点小插曲。

之前看公众号有师傅反制了macos,遂自查了一下,不查不知道,一查发现两个进程有网络连接。

PowerChime进程

该进程为MAC的充电提示音进程,一个充电提示音进程有外联,而且还是本地ipv6。通过充电频率来统计数据,不排除还传其他东西。

fe80开头地址为本地地址,但是不排除本地代理流量再传递的可能

`进程名称: PowerChime``用户名: mac``网络连接:`  `本地地址: ipv6:62300`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:52115`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:63501`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:51850`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:60860`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:55092`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:54878`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:54829`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:60721`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED`  `本地地址: ipv6:64410`  `远程地址: fe80:8::aede:48ff:fe33:4455:49178`  `状态: ESTABLISHED``   `

corespeechd进程

该进程为语音识别进程,我没打开语音识别辅助,他已经悄悄在后台给我启动了。

`进程名称: corespeechd``用户名: mac``网络连接:`  `本地地址: ipv6:49742`  `远程地址: fe80:8::aede:48ff:fe33:4455:49162`  `状态: ESTABLISHED`

禁用ipv6命令:

`networksetup -setv6off Wi-Fi``networksetup -setv6off Ethernet`

PowerChime:

`defaults write com.apple.PowerChime ChimeOnNoHardware -bool true``sudo killall PowerChime`

corespeechd(很顽固,kill半天kill不掉,最后直接删掉,让他变哑巴聋子):

`重启你的 Mac。``按住 Command (⌘) + R 键,直到看到 Apple 标志或启动到恢复模式。``csrutil disable``sudo mv /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd /System/Library/PrivateFrameworks/CoreSpeech.framework/corespeechd.bak``重启你的 Mac。``按住 Command (⌘) + R 键,直到看到 Apple 标志或启动到恢复模式。``csrutil enable``关闭恢复模式终端,然后从苹果菜单中选择“重启”。`

一 、内网被动流量探针钉钉提醒监控

该脚本通过监听开启的自定义端口流量来达到内网扫描告警目的。

端口占用会报错,尽量搞台干净系统运行。

常见端口:

`21``22``80``105``135``139``443``445``1433``1521``2181``2379``3000``3306``3389``4443``4848``5432``5672``6379``6443``7001``7002``7003``7077``7848``8009``8080``8081``8161``8181``8200``8443``8848``8983``9000``9001``9042``9080``9090``9200``9300``9848``9990``9999``10250``11211``15672``19999``27017``50000``50070``61616`
`import socket``import threading``import requests``import os``   ``# 从 token.txt 文件中读取钉钉机器人 Webhook 的 token``def load_dingtalk_token(filename):`    `if not os.path.isfile(filename):`        `print(f"文件 {filename} 不存在")`        `return None`    `    with open(filename, 'r') as file:`        `token = file.read().strip()`    `return token``   ``# 使用钉钉机器人发送提醒``def send_dingtalk_alert(token, message):`    `webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'`    `headers = {`        `'Content-Type': 'application/json'`    `}`    `data = {`        `'msgtype': 'text',`        `'text': {`            `'content': message`        `}`    `}`    `response = requests.post(webhook_url, json=data, headers=headers)`    `if response.status_code != 200:`        `print(f"发送钉钉消息失败: {response.text}")``   ``# 处理客户端连接``def handle_client(client_socket, local_ip, local_port, token):`    `remote_ip, remote_port = client_socket.getpeername()`    `hostname = socket.gethostname()`    `    # 接收请求数据`    `request_data = b""`    `while True:`        `data = client_socket.recv(1024)`        `if not data:`            `break`        `request_data += data`    `    request_text = request_data.decode('utf-8', errors='replace')`    `message = f"本机内网IP: {local_ip}\n" \`              `f"主机名: {hostname}\n" \`              `f"被连接端口: {local_port}\n" \`              `f"远程连接IP: {remote_ip}\n" \`              `f"远程连接端口: {remote_port}\n" \`              `f"请求数据:\n{request_text}"`    `print(message)`    `send_dingtalk_alert(token, message)`    `client_socket.close()``   ``# 启动服务器``def start_server(port, token):`    `server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)`    `server.bind(('0.0.0.0', port))`    `server.listen(5)`    `local_ip = socket.gethostbyname(socket.gethostname())``   `    `print(f"服务器启动,监听端口: {port}")`    `while True:`        `client_socket, addr = server.accept()`        `client_handler = threading.Thread(`            `target=handle_client,`            `args=(client_socket, local_ip, port, token)`        `)`        `client_handler.start()``   ``# 从文件中加载端口列表``def load_ports_from_file(filename):`    `if not os.path.isfile(filename):`        `print(f"文件 {filename} 不存在")`        `return []`    `    with open(filename, 'r') as file:`        `ports = [int(line.strip()) for line in file if line.strip().isdigit()]`    `return ports``   ``if __name__ == "__main__":`    `# 加载钉钉 token`    `token = load_dingtalk_token('token.txt')`    `if not token:`        `print("无法加载钉钉 token")`    `else:`        `# 加载端口列表`        `ports = load_ports_from_file('ports.txt')`        `if not ports:`            `print("没有可监听的端口")`        `else:`            `for port in ports:`                `server_thread = threading.Thread(target=start_server, args=(port, token))`                `server_thread.start()`

二 、服务器指定目录指定后缀文件增加钉钉提醒监控

看一眼便懂,不介绍了。

`import os``import time``import socket``import requests``import psutil``from watchdog.observers import Observer``from watchdog.events import FileSystemEventHandler``   ``def load_from_file(filename):`    `if not os.path.isfile(filename):`        `print(f"文件 {filename} 不存在")`        `return None`    `with open(filename, 'r') as file:`        `return [line.strip() for line in file.readlines()]``   ``def send_dingtalk_alert(token, message):`    `webhook_url = f'https://oapi.dingtalk.com/robot/send?access_token={token}'`    `headers = {'Content-Type': 'application/json'}`    `data = {'msgtype': 'text', 'text': {'content': message}}`    `response = requests.post(webhook_url, json=data, headers=headers)`    `if response.status_code != 200:`        `print(f"发送钉钉消息失败: {response.text}")``   ``def get_local_info():`    `hostname = socket.gethostname()`    `local_ip = socket.gethostbyname(hostname)`    `return local_ip, hostname``   ``def get_process_info(path):`    `for pid in psutil.pids():`        `try:`            `p = psutil.Process(pid)`            `for file in p.open_files():`                `if file.path == path:`                    `return p.name(), pid`        `except (psutil.NoSuchProcess, psutil.AccessDenied):`            `continue`    `return None, None``   ``class DirectoryEventHandler(FileSystemEventHandler):`    `def __init__(self, file_extension, token, local_ip, hostname):`        `self.file_extension = file_extension`        `self.token = token`        `self.local_ip = local_ip`        `self.hostname = hostname``   `    `def on_created(self, event):`        `if not event.is_directory and event.src_path.endswith(self.file_extension):`            `process_name, pid = get_process_info(event.src_path)`            `directory = os.path.dirname(event.src_path)`            `message = (`                `f"检测到新文件: {event.src_path}\n"`                `f"所在目录: {directory}\n"`                `f"本机内网IP: {self.local_ip}\n"`                `f"主机名: {self.hostname}\n"`                `f"进程名称: {process_name if process_name else '未知'}\n"`                `f"进程ID: {pid if pid else '未知'}"`            `)`            `print(message)`            `send_dingtalk_alert(self.token, message)``   ``if __name__ == "__main__":`    `token = load_from_file('token.txt')`    `if not token:`        `print("无法加载钉钉 token")`        `exit(1)``   `    `directories_to_watch = load_from_file('directories.txt')`    `if not directories_to_watch:`        `print("无法加载监控目录")`        `exit(1)``   `    `local_ip, hostname = get_local_info()`    `file_extension = '.jsp'  # 请替换为你要监控的文件后缀``   `    `event_handler = DirectoryEventHandler(file_extension, token[0], local_ip, hostname)`    `observer = Observer()``   `    `for directory in directories_to_watch:`        `observer.schedule(event_handler, directory, recursive=True)`        `print(f"开始监控目录: {directory}, 监控文件后缀: {file_extension}")``   `    `observer.start()`    `try:`        `while True:`            `time.sleep(5)`    `except KeyboardInterrupt:`        `observer.stop()`        `print("停止监控")`    `observer.join()`

三、Linux服务器简单应急python脚本

`import os``import subprocess``import json``   ``def run_command(command):`    `"""运行系统命令并返回输出"""`    `try:`        `result = subprocess.run(command, shell=True, capture_output=True, text=True)`        `return result.stdout.strip()`    `except Exception as e:`        `return str(e)``   ``def collect_system_info():`    `"""收集系统基本信息"""`    `info = {`        `'hostname': run_command('hostname'),`        `'uptime': run_command('uptime'),`        `'users': run_command('who'),`        `'last_logins': run_command('last -n 10'),`        `'disk_usage': run_command('df -h'),`        `'memory_usage': run_command('free -h'),`        `'cpu_info': run_command('lscpu'),`        `'network_info': run_command('ifconfig -a'),`    `}`    `return info``   ``def check_open_ports():`    `"""检查开放端口"""`    `open_ports = run_command('netstat -tuln')`    `return open_ports``   ``def list_running_processes():`    `"""列出当前运行的进程"""`    `processes = run_command('ps aux')`    `return processes``   ``def search_suspicious_files():`    `"""搜索可疑文件(例如最近修改的文件)"""`    `suspicious_files = run_command('find / -type f -mtime -1 2>/dev/null')`    `return suspicious_files``   ``def main():`    `# 收集系统信息`    `system_info = collect_system_info()`    `with open('system_info.json', 'w') as f:`        `json.dump(system_info, f, indent=4)``   `    `# 检查开放端口`    `open_ports = check_open_ports()`    `with open('open_ports.txt', 'w') as f:`        `f.write(open_ports)`    `    # 列出运行的进程`    `running_processes = list_running_processes()`    `with open('running_processes.txt', 'w') as f:`        `f.write(running_processes)``   `    `# 搜索可疑文件`    `suspicious_files = search_suspicious_files()`    `with open('suspicious_files.txt', 'w') as f:`        `f.write(suspicious_files)``   `    `print("应急响应数据已收集完毕,保存在当前目录下的相关文件中。")``   ``if __name__ == "__main__":`    `main()`

四、Windows简单应急python脚本

`import os``import subprocess``import json``   ``def run_command(command):`    `"""运行系统命令并返回输出"""`    `try:`        `result = subprocess.run(command, shell=True, capture_output=True, text=True)`        `return result.stdout.strip()`    `except Exception as e:`        `return str(e)``   ``def collect_system_info():`    `"""收集系统基本信息"""`    `info = {`        `'hostname': run_command('hostname'),`        `'systeminfo': run_command('systeminfo'),`        `'uptime': run_command('net statistics workstation'),`        `'users': run_command('query user'),`        `'last_logins': run_command('quser'),`        `'disk_usage': run_command('wmic logicaldisk get size,freespace,caption'),`        `'memory_usage': run_command('systeminfo | findstr /C:"Total Physical Memory" /C:"Available Physical Memory"'),`        `'cpu_info': run_command('wmic cpu get name,NumberOfCores,NumberOfLogicalProcessors'),`        `'network_info': run_command('ipconfig /all'),`    `}`    `return info``   ``def check_open_ports():`    `"""检查开放端口"""`    `open_ports = run_command('netstat -an')`    `return open_ports``   ``def list_running_processes():`    `"""列出当前运行的进程"""`    `processes = run_command('tasklist')`    `return processes``   ``def search_suspicious_files():`    `"""搜索可疑文件(例如最近修改的文件)"""`    `suspicious_files = run_command('dir /s /b /a-d /o-d')`    `return suspicious_files``   ``def main():`    `# 收集系统信息`    `system_info = collect_system_info()`    `with open('system_info.json', 'w') as f:`        `json.dump(system_info, f, indent=4)``   `    `# 检查开放端口`    `open_ports = check_open_ports()`    `with open('open_ports.txt', 'w') as f:`        `f.write(open_ports)`    `    # 列出运行的进程`    `running_processes = list_running_processes()`    `with open('running_processes.txt', 'w') as f:`        `f.write(running_processes)``   `    `# 搜索可疑文件`    `suspicious_files = search_suspicious_files()`    `with open('suspicious_files.txt', 'w') as f:`        `f.write(suspicious_files)``   `    `print("应急响应数据已收集完毕,保存在当前目录下的相关文件中。")``   ``if __name__ == "__main__":`    `main()`

五、Linux日志分析脚本

自己改下日志路径

`import os``import re``import gzip``import json``import inquirer``import subprocess``from datetime import datetime, timedelta``import argparse``   ``# 日志文件路径``LOG_FILES = {`    `"system": ["/var/log/syslog", "/var/log/auth.log", "/var/log/kern.log"],`    `"container": ["/var/lib/docker/containers/*/*.log"],`    `"java": ["/path/to/java/application/logs/*.log"],  # 修改为实际路径`    `"network": ["/var/log/ufw.log", "/var/log/iptables.log"],`    `"middleware": ["/var/log/nginx/access.log", "/var/log/nginx/error.log",`                   `"/var/log/apache2/access.log", "/var/log/apache2/error.log",`                   `"/var/log/mysql/error.log"],``}``   ``DEFAULT_KEYWORDS = ["error", "fail", "denied", "segfault", "panic", "exception", "timeout"]``   ``def read_log_file(file_path):`    `with (gzip.open(file_path, 'rt') if file_path.endswith('.gz') else open(file_path, 'r')) as f:`        `return f.readlines()``   ``def glob_files(pattern):`    `return subprocess.getoutput(f"ls {pattern}").split('\n')``   ``def parse_log_line(line, date_format="%b %d %H:%M:%S"):`    `try:`        `timestamp_str = line.split(' ')[0]`        `timestamp = datetime.strptime(timestamp_str, date_format)`        `return timestamp, line`    `except Exception:`        `return None, line``   ``def filter_logs_by_time(logs, time_delta):`    `now = datetime.now()`    `return [line for log in logs if (timestamp := parse_log_line(log)[0]) and now - timestamp <= time_delta]``   ``def search_logs(logs, keywords):`    `results = {}`    `for keyword in keywords:`        `pattern = re.compile(keyword, re.IGNORECASE)`        `matching_lines = [line for line in logs if pattern.search(line)]`        `if matching_lines:`            `results[keyword] = matching_lines`    `return results``   ``def save_results(results, output_file):`    `with open(output_file, 'w') as f:`        `json.dump(results, f, indent=4)``   ``def interactive_input():`    `questions = [`        `inquirer.Checkbox('categories', message="请选择要分析的日志类别", choices=list(LOG_FILES.keys())),`        `inquirer.Text('time_delta', message="请输入要分析的时间范围(小时)", default="24"),`        `inquirer.Text('keywords', message="请输入要搜索的关键字(逗号分隔)", default=",".join(DEFAULT_KEYWORDS)),`    `]`    `answers = inquirer.prompt(questions)`    `return answers['categories'], timedelta(hours=int(answers['time_delta'])), answers['keywords'].split(',')``   ``def main():`    `parser = argparse.ArgumentParser(description="日志收集和分析脚本")`    `parser.add_argument('--interactive', action='store_true', help="启用交互模式")`    `args = parser.parse_args()``   `    `if args.interactive:`        `selected_categories, time_delta, keywords = interactive_input()`    `else:`        `selected_categories, time_delta, keywords = list(LOG_FILES.keys()), timedelta(hours=24), DEFAULT_KEYWORDS``   `    `all_results = {}`    `for category in selected_categories:`        `for pattern in LOG_FILES[category]:`            `for log_file in glob_files(pattern):`                `if os.path.exists(log_file):`                    `logs = read_log_file(log_file)`                    `filtered_logs = filter_logs_by_time(logs, time_delta)`                    `results = search_logs(filtered_logs, keywords)`                    `if results:`                        `all_results.setdefault(category, {})[log_file] = results``   `    `output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"`    `save_results(all_results, output_file)`    `print(f"日志分析结果已保存到 {output_file}")``   ``if __name__ == "__main__":`    `main()`

六、windows日志分析脚本

自己改路径

`import win32evtlog``import win32evtlogutil``import win32security``import inquirer``import argparse``import json``from datetime import datetime, timedelta``   ``# 默认的日志源``LOG_SOURCES = {`    `"System": "System",`    `"Application": "Application",`    `"Security": "Security",``}``   ``DEFAULT_KEYWORDS = ["error", "fail", "denied", "exception", "timeout"]``   ``def read_event_log(source, time_delta):`    `server = 'localhost'`    `log_type = source`    `hand = win32evtlog.OpenEventLog(server, log_type)`    `flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ`    `total = win32evtlog.GetNumberOfEventLogRecords(hand)``   `    `events = []`    `now = datetime.now()`    `time_threshold = now - time_delta``   `    `while True:`        `records = win32evtlog.ReadEventLog(hand, flags, 0)`        `if not records:`            `break`        `for record in records:`            `event_time = datetime.fromtimestamp(record.TimeGenerated)`            `if event_time < time_threshold:`                `return events`            `events.append((event_time, record))``   `    `win32evtlog.CloseEventLog(hand)`    `return events``   ``def search_logs(events, keywords):`    `results = {}`    `for keyword in keywords:`        `keyword_lower = keyword.lower()`        `matching_events = [event for event in events if keyword_lower in event[1].StringInserts.lower()]`        `if matching_events:`            `results[keyword] = matching_events`    `return results``   ``def save_results(results, output_file):`    `with open(output_file, 'w') as f:`        `json.dump(results, f, indent=4, default=str)``   ``def interactive_input():`    `questions = [`        `inquirer.Checkbox('sources', message="请选择要分析的日志源", choices=list(LOG_SOURCES.keys())),`        `inquirer.Text('time_delta', message="请输入要分析的时间范围(小时)", default="24"),`        `inquirer.Text('keywords', message="请输入要搜索的关键字(逗号分隔)", default=",".join(DEFAULT_KEYWORDS)),`    `]`    `answers = inquirer.prompt(questions)`    `selected_sources = [LOG_SOURCES[source] for source in answers['sources']]`    `time_delta = timedelta(hours=int(answers['time_delta']))`    `keywords = answers['keywords'].split(',')`    `return selected_sources, time_delta, keywords``   ``def main():`    `parser = argparse.ArgumentParser(description="Windows 日志收集和分析脚本")`    `parser.add_argument('--interactive', action='store_true', help="启用交互模式")`    `args = parser.parse_args()``   `    `if args.interactive:`        `selected_sources, time_delta, keywords = interactive_input()`    `else:`        `selected_sources = list(LOG_SOURCES.values())`        `time_delta = timedelta(hours=24)`        `keywords = DEFAULT_KEYWORDS``   `    `all_results = {}`    `for source in selected_sources:`        `events = read_event_log(source, time_delta)`        `results = search_logs(events, keywords)`        `if results:`            `all_results[source] = results``   `    `output_file = f"log_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"`    `save_results(all_results, output_file)`    `print(f"日志分析结果已保存到 {output_file}")``   ``if __name__ == "__main__":`    `main()`

七、内存马筛查脚本

`import psutil``import subprocess``import os``import json``   ``# 检查可疑进程``def check_suspicious_processes():`    `suspicious_processes = []`    `for proc in psutil.process_iter(['pid', 'name', 'cmdline']):`        `if any(keyword in proc.info['cmdline'] for keyword in ['java', 'tomcat', 'nacos']):`            `suspicious_processes.append(proc.info)`    `return suspicious_processes``   ``# 分析网络连接``def analyze_network_connections():`    `network_connections = []`    `for conn in psutil.net_connections(kind='inet'):`        `if conn.laddr.port in range(8000, 9000) or conn.status == 'LISTEN':`            `try:`                `proc = psutil.Process(conn.pid)`                `if any(keyword in proc.cmdline() for keyword in ['java', 'tomcat', 'nacos']):`                    `network_connections.append({`                        `'pid': conn.pid,`                        `'name': proc.name(),`                        `'cmdline': proc.cmdline(),`                        `'local_address': conn.laddr,`                        `'remote_address': conn.raddr,`                        `'status': conn.status`                    `})`            `except psutil.NoSuchProcess:`                `continue`    `return network_connections``   ``# 检查文件系统中的可疑文件``def check_suspicious_files():`    `suspicious_files = []`    `search_paths = ['/var/www/html', '/usr/share/nginx/html']`    `keywords = ['Behinder', 'AntSword', 'Godzilla']`    `for path in search_paths:`        `for root, _, files in os.walk(path):`            `for file in files:`                `if file.endswith(('.jsp', '.php', '.aspx')):`                    `file_path = os.path.join(root, file)`                    `with open(file_path, 'r', errors='ignore') as f:`                        `content = f.read()`                        `if any(keyword in content for keyword in keywords):`                            `suspicious_files.append(file_path)`    `return suspicious_files``   ``# 获取Java进程ID``def get_java_pids():`    `java_pids = []`    `for proc in psutil.process_iter(['pid', 'name', 'cmdline']):`        `if 'java' in proc.info['cmdline']:`            `java_pids.append(proc.info['pid'])`    `return java_pids``   ``# 检查Java进程中的内存马``def check_memory_malware(pids):`    `heap_dumps = []`    `for pid in pids:`        `dump_file = f'/tmp/heapdump_{pid}.hprof'`        `cmd = ['jmap', '-dump:live,format=b,file=' + dump_file, str(pid)]`        `try:`            `subprocess.run(cmd, check=True)`            `heap_dumps.append(dump_file)`        `except subprocess.CalledProcessError:`            `print(f"内存转储失败: {pid}")`    `return heap_dumps``   ``# 分析内存转储文件``def analyze_heap_dump(dump_file):`    `analysis_result = {}`    `# 假设我们使用 Eclipse MAT 的命令行工具进行分析`    `mat_cmd = [`        `'java', '-jar', 'org.eclipse.mat.cli-1.11.0.jar',  # 替换为实际的 Eclipse MAT CLI 工具路径`        `'-consolelog',`        `'-heapdump', dump_file,`        `'-query', 'find_leaks',  # 这里使用 MAT 内置的 find_leaks 查询`        `'-format', 'JSON',`        `'-output', dump_file + '.json'`    `]`    `try:`        `subprocess.run(mat_cmd, check=True)`        `with open(dump_file + '.json') as f:`            `analysis_result = json.load(f)`    `except subprocess.CalledProcessError:`        `print(f"内存分析失败: {dump_file}")`    `except FileNotFoundError:`        `print(f"找不到分析结果文件: {dump_file}.json")`    `return analysis_result``   ``# 主函数``def main():`    `results = {`        `'suspicious_processes': check_suspicious_processes(),`        `'network_connections': analyze_network_connections`

八、cobalt等远控的dns请求筛查脚本

根据DNS请求频率

`from scapy.all import *``import time``   ``# 存储 DNS 请求的字典``dns_requests = {}``   ``# 捕获并解析 DNS 数据包的回调函数``def dns_monitor_callback(packet):`    `if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0:  # 只关注DNS请求`        `dns_query = packet.getlayer(DNS).qd.qname.decode('utf-8')`        `src_ip = packet[IP].src`        `current_time = time.time()``   `        `# 记录每个源IP的DNS请求时间`        `if src_ip not in dns_requests:`            `dns_requests[src_ip] = []`        `dns_requests[src_ip].append(current_time)``   `        `# 检查最近的一段时间内的请求频率`        `dns_requests[src_ip] = [t for t in dns_requests[src_ip] if current_time - t < 60]  # 只保留最近60秒的请求`        `if len(dns_requests[src_ip]) > 20:  # 如果60秒内的请求数超过20次,触发警报`            `print(f"[ALERT] High DNS request frequency from {src_ip}: {len(dns_requests[src_ip])} requests in the last minute")``   ``# 使用scapy捕获DNS流量``def start_dns_monitor():`    `print("Starting DNS traffic monitor...")`    `sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)``   ``if __name__ == "__main__":`    `start_dns_monitor()`

dns响应数据包类型

`from scapy.all import *``   ``# 捕获并解析 DNS 数据包的回调函数``def dns_monitor_callback(packet):`    `if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 1:  # 只关注DNS响应`        `dns_response = packet.getlayer(DNS).an`        `if dns_response:`            `response_size = len(dns_response)`            `if response_size > 512:  # 检查响应数据包大小,超过512字节的可能是异常的`                `src_ip = packet[IP].src`                `print(f"[ALERT] Large DNS response detected from {src_ip}: {response_size} bytes")``   ``# 使用scapy捕获DNS流量``def start_dns_monitor():`    `print("Starting DNS traffic monitor...")`    `sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)``   ``if __name__ == "__main__":`    `start_dns_monitor()`

检测特征字符串

`from scapy.all import *``   ``# 定义特征字符串或正则表达式模式``cobalt_strike_patterns = [`    `re.compile(r'^[a-zA-Z0-9]{16,}\.'),`    `re.compile(r'\..*\..*\..*')  # 多级子域名``]``   ``# 捕获并解析 DNS 数据包的回调函数``def dns_monitor_callback(packet):`    `if packet.haslayer(DNS) and packet.getlayer(DNS).qr == 0:  # 只关注DNS请求`        `dns_query = packet.getlayer(DNS).qd.qname.decode('utf-8')`        `src_ip = packet[IP].src`        `for pattern in cobalt_strike_patterns:`            `if pattern.match(dns_query):`                `print(f"[ALERT] Potential Cobalt Strike pattern detected: {dns_query} from {src_ip}")``   ``# 使用scapy捕获DNS流量``def start_dns_monitor():`    `print("Starting DNS traffic monitor...")`    `sniff(filter="udp port 53", prn=dns_monitor_callback, store=0)``   ``if __name__ == "__main__":`    `start_dns_monitor()`

暂时就这些,手搓的好处就是有python环境随时内网搭建一个就算不出网也能监控。还不用担心安全设备本身随时可能被发现的RCE漏洞。

进群后台回复“进群”。

相关推荐
关注或联系我们
添加百川云公众号,移动管理云安全产品
咨询热线:
4000-327-707
百川公众号
百川公众号
百川云客服
百川云客服

Copyright ©2024 北京长亭科技有限公司
icon
京ICP备 2024055124号-2