python调用nmap.py进行扫描,调用go编译的so文件

佚名 / 2023-08-28 / 原文

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import os
import platform
from ctypes import cdll, c_char_p
from urllib import parse

import netaddr
from typing import List, Dict, Any
import nmap
from log import logger
from concurrent.futures import ThreadPoolExecutor, as_completed

"""
# https://www.keepnight.com/archives/1423/
-PS: 这个选项用于TCP SYN Ping主机存活探测,其后可跟端口列表。当使用-PS选项时,Nmap将向指定端口发送TCP SYN包,通过观察目标主机的响应来判断主机是否存活。
        无论目标主机响应RST包(端口关闭)还是SYN/ACK包(端口开放),都说明主机存活;
        如果没有任何响应,则主机可能不存活,或探测包被防火墙过滤或阻止。

-Pn: 这个选项用于禁用主机存活检测。当使用-Pn选项时,Nmap将不执行任何主机存活检测,而是直接进行端口扫描。这意味着无论目标主机是否存活,Nmap都会尝试进行端口扫描。

-PE: 这个选项用于启用ICMP echo请求(ping)进行主机存活检测。当使用-PE选项时,Nmap将发送ICMP echo请求到目标主机,通过观察是否收到ICMP echo回复来确定主机的存活状态。
        如果目标主机响应了ICMP echo请求,表示主机存活;如果没有响应,则表示主机可能不存活。
"""

