1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
# coding=utf-8
import requests
import time
import json
import base64
from StringIO import StringIO
from PIL import Image
from mail_util import MailReceiver
import lz_api
import poplib
import logging
import logging.config
import csv
import utils
import common_header
import sys

# Python 2 idiom: reload the sys module, then force the process-wide default
# string encoding to UTF-8.
reload(sys)
sys.setdefaultencoding('utf-8')
# apple 会话session session = None # 请求超时时间,秒 default_timeout = 30 # apple返回的response cur_rsp = None # 当前注册账号的数据 cur_data = None # 验证码 captcha_result = None verify_code = None verification_id = None # 尝试获取邮件次数 email_try_times = 20 # 获取邮件延时,秒 fetch_email_delay = 5 # 邮件发送时间 string 2016-07-14 20:20:20 send_email_time = None
# logging logging.config.fileConfig("file/logging.conf") log = logging.getLogger("apple")
# 验证码图片 captcha_path = "file/apple_captcha.jpg" # 待注册账号 account_file = "file/apple_account.csv" # 注册结果 result_file = "file/result.csv"
class AppleRequest(object):
    """Base class for one HTTP step of the registration flow.

    Subclasses override the hook methods below; ``do_request`` drives them.
    Every hook is a no-op returning ``None`` unless overridden.
    """

    def get_url(self):
        """Return the URL to request."""
        pass

    def get_payload(self):
        """Return the request body (as a JSON-serialisable object)."""
        pass

    def get_header(self):
        """Return the HTTP headers to send."""
        pass

    def request(self):
        """Send the HTTP request and return the response object."""
        pass

    def process(self):
        """Handle a successful response (read from the ``cur_rsp`` global)."""
        pass

    def process_status_error(self):
        """Best-effort hook invoked when the response status is not OK."""
        pass

    def do_request(self):
        """Run this step: send the request, publish the response, handle it.

        The response is stored in the module-level ``cur_rsp`` so that the
        hooks of this step and the following steps can read it.

        Raises:
            Exception: when the response is not OK. The message carries the
                status code and the response text.
        """
        global cur_rsp
        log.info("request: %s", self.get_url())
        cur_rsp = self.request()
        if not cur_rsp.ok:
            # The hook is best-effort: if it fails (e.g. the error body is
            # not the JSON it expects) log it, but still report the HTTP
            # status error below instead of the hook's own exception.
            try:
                self.process_status_error()
            except Exception:
                log.exception("process_status_error failed")
            raise Exception("status code:" + str(cur_rsp.status_code) + ", info:" + cur_rsp.text)
        self.process()
class RegisterPageRequest(AppleRequest):
    """Step 1: open the registration page (HTTP GET)."""

    def get_url(self):
        """Return the address of the account-creation page."""
        return "https://appleid.apple.com/#!&page=create"

    def get_header(self):
        """Return the headers produced by ``common_header.get_home_header``."""
        return common_header.get_home_header()

    def request(self):
        """GET the page through the shared session and return the response."""
        send = session.get
        url = self.get_url()
        headers = self.get_header()
        return send(url, headers=headers, timeout=default_timeout)
class GetAccountRequest(AppleRequest):
    """Step 2: fetch the registration profile information (AJAX GET)."""

    def get_url(self):
        """Return the account endpoint."""
        return "https://appleid.apple.com/account"

    def get_header(self):
        """Build the headers from the current ``cur_rsp`` response."""
        return common_header.combine_request_header(cur_rsp)

    def request(self):
        """GET the endpoint through the shared session and return the response."""
        send = session.get
        url = self.get_url()
        headers = self.get_header()
        return send(url, headers=headers, timeout=default_timeout)
class GetCaptchaRequest(AppleRequest):
    """Step 3: fetch the captcha image and solve it (AJAX POST)."""

    def get_url(self):
        """Return the captcha endpoint."""
        return "https://appleid.apple.com/captcha"

    def get_payload(self):
        """Return the request body asking for an image captcha."""
        return {"type": "IMAGE"}

    def get_header(self):
        """Build the headers from the current ``cur_rsp`` response."""
        return common_header.combine_request_header(cur_rsp)

    def request(self):
        """POST the JSON payload through the shared session."""
        return session.post(self.get_url(),
                            data=json.dumps(self.get_payload()),
                            headers=self.get_header(),
                            timeout=default_timeout)

    def process(self):
        """Store the captcha image on disk, then have it solved."""
        self.save_captcha()
        self.parse_captcha()

    def save_captcha(self):
        """Decode the base64 image in the response and save it to ``captcha_path``.

        The image is read from ``payload.content`` of the JSON response.
        """
        captcha_json = cur_rsp.json()
        image_data = captcha_json["payload"]["content"]
        img_io = StringIO()
        try:
            base64.decode(StringIO(image_data), img_io)
            # Writing the decoded bytes leaves the buffer positioned at its
            # end; rewind so the image is read from the first byte.
            img_io.seek(0)
            captcha_img = Image.open(img_io)
            captcha_img.save(captcha_path)
        finally:
            # Release the buffer even when opening/saving the image fails.
            img_io.close()
        log.info("save captcha image done!")

    def parse_captcha(self):
        """Send the saved image to ``lz_api`` and keep the parsed answer.

        The decoded JSON reply is stored in the module-level
        ``captcha_result``.

        Raises:
            Exception: when the reply's ``result`` field is falsy.
        """
        global captcha_result
        log.info("parsing captcha image...")
        result = lz_api.postCaptcha(captcha_path)
        log.info("captcha result: %s", result)
        captcha_result = json.loads(result)
        if not captcha_result["result"]:
            raise Exception("lz parse captcha failed." + result)
class AnswerCaptchaRequest(AppleRequest):
    """Step 4: submit the captcha answer and fetch the e-mail verification code (AJAX POST)."""

    def get_url(self):
        """Return the verification endpoint."""
        return "https://appleid.apple.com/account/verification"

    def get_payload(self):
        """Build the body from the account data, the solved captcha and ``cur_rsp``.

        ``cur_rsp`` is read for the captcha ``id`` and ``token``; it is
        expected to still be the captcha response when this runs.
        """
        captcha_json = cur_rsp.json()
        return {
            "account": {
                "name": cur_data["name"],
                "person": {
                    "name": {
                        "firstName": cur_data["firstName"],
                        "lastName": cur_data["lastName"]
                    }
                }
            },
            "captcha": {
                "answer": captcha_result["data"]["val"],
                "id": captcha_json["id"],
                "token": captcha_json["token"]
            }
        }

    def get_header(self):
        """Build the headers from the current ``cur_rsp`` response."""
        return common_header.combine_request_header(cur_rsp)

    def request(self):
        """Record the send time in ``send_email_time`` and POST the payload."""
        global send_email_time
        send_email_time = utils.get_cur_time_str()
        return session.post(self.get_url(),
                            data=json.dumps(self.get_payload()),
                            headers=self.get_header(),
                            timeout=default_timeout)

    def process_status_error(self):
        """Report a wrongly solved captcha back to ``lz_api``.

        Does nothing when the error body is not JSON or carries no
        ``validationErrors`` list.
        """
        try:
            error_info = cur_rsp.json()
        except ValueError:
            log.warning("error response is not JSON, skip captcha error report")
            return
        errors = error_info.get("validationErrors") if isinstance(error_info, dict) else None
        for item in errors or []:
            if isinstance(item, dict) and item.get("code") == "captchaAnswer.Invalid":
                log.info("post captcha error to lz")
                lz_api.postError(captcha_result["data"]["id"])
                break

    def process(self):
        """Remember the verification id, then wait for the e-mail code."""
        global verification_id
        verification_id = cur_rsp.json()["verificationId"]
        log.info("the captcha is correct, try to get the email code")
        self.get_mail_code()

    def get_mail_code(self):
        """Poll the mailbox until the Apple verification code shows up.

        Makes up to ``email_try_times`` attempts, sleeping
        ``fetch_email_delay`` seconds before each one, and inspects only the
        last message in the mailbox. The code is stored in the module-level
        ``verify_code``.

        Raises:
            Exception: when no code was found after all attempts.
        """
        global verify_code
        config = {
            "account": cur_data["name"],
            "pwd": cur_data["emailPwd"],
            "host": "pop.139.com",
            "port": 110
        }
        try_times = email_try_times
        email_code = None
        log.info("waiting %d seconds for fetch the email code", fetch_email_delay)
        while try_times > 0:
            receiver = None
            try:
                time.sleep(fetch_email_delay)
                log.info("try...%d", try_times)
                receiver = MailReceiver(config).initialize()
                count = receiver.get_mail_count()
                if count > 0:
                    mail = receiver.get_mail(count)
                    content = ",".join(mail)
                    if utils.is_apple_mail(content):
                        email_code = utils.find_apple_code(content, send_email_time)
                        if email_code is not None:
                            verify_code = email_code
                            log.info("got it! %s", verify_code)
                            break
            except poplib.error_proto as e:
                # NOTE(review): the server message is decoded as GB2312 for
                # logging; this relies on Python 2 byte strings.
                log.error(str(e).decode("gb2312"))
            except Exception as e:
                log.error(e)
            finally:
                try_times -= 1
                # Always close the mailbox connection, including when the
                # code was found (break) or the attempt failed.
                self._close_receiver(receiver)

        if email_code is None:
            raise Exception("can not find email code")

    def _close_receiver(self, receiver):
        """Quit the mail connection if one was opened; log instead of raising."""
        if receiver is None:
            return
        try:
            receiver.quit()
        except Exception:
            log.exception("failed to close the mail connection")