# 资产扫描
class AssetScan(object):
    """Asset scanner: host discovery, OS detection and port scanning through
    python-nmap, plus service identification through a ctypes-loaded library.
    """

    def __init__(self, proxy: str) -> None:
        """
        :param proxy: proxy URL (``scheme://[user:pass@]host:port``); a value
                      without a URL scheme is treated as "no proxy"
        """
        # A proxy string without a scheme is considered malformed and dropped.
        self.proxy = proxy if parse.urlparse(proxy).scheme else ''
        self.sudo = False  # there is no sudo command inside docker
        self.methods = {
            # nmap arguments for host discovery
            'AUTO': '-PE -PS{} -n -sn --disable-arp-ping --max-retries=2 --min-rate=1000 -T4'.format(
                ','.join(TOP_1000_PORTS)),
            'PING': '-PE -n -sn --disable-arp-ping --max-retries=2',
            'TOP': '-PS{} -n -sn --disable-arp-ping --max-retries=0 --min-rate=1000 -T4'.format(
                ','.join(TOP_1000_PORTS)),
            # nmap arguments for port scanning; the placeholders are
            # min-rate, max-rate and host-timeout (seconds), in that order
            'TCP-SYN': '-Pn -sS -n -sU --max-retries=3 --version-light --open --min-rate={} --max-rate={} -T4 --host-timeout {}s',
            'TCP-CONNECT': '-Pn -sT -sU -n --version-light --open --min-rate={} --max-rate={} -T4 --host-timeout {}s',
            'TCP-SYN-WITH-UDP': '-Pn -sS -sU -n --version-light --open --min-rate=1000 -T4 --host-timeout 30m',
            'TCP-CONNECT-WITH-UDP': '-Pn -sT -sU -n --version-light --open --min-rate=1000 -T4 --host-timeout 30m',
        }

    def _get_proxychains(self) -> str:
        """Write a proxychains config file for ``self.proxy`` and return its path.

        :return: path of the generated config file, or '' when no proxy is
                 configured, the platform is Windows, or the file cannot be written
        """
        if not self.proxy or platform.system() == 'Windows':
            return ''
        logger.info(self.proxy)
        url_parts = parse.urlparse(self.proxy)
        proxychains_config = os.path.join('/tmp', '{}@{}@{}.conf'.format(
            url_parts.scheme, url_parts.hostname, url_parts.port))

        # Fixed header followed by a single "[ProxyList]" entry.
        tmp_config = '''strict_chain
remote_dns_subnet 224
tcp_read_time_out 1200
tcp_connect_time_out 800
[ProxyList]
'''
        tmp_config = tmp_config + '{} {} {} {} {}'.format(url_parts.scheme, url_parts.hostname,
                                                          url_parts.port,
                                                          url_parts.username if url_parts.username else '',
                                                          url_parts.password if url_parts.password else '')
        try:
            # The file may contain proxy credentials, so create it readable by
            # the owner only (the mode only takes effect when the file is created).
            fd = os.open(proxychains_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                f.write(tmp_config)
        except Exception as err:
            logger.error('获取代理配置文件失败: {}'.format(err))
            return ''
        return proxychains_config

    @staticmethod
    def _expand_targets(addresses: List[str]) -> List[str]:
        """Expand ``a.b.c.x-y`` ranges and CIDR/single addresses into individual IP strings."""
        expanded = []
        for addr in addresses:
            if '-' in addr:
                # Range form: only the last octet carries the "start-end" pair.
                octets = addr.split('.')
                bounds = octets[3].split('-')
                prefix = "{}.{}.{}.".format(octets[0], octets[1], octets[2])
                expanded.extend("{}{}".format(prefix, i)
                                for i in range(int(bounds[0]), int(bounds[1]) + 1))
            else:
                expanded.extend(str(ip) for ip in netaddr.IPNetwork(addr))
        return expanded

    def __host_discover_with_nmap(self, method: str, targets: str, host_scanner_type: int = 1,
                                  host_scanner_ports: list = TOP_30_TCP_PORTS) -> List[str]:
        """Run host discovery and return the addresses considered alive.

        NOTE(review): the original source had a dangling indented assignment
        after ``arguments = self.__auto_scanner_command()`` (an ``if`` line was
        lost). It is restored below using the same ``host_scanner_type`` test
        that selects how results are read - confirm against the upstream code.
        NOTE(review): ``__auto_scanner_command`` and ``__top_scanner_command``
        are not defined in this class as shown; until they are supplied the
        nmap branch fails with AttributeError, which is logged and yields [].

        :param method: discovery mode; 'PN' assumes every target is alive and
                       just expands the target list, anything else runs nmap
        :param targets: space separated targets (IPs, CIDR blocks, a.b.c.x-y ranges)
        :param host_scanner_type: types 0, 3 and 4 judge liveness by open TCP
                                  ports, any other type by nmap's host status
        :param host_scanner_ports: ports handed to the port based discovery command
        :return: list of alive IPs; empty when the scan fails
        """
        online_ip = []
        try:
            if method == 'PN':
                # PN mode: assume every target is alive.
                return self._expand_targets(targets.split())

            port_based = host_scanner_type in (0, 3, 4)
            nm = nmap.PortScanner()
            arguments = self.__auto_scanner_command()
            if port_based:
                arguments = self.__top_scanner_command(host_scanner_ports)
            result = nm.scan(hosts=targets, arguments=arguments, sudo=self.sudo)
            scanned = result['scan'] if result else {}
            for ip, val in scanned.items():
                if port_based:
                    # Alive when at least one probed TCP port is open.
                    alive = any(info.get('state') == 'open'
                                for info in val.get('tcp', {}).values())
                else:
                    alive = val.get('status', {}).get('state') == 'up'
                if alive:
                    online_ip.append(ip)
        except Exception as err:
            logger.error('主机探活执行失败: {}'.format(err))

        return online_ip

    def host_discover_scan(self, host_scan_module: str, targets: List[str], host_scanner_type: int=1,
                           host_scanner_ports: list=TOP_30_TCP_PORTS) -> List[str]:
        """Host discovery entry point; delegates to the nmap based implementation.

        :param host_scan_module: discovery mode passed through as ``method``
        :param targets: targets to probe
        :param host_scanner_type: see ``__host_discover_with_nmap``
        :param host_scanner_ports: see ``__host_discover_with_nmap``
        :return: list of alive IPs
        """
        return self.__host_discover_with_nmap(host_scan_module, ' '.join(targets),
                                              host_scanner_type=host_scanner_type,
                                              host_scanner_ports=host_scanner_ports)

    def scan_os(self, targets: List[str]) -> Dict[str, Any]:
        """Detect the operating system family of each target with ``nmap -O``.

        :param targets: hosts to fingerprint
        :return: mapping of IP -> OS family ('Linux', 'Windows', 'Mac OS X' or
                 'Unknown'); hosts without an ``osmatch`` entry or with another
                 family are omitted, and {} is returned when the scan fails
        """
        ret = {}
        try:
            nm = nmap.PortScanner()
            result = nm.scan(hosts=' '.join(targets), arguments='-O -F -Pn -n', sudo=self.sudo)
        except Exception as err:
            logger.error('操作系统识别失败: {}'.format(err))
            return ret
        if result is None:
            return ret
        for key, val in result.get('scan', {}).items():
            if 'osmatch' not in val:
                continue
            try:
                # Take the family of the first (best) match.
                os_family = val['osmatch'][0]['osclass'][0]['osfamily']
            except (IndexError, KeyError, TypeError) as err:
                os_family = 'Unknown'
                logger.error('未识别到操作系统: {}:{}'.format(key, err))
            if os_family in ['Linux', 'Windows', 'Mac OS X', 'Unknown']:
                ret[key] = os_family
        return ret

    @staticmethod
    def _build_port_spec(ports: str) -> str:
        """Turn the caller supplied port description into an nmap ``-p`` value.

        Accepts an nmap expression that already contains ``T:``/``U:``, a JSON
        object such as ``{"TCP": "80,443", "UDP": "53"}``, or a plain port list
        which is treated as TCP.
        """
        try:
            custom = json.loads(ports)
        except (TypeError, ValueError):
            custom = None  # not JSON: an ordinary port expression
        if isinstance(custom, dict):
            parts = []
            if custom.get('TCP'):
                parts.append('T:' + str(custom['TCP']).strip())
            if custom.get('UDP'):
                parts.append('U:' + str(custom['UDP']).strip())
            scan_ports = ','.join(parts)
        elif 'T:' in ports or 'U:' in ports:
            scan_ports = ports
        else:
            scan_ports = ''
        if 'T:' not in scan_ports and 'U:' not in scan_ports:
            # Neither JSON nor protocol-qualified: treat everything as TCP ports.
            scan_ports = 'T:' + ports
        return scan_ports

    def scan_port(self, targets: List[str], ports: str, method: str, exclude_ports: str, scan_rate: int,
                  host_timeout: int) -> List[dict]:
        """Port scan the targets and return every open port.

        :param targets: hosts to scan
        :param ports: ports to scan, e.g. ``80,443,83-87``, a ``T:``/``U:``
                      expression, or a JSON object with ``TCP``/``UDP`` keys
        :param method: key into ``self.methods`` selecting the nmap arguments
                       (per the original note, SYN scans cannot go through the proxy)
        :param exclude_ports: ports passed to ``--exclude-ports``; skipped when empty
        :param scan_rate: max packet rate; the min rate is half of it
        :param host_timeout: per-host timeout in seconds
        :return: list of ``{'ip', 'port', 'protocol'}`` dicts; [] when the scan fails
        """
        ret = []
        result = None
        try:
            # Templates without placeholders are returned unchanged by format().
            arguments = self.methods[method].format(int(scan_rate / 2), scan_rate, host_timeout)
            if exclude_ports:
                arguments += ' --exclude-ports {}'.format(exclude_ports)
            scan_ports = self._build_port_spec(ports)
            nm = nmap.PortScanner()
            logger.info("端口扫描参数ports: {}, arguments: {}".format(scan_ports, arguments))
            # NOTE(review): `proxychains` is not an argument of stock
            # python-nmap - presumably a patched build is in use; confirm.
            result = nm.scan(hosts=' '.join(targets), ports=scan_ports, arguments=arguments, sudo=self.sudo,
                             proxychains=self._get_proxychains())
        except Exception as err:
            logger.error('端口扫描失败: {}'.format(err))
            logger.exception(err)
        if result is None:
            return ret
        for key, val in result.get('scan', {}).items():
            for protocol in ('tcp', 'udp'):
                for port, data in val.get(protocol, {}).items():
                    if data.get('state', '') != 'open':
                        continue
                    ret.append({
                        'ip': key,
                        'port': str(port),
                        'protocol': protocol
                    })
        return ret

    @staticmethod
    def __load_service_scan_library():
        """Load the platform specific service identification library.

        :return: the ctypes library handle with ``scan.restype`` set to
                 ``c_char_p``, or None when loading fails
        """
        library_names = {
            'Darwin': 'service_scan_darwin.so',
            'Windows': 'service_scan.dll',
        }
        try:
            file_name = library_names.get(platform.system(), 'service_scan.so')
            lib = cdll.LoadLibrary(os.path.join(SCANNER_DIR, 'lib', file_name))
            lib.scan.restype = c_char_p
        except Exception as e:
            logger.error('加载scan库失败: {}'.format(e))
            return None
        return lib

    def scan_service_with_golang(self, targets: str) -> List[dict]:
        """Identify services on open ports using the native scan library.

        :param targets: JSON string listing the ports to probe, e.g.
            ``[{"ip": "129.226.181.188", "port": "22", "protocol": "tcp"}]``
        :return: list of result dicts, e.g.
            ``[{'ip': '129.226.181.188', 'port': '22', 'protocol': 'tcp',
            'name': 'ssh', 'product': 'OpenSSH', 'version': '7.6p1', 'state': 'open'}]``;
            [] when the library cannot be loaded or the call fails
        """
        ret = []
        dll = self.__load_service_scan_library()
        if dll is None:
            return []
        try:
            # Arguments: 1) targets JSON, 2) path of the fingerprint database
            # used by the library, 3) proxy address.
            # NOTE(review): the returned C string is presumably allocated by the
            # library and is never released here - confirm whether a free
            # function is exported.
            result = dll.scan(c_char_p(targets.encode()),
                              c_char_p(os.path.join(SCANNER_DIR, 'db', 'db.sqlite3').encode()),
                              c_char_p(self.proxy.encode()))
            res = json.loads(result.decode('utf-8'))
            if res:
                ret = res
        except Exception as err:
            logger.error('执行失败: {}'.format(err))
        return ret


def port_scan(**kwargs):
    """Port scan job.

    kwargs:
        proxy: proxy address
        targets: alive hosts, e.g.
            [{'ip': '8.8.8.8', 'asset': 1, "asset_parent": None}, {'ip': '129.226.181.188', 'asset': 2}]
        scan_ports: ports to scan, e.g. 80,443,22-25
        port_scan_module: scan mode (TCP-SYN, TCP-CONNECT, ...)
        exclude_ports: ports to exclude
        port_scan_rate: max packet rate (defaults to NMAP_PORT_DEFAULT_SCAN_RATE)
        host_timeout: per-host timeout in seconds (defaults to PORT_SCAN_HOST_TIMEOUT)

    :return: list of {'ip', 'port', 'protocol', 'asset', 'asset_parent'} dicts,
             one per open port
    """
    targets = kwargs.get('targets', [])
    exclude_ports = kwargs.get('exclude_ports', '')
    scan_ports = kwargs.get('scan_ports', '')
    port_scan_rate = kwargs.get('port_scan_rate', NMAP_PORT_DEFAULT_SCAN_RATE)
    proxy = kwargs.get('proxy', '')
    host_timeout = kwargs.get('host_timeout', PORT_SCAN_HOST_TIMEOUT)
    scan_method = kwargs.get('port_scan_module', '')

    # Remember which asset / parent asset each IP belongs to.
    asset_id_map = {}
    asset_parent_map = {}
    scan_targets = []
    for target in targets:
        ip = target.get('ip', '')
        asset_id_map[ip] = target.get('asset')
        asset_parent_map[ip] = target.get('asset_parent')
        scan_targets.append(ip)

    s = AssetScan(proxy=proxy)
    open_port_list = []
    # Only split the scan when a usable proxy is configured: AssetScan drops
    # proxy strings without a scheme, and splitting is pointless without one.
    if scan_ports in ("T:1-65535", "1-65535") and s.proxy:
        # Scan the full range in 1000-port slices on a thread pool to speed up
        # scanning through proxychains; the pool is shut down on exit.
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(s.scan_port, targets=scan_targets,
                                ports='T:{}-{}'.format(start, min(start + 999, 65535)),
                                method=scan_method, exclude_ports=exclude_ports,
                                scan_rate=port_scan_rate, host_timeout=host_timeout)
                for start in range(1, 65536, 1000)
            ]
            for future in as_completed(futures):
                res = future.result()
                if res:
                    open_port_list.extend(res)
    else:
        open_port_list = s.scan_port(targets=scan_targets, ports=scan_ports,
                                     method=scan_method, exclude_ports=exclude_ports,
                                     scan_rate=port_scan_rate, host_timeout=host_timeout)

    # Attach asset information to every open port; .get() keeps the job alive
    # when the scanner reports an address that was not in the input.
    ret = []
    for port in open_port_list:
        ip = port.get('ip')
        ret.append({
            'ip': ip,
            'port': port.get('port'),
            'protocol': port.get('protocol'),
            'asset': asset_id_map.get(ip),
            "asset_parent": asset_parent_map.get(ip),
        })
    return ret


def service_scan(**kwargs):
    """Service identification job.

    kwargs:
        proxy: proxy address
        targets: port scan results, e.g. [
            {'ip': '8.8.8.8', 'port': '443', 'protocol': 'tcp', 'asset': 1},
            {'ip': '129.226.181.188', 'port': '22', 'protocol': 'tcp', 'asset': 2},
            {'ip': '129.226.181.188', 'port': '80', 'protocol': 'tcp', 'asset': 2}
        ]

    :return: list of dicts with asset, asset_parent, ip, protocol, port, name,
             product, version and state; product/version fall back to 'unknown'
    """
    targets = kwargs.get('targets', [])

    asset_id_map = {}  # ip -> asset id
    asset_parent_map = {}  # ip -> parent asset id
    scan_payload = []
    for target in targets:
        ip = target.get('ip', '')
        asset_id_map[ip] = target.get('asset')
        asset_parent_map[ip] = target.get('asset_parent')
        # Send a copy without the 'asset' key rather than deleting it from the
        # caller's dicts (which also failed when the key was missing).
        scan_payload.append({k: v for k, v in target.items() if k != 'asset'})

    s = AssetScan(proxy=kwargs.get('proxy', ''))
    service_result_list = s.scan_service_with_golang(targets=json.dumps(scan_payload))

    ret = []
    for row in service_result_list:
        ip = row.get('ip')
        ret.append({
            'asset': asset_id_map.get(ip),
            'asset_parent': asset_parent_map.get(ip),
            'ip': row.get('ip', ''),
            "protocol": row.get('protocol'),
            "port": row.get('port'),
            "name": row.get('name'),
            "product": row.get('product') or 'unknown',
            "version": row.get('version') or 'unknown',
            "state": row.get('state')
        })
    return ret


def os_scan(**kwargs):
    """OS detection job.

    kwargs:
        targets: alive hosts, e.g. [{'ip': '8.8.8.8', 'asset': 1}, {'ip': '129.226.181.188', 'asset': 2}]
        proxy: proxy address handed to the scanner

    :return: the same target dicts, each updated in place with an "os" key
             ('Unknown' when the scanner reported nothing for that IP)
    """
    hosts = kwargs.get('targets', [])
    addresses = []
    for host in hosts:
        addresses.append(host.get('ip', ''))

    scanner = AssetScan(proxy=kwargs.get('proxy', ''))
    detected = scanner.scan_os(addresses)

    # Annotate every input host with the detected OS family.
    for host in hosts:
        address = host.get('ip', '')
        host["os"] = detected.get(address, 'Unknown')
    return hosts


# Script entry point: running this file directly does nothing.
if __name__ == '__main__':
    pass