class EmailVerificationRequest(AppleRequest):
    """Step 5: verify the e-mail verification code (AJAX PUT)."""

    def get_url(self):
        """Return the verification endpoint (with trailing slash)."""
        return "https://appleid.apple.com/account/verification/"

    def get_payload(self):
        """Build the body from the account name and the stored verification data."""
        account_name = cur_data["name"]
        verification = {
            "id": verification_id,
            "answer": verify_code
        }
        return {
            "name": account_name,
            "verificationInfo": verification
        }

    def get_header(self):
        """Build the headers from the current ``cur_rsp`` response."""
        return common_header.combine_request_header(cur_rsp)

    def request(self):
        """PUT the JSON payload through the shared session and return the response."""
        send = session.put
        url = self.get_url()
        body = json.dumps(self.get_payload())
        headers = self.get_header()
        return send(url, data=body, headers=headers, timeout=default_timeout)
class RegisterRequest(AppleRequest):
    """Step 6: after all checks pass, submit the user's registration information (AJAX POST)."""

    def get_url(self):
        """Return the account endpoint."""
        return "https://appleid.apple.com/account"

    def get_payload(self):
        """Assemble the registration body from ``cur_data`` and the verification data."""
        # The sections are built in the same order the fields are read in a
        # single nested literal: name, password, person, preferences,
        # security questions, verification info.
        account_name = cur_data["name"]
        password = cur_data["applePwd"]
        person = {
            "birthday": cur_data["birthday"],
            "name": {
                "firstName": cur_data["firstName"],
                "lastName": cur_data["lastName"]
            },
            "primaryAddress": {
                "country": "CHN"
            }
        }
        preferences = {
            "preferredLanguage": "zh_CN",
            "marketingPreferences": {
                "appleNews": False,
                "appleUpdates": True,
                "iTunesUpdates": True
            }
        }
        first_question = {
            "answer": cur_data["question1Answer"],
            "id": cur_data["question1Id"],
            "question": cur_data["question1"]
        }
        second_question = {
            "answer": cur_data["question2Answer"],
            "id": cur_data["question2Id"],
            "question": cur_data["question2"]
        }
        third_question = {
            "answer": cur_data["question3Answer"],
            "id": cur_data["question3Id"],
            "question": cur_data["question3"]
        }
        security = {
            "questions": [first_question, second_question, third_question]
        }
        verification = {
            "id": verification_id,
            "answer": verify_code
        }
        return {
            "account": {
                "name": account_name,
                "password": password,
                "person": person,
                "preferences": preferences,
                "security": security,
                "verificationInfo": verification
            }
        }

    def get_header(self):
        """Build the headers from the current ``cur_rsp`` response."""
        return common_header.combine_request_header(cur_rsp)

    def request(self):
        """POST the JSON payload through the shared session and return the response."""
        send = session.post
        url = self.get_url()
        body = json.dumps(self.get_payload())
        headers = self.get_header()
        return send(url, data=body, headers=headers, timeout=default_timeout)

    def process(self):
        """Log that the registration request succeeded."""
        log.info("successfully!")
# Registration steps; the main loop calls do_request() on each of them, in
# this order, for every account.
handlers = [
    RegisterPageRequest(),
    GetAccountRequest(),
    GetCaptchaRequest(),
    AnswerCaptchaRequest(),
    EmailVerificationRequest(),
    RegisterRequest()
]
# Batch driver: read the accounts CSV, run every registration step for each
# data row, and append one result row (status, errorInfo, original cells...)
# per account to the result CSV.
log.info("========================Start:%s======================", utils.get_cur_time_str())
with open(account_file, "r") as r, open(result_file, "ab") as w:
    reader = csv.reader(r)
    writer = csv.writer(w)
    line = 0
    for row in reader:
        row = [cell.decode("gbk") for cell in row]
        line += 1
        if line == 1:
            # First row is the header: prepend the two result columns.
            row.insert(0, "errorInfo")
            row.insert(0, "status")
            writer.writerow(row)
            continue

        # Create the per-account session before the try block so the
        # `finally` clause always closes the session that belongs to this
        # row (never None, never the previous row's closed session), even
        # when wrap_data() fails.
        session = requests.session()
        try:
            cur_data = utils.wrap_data(row)
            log.info("------------seq: %d, account: %s--------------", line, cur_data["name"])

            for handler in handlers:
                handler.do_request()
            row.insert(0, "")
            row.insert(0, "ok")

            # switch vpn
            time.sleep(3)
        except Exception as ex:
            # Record the failure in the result row and move on to the next
            # account instead of aborting the batch.
            row.insert(0, ex)
            row.insert(0, "error")
            log.exception("got an exception")
        finally:
            log.info("-----------------------------------------------------------")
            writer.writerow(row)
            session.close()
log.info("========================End:%s======================", utils.get_cur_time_str())
|