|
@@ -0,0 +1,1576 @@
|
|
|
+/**
|
|
|
+ * Created by ben on 2017/10/24.
|
|
|
+ */
|
|
|
+const php = require('phpjs');
|
|
|
+let cheerio = require('cheerio');
|
|
|
+const request = require('request-promise');
|
|
|
+const Tool = require('../framework/lib/Tool.js');
|
|
|
+const iconv = require('iconv-lite');
|
|
|
+const Http = require("http");
|
|
|
+const zlib = require('zlib');
|
|
|
+const Browser = require('../models/Browser');
|
|
|
+const AmcMsg = require('../models/AmcMsg');
|
|
|
+
|
|
|
+// The modules below may be needed inside callbacks
|
|
|
+const JTool = require('./JTool');
|
|
|
+const OujRedis = require('../framework/lib/OujRedis');
|
|
|
+const dwHttp = require('../framework/lib/dwHttp.js');
|
|
|
+
|
|
|
+const URL = require('url');
|
|
|
+const puppeteer = require('puppeteer');
|
|
|
+const TableHelper = require('../framework/lib/TableHelper.js');
|
|
|
+
|
|
|
+const MapData = require('./MapData.js');
|
|
|
+const ProxyPool = require('./ProxyPool');
|
|
|
// Module-level cache object. It is not referenced by any code visible in this
// part of the file.
let pagePool = {};

// Pool of desktop browser User-Agent strings. A random entry is used when a
// rule does not provide its own User-Agent header.
let uaList = [
    // Internet Explorer
    'Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1; 125LA; .NET CLR 2.0.50727; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022)',
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)',

    // Chrome and Chromium-based browsers
    'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.108 Safari/537.36 2345Explorer/8.8.3.16721',
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 BIDUBrowser/8.7 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063',
    'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.3427.400 QQBrowser/9.6.12513.400',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36',

    // Firefox
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:38.0) Gecko/20100101 Firefox/38.0',
    'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.3) Gecko/2008092416 Firefox/3.0.3',
];
|
|
|
+
|
|
|
+class Spider {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 基本类,提供增删改查
|
|
|
+ * @param {string} task 表名
|
|
|
+ */
|
|
|
+ constructor(task) {
|
|
|
+ this.task = task;
|
|
|
+ // this.dbKey = dbKey || 'default';
|
|
|
+ // this.objMySql = new MySql(this.dbKey);
|
|
|
+
|
|
|
+ this.STATE_EXECING = 0;
|
|
|
+ this.STATE_SUCC = 1;
|
|
|
+ this.STATE_ERROR = 2;
|
|
|
+ this.STATE_TIMEOUT = 3;
|
|
|
+ this.STATE_PART_SUCC = 4;
|
|
|
+ this.STATE_PROXY_ERROR = 5;
|
|
|
+
|
|
|
+ this.INSERT_TYPE_ONLY_INSERT = 'only_insert';
|
|
|
+ this.INSERT_TYPE_ONLY_UPDATE = 'only_update';
|
|
|
+ this.INSERT_TYPE_UPDATE = 'update';
|
|
|
+
|
|
|
+ this.state = this.STATE_SUCC;
|
|
|
+ this.skip = false;
|
|
|
+ this.http_code = 0;
|
|
|
+ Tool.startRecordLog();
|
|
|
+ }
|
|
|
+
|
|
|
+ async run(preview) {
|
|
|
+ const objTask = new TableHelper('task', 'crawl');
|
|
|
+ const objRule = new TableHelper('rule', 'crawl');
|
|
|
+ this.rule = await objRule.getRow({'rule_id' : this.task['rule_id']});
|
|
|
+ if (!this.rule) {
|
|
|
+ await objTask.updateObject({'enable' : 0}, {'task_id' : this.task.task_id});
|
|
|
+ Tool.error(`error task have no rule. task_id:${this.task.task_id}, rule_id:${this.task.rule_id}`);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ await this._mergeRule();
|
|
|
+ await this._delayRun();
|
|
|
+
|
|
|
+ let startTime = (new Date).getTime();
|
|
|
+
|
|
|
+ let result = {};
|
|
|
+ if (!preview) {
|
|
|
+ result = await this._logStart();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 找这个任务的规则
|
|
|
+ const objItem = new TableHelper('item', 'crawl');
|
|
|
+ let items = await objItem.getAll({'rule_id' : this.task.rule_id, 'enable' : 1});
|
|
|
+ await this._mergeItems(items);
|
|
|
+
|
|
|
+ let content = '';
|
|
|
+ let data = {};
|
|
|
+
|
|
|
+ try {
|
|
|
+ Tool.log(`爬虫:${this.task.url}`);
|
|
|
+ Tool.log(`请求模式:${this.rule.request_mode},更新模式:${this.rule.update_mode}`);
|
|
|
+ JTool.initUrl(this.task.url);
|
|
|
+
|
|
|
+ //this.rule.request_mode = 'headless';
|
|
|
+ if (this.rule.request_mode === 'headless') {
|
|
|
+ content = await this._headless();
|
|
|
+ } else {
|
|
|
+ content = await this._request();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // let objRedis = OujRedis.init('logic');
|
|
|
+ // content = await objRedis.get('globals:url_map:http://14.17.108.216:9998/previewRule?rule_id=steam:game_data:steamdb&url=');
|
|
|
+ Tool.log('http code:' + this.http_code);
|
|
|
+ if (preview) {
|
|
|
+ return content;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.skip) {
|
|
|
+ Tool.log('预处理返回了false,当作成功,并跳过处理');
|
|
|
+ this.state = this.STATE_SUCC;
|
|
|
+ this.http_code = 200;
|
|
|
+ if (this.rule.request_mode === 'headless') {
|
|
|
+ await this.scoreBrowser(this.http_code);
|
|
|
+ }
|
|
|
+ } else if (!content || content.length <= this.rule.min_length) {
|
|
|
+ // 代理出问题,当作超时处理
|
|
|
+ Tool.log('内容过短,代理出问题');
|
|
|
+ if (this.rule.request_mode === 'headless') {
|
|
|
+ await this.scoreBrowser(0);
|
|
|
+ }
|
|
|
+ this.state = this.STATE_TIMEOUT;
|
|
|
+ } else {
|
|
|
+ if (this.rule.request_mode === 'headless') {
|
|
|
+ await this.scoreBrowser(200);
|
|
|
+ }
|
|
|
+
|
|
|
+ Tool.log('开始分析页面');
|
|
|
+ let $ = null;
|
|
|
+ let $el = null;
|
|
|
+ if (this.rule['data_type'] === 'html') {
|
|
|
+ $ = cheerio.load(content, { decodeEntities: false });
|
|
|
+ } else if (this.rule['data_type'] === 'json') {
|
|
|
+ $el = JSON.parse(content);
|
|
|
+ }
|
|
|
+
|
|
|
+ JTool.initJquery($);
|
|
|
+ for (let item of items) {
|
|
|
+ try {
|
|
|
+ let ret = this._handle(item, $, $el);
|
|
|
+ if (!ret) {
|
|
|
+ if (item.require) {
|
|
|
+ this.state = this.STATE_ERROR;
|
|
|
+ Tool.error('lack of required field:' + item.field_name);
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ Tool.log('skip not found field:' + item.field_name);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let {value, task_key} = ret;
|
|
|
+ data[item.field_name] = value;
|
|
|
+ if (item.next_rule_id) {
|
|
|
+ await this._addNewTask(item, value, task_key);
|
|
|
+ }
|
|
|
+ } catch (ex) {
|
|
|
+ this.state = this.STATE_PART_SUCC;
|
|
|
+ let errorMsg = `error item:${item.field_name}\nselector:${item.selector}\nfetch_value:\n${item.fetch_value}`;
|
|
|
+ if (item.new_task_key) {
|
|
|
+ errorMsg += '\n\nnew_task_key:\n' + item.new_task_key;
|
|
|
+ }
|
|
|
+ Tool.log(errorMsg);
|
|
|
+ Tool.error(ex.stack);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (item.require && !data[item.field_name]) {
|
|
|
+ this.state = this.STATE_ERROR;
|
|
|
+ Tool.error(`lack of require item:${item.field_name}\nselector:${item.selector}\nfetch_value:\n${item.fetch_value}`);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.state !== this.STATE_ERROR) {
|
|
|
+ await this._insertData(result.insertId, items, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ await this._logEnd(result.insertId, content, data, startTime);
|
|
|
+ } catch (ex) {
|
|
|
+ if (this.rule.request_mode === 'headless') {
|
|
|
+ await this.closeBrowser();
|
|
|
+ }
|
|
|
+ Tool.err(ex.message);
|
|
|
+ Tool.err(ex.stack);
|
|
|
+
|
|
|
+ if (preview) {
|
|
|
+ return ex.stack;
|
|
|
+ }
|
|
|
+
|
|
|
+ this._preprocess(ex.message);
|
|
|
+ // 如果需要跳过,则跳过
|
|
|
+ let flag2 = false;
|
|
|
+ if (!this.skip) {
|
|
|
+ // 代理出问题的情况
|
|
|
+ let flag = ex.message.indexOf("Error: ") >= 0;
|
|
|
+ flag = flag || ex.message.indexOf("net::") >= 0;
|
|
|
+
|
|
|
+ // 代理访问慢的情况
|
|
|
+ flag2 = ex.message.indexOf('Navigation Timeout Exceeded') >= 0;
|
|
|
+ flag2 = flag2 || ex.message.indexOf("Most likely the page has been closed") >= 0;
|
|
|
+ if (flag) {
|
|
|
+ this.state = this.STATE_PROXY_ERROR;
|
|
|
+ } else if (flag2) {
|
|
|
+ this.state = this.STATE_TIMEOUT;
|
|
|
+ } else {
|
|
|
+ this.state = this.STATE_ERROR;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!this.http_code) {
|
|
|
+ this.http_code = parseInt(ex.message);
|
|
|
+ Tool.err('http_code:' + this.http_code);
|
|
|
+ }
|
|
|
+
|
|
|
+ let error_codes = [403, 429, 502, 503, 504];
|
|
|
+ if (error_codes.indexOf(this.http_code) >= 0) {
|
|
|
+ this.state = this.STATE_PROXY_ERROR;
|
|
|
+ // 请求太频繁了
|
|
|
+ if (this.http_code === 429 || this.http_code === 502) {
|
|
|
+ let next_crawl_time = php.time() + 300 * php.rand(1, 6);
|
|
|
+ this._setNextTime(next_crawl_time);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ await this._logEnd(result.insertId, content, data, startTime);
|
|
|
+
|
|
|
+ if (flag2) {
|
|
|
+ // 异常情况,要重启进程
|
|
|
+
|
|
|
+ process.exit(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return content;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _mergeRule() {
|
|
|
+ // 继承父规则的数据
|
|
|
+ if (this.rule['parent_rule_id']) {
|
|
|
+ const objRule = new TableHelper('rule', 'crawl');
|
|
|
+ let parent_rule = await objRule.getRow({'rule_id': this.rule['parent_rule_id']});
|
|
|
+ Tool.log('old rule:' + JSON.stringify(this.rule));
|
|
|
+ for (let key in parent_rule) {
|
|
|
+ // enable 不继承
|
|
|
+ if (key === 'enable') continue;
|
|
|
+ // 填充空白数据
|
|
|
+ let flag = !this.rule[key] || this.rule[key] === 'undefined';
|
|
|
+ if (flag && parent_rule[key]) {
|
|
|
+ this.rule[key] = parent_rule[key];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Tool.log('new rule:' + JSON.stringify(this.rule));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _mergeItems(items) {
|
|
|
+ // 继承父规则的数据
|
|
|
+ if (this.rule['parent_rule_id']) {
|
|
|
+ const objItem = new TableHelper('item', 'crawl');
|
|
|
+ let parent_items = await objItem.getAll({'rule_id' : this.rule['parent_rule_id'], 'enable' : 1});
|
|
|
+ let items_map = {};
|
|
|
+ for (let item of items) {
|
|
|
+ items_map[item['field_name']] = 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 合并规则详情
|
|
|
+ for (let parent_item of parent_items) {
|
|
|
+ if (!items_map[parent_item['field_name']]) {
|
|
|
+ // 要修改rule_id
|
|
|
+ parent_item.rule_id = this.rule.rule_id;
|
|
|
+ items.push(parent_item);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _delayRun() {
|
|
|
+ // 先延后,怕并行运行
|
|
|
+ let next_crawl_time = php.time() + 30;
|
|
|
+ await this._setNextTime(next_crawl_time);
|
|
|
+
|
|
|
+ let objTask = new TableHelper('task', 'crawl');
|
|
|
+ const objCrawlLog = await this._getLogObject();
|
|
|
+ let create_time = php.date('Y-m-d H:i:s', php.time() - 3600);
|
|
|
+ let _where = `create_time > '${create_time}'`;
|
|
|
+ let where = {'task_id' : this.task.task_id, 'state' : [this.STATE_ERROR, this.STATE_TIMEOUT]};
|
|
|
+ let errorNum = await objCrawlLog.getCount(where, {_where});
|
|
|
+
|
|
|
+ let max_exception_count = this.task.max_exception_count || this.rule.max_exception_count;
|
|
|
+ if (errorNum > max_exception_count - 1) {
|
|
|
+ // 大于最大异常数,要停止任务
|
|
|
+ let newData = {'enable' : 0, 'state' : 2};
|
|
|
+ await objTask.updateObject(newData, {'task_id' : this.task.task_id});
|
|
|
+ Tool.log("大于最大异常数,要停止任务");
|
|
|
+ } else if (errorNum > 0) {
|
|
|
+ // 延后重试
|
|
|
+ let next_crawl_time = php.time() + (errorNum + 1) * 30;
|
|
|
+ await this._setNextTime(next_crawl_time);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _request() {
|
|
|
+ await this._resetProxy();
|
|
|
+
|
|
|
+ let r = request;
|
|
|
+ if (this.proxy && this.rule.need_proxy) {
|
|
|
+ r = request.defaults({'proxy':`http://${this.proxy}`});
|
|
|
+ } else if (!this.proxy && this.rule.need_proxy) {
|
|
|
+ return '';
|
|
|
+ }
|
|
|
+
|
|
|
+ // let j = request.jar();
|
|
|
+ // this.task.url = 'http://ka.duowan.com';
|
|
|
+
|
|
|
+ // 先decode,再encode,可以把字符给encode,又不会引起多次encode
|
|
|
+ let url = encodeURI(decodeURI(this.task.url));
|
|
|
+ let response = await r({
|
|
|
+ url : url,
|
|
|
+ // jar: j,
|
|
|
+ headers : this.getHeaders(this.rule.header),
|
|
|
+ timeout: 60000,
|
|
|
+ gzip: true,
|
|
|
+ encoding: null,
|
|
|
+ resolveWithFullResponse: true,
|
|
|
+ rejectUnauthorized: false
|
|
|
+ });
|
|
|
+
|
|
|
+ this.http_code = response.statusCode;
|
|
|
+ let content = response.body.toString('utf8');
|
|
|
+ let $ = cheerio.load(content, { decodeEntities: false });
|
|
|
+
|
|
|
+ let head = $('head').html() || '';
|
|
|
+ let matches = head.match(/[;\s]charset=['"]?(\w+)['"]?/);
|
|
|
+ if (matches && matches[1].match(/gb/ig)) {
|
|
|
+ content = iconv.decode(response.body, 'gbk');
|
|
|
+ }
|
|
|
+
|
|
|
+ return this._preprocess(content);
|
|
|
+ }
|
|
|
+
|
|
|
+ async scoreBrowser(http_code = '') {
|
|
|
+ if (http_code == 200) {
|
|
|
+ Browser.incScore();
|
|
|
+ } else {
|
|
|
+ Browser.reduceScore();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async closeBrowser() {
|
|
|
+ await Browser.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ async _getPage() {
|
|
|
+ var url = this.task.url;
|
|
|
+ var p = URL.parse(this.task.url);
|
|
|
+ var taget_host = p.host;
|
|
|
+ var proxy = this.proxy;
|
|
|
+ if (!this.rule.need_proxy) {
|
|
|
+ proxy = false;
|
|
|
+ }
|
|
|
+ let browser = await Browser.init(proxy);
|
|
|
+
|
|
|
+ const page = await Browser.newPage();
|
|
|
+ const viewport = {
|
|
|
+ width : 1440,
|
|
|
+ height: 706
|
|
|
+ };
|
|
|
+ await page.setViewport(viewport);
|
|
|
+
|
|
|
+ return page;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _headlessCookie(page, headers) {
|
|
|
+ if (headers['Cookie']) {
|
|
|
+ let parts = headers['Cookie'].split(';');
|
|
|
+ let url = this.task.url;
|
|
|
+ let urlInfo = URL.parse(url);
|
|
|
+ for (let part of parts) {
|
|
|
+ part = part.trim();
|
|
|
+ if (!part) continue;
|
|
|
+ let index = part.indexOf('=');
|
|
|
+ if (index < 0) continue;
|
|
|
+ let name = part.substr(0, index);
|
|
|
+ let value = part.substr(index + 1);
|
|
|
+
|
|
|
+ let domain = urlInfo['host'];
|
|
|
+ let path = '/';
|
|
|
+ let expires = php.time() + 86400;
|
|
|
+ await page.setCookie({name, value, url, domain, path, expires});
|
|
|
+ }
|
|
|
+ delete headers['Cookie'];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _headless() {
|
|
|
+ // 设置头部
|
|
|
+ let headers = this.getHeaders(this.rule.header);
|
|
|
+ await this._resetProxy();
|
|
|
+ if (!this.proxy && this.rule.need_proxy) {
|
|
|
+ return '';
|
|
|
+ }
|
|
|
+
|
|
|
+ let page = await this._getPage();
|
|
|
+ await this._headlessCookie(page, headers);
|
|
|
+ page.setExtraHTTPHeaders(headers);
|
|
|
+
|
|
|
+ let cookies = await page.cookies(this.task.url);
|
|
|
+ Tool.log(cookies);
|
|
|
+
|
|
|
+
|
|
|
+ // 开始爬虫
|
|
|
+ if (!headers['User-Agent']) {
|
|
|
+ //因为在请求中有些api会针对 user-agent 做限制,所以动态做一下调整
|
|
|
+ let ua = uaList[Math.floor((Math.random()*uaList.length))];
|
|
|
+ page.setUserAgent(ua);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.rule.wait_request_url) {
|
|
|
+ page.on('request', request => {
|
|
|
+ var request_url = request.url();
|
|
|
+ if (request_url.indexOf(this.rule.wait_request_url) != -1) {
|
|
|
+ Tool.log('请求api链接:' + request.url());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ page.on('requestfinished', async request => {
|
|
|
+
|
|
|
+ if (request.url().indexOf(this.rule.wait_request_url) != -1) {
|
|
|
+ Tool.log('【success】请求api成功:' + this.rule.wait_request_url);
|
|
|
+ let api_response = await request.response().text();
|
|
|
+ Tool.log('请求api内容:' + api_response);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ page.on('requestfailed', async request => {
|
|
|
+ if (request.url().indexOf(this.rule.wait_request_url) != -1) {
|
|
|
+ Tool.log('【error】请求api失败:' + this.rule.wait_request_url);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ //开始打开页面
|
|
|
+ let response = await page.goto(this.task.url, {
|
|
|
+ waitUntil : 'domcontentloaded'
|
|
|
+
|
|
|
+ // waitUntil : 'load'
|
|
|
+ });
|
|
|
+
|
|
|
+ await this._autoScroll(page);
|
|
|
+
|
|
|
+ await this._waitRequired(page);
|
|
|
+
|
|
|
+ this.http_code = response.status();
|
|
|
+ let content = await page.content();
|
|
|
+
|
|
|
+ return this._preprocess(content, page);
|
|
|
+ }
|
|
|
+
|
|
|
+ //页面懒加载-滚动
|
|
|
+ async _autoScroll(page) {
|
|
|
+ return page.evaluate(() => {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ var scroll_count = 0;
|
|
|
+ var totalHeight = 0;
|
|
|
+ var distance = 100;
|
|
|
+ var timer = setInterval(() => {
|
|
|
+ var scrollHeight = document.body.scrollHeight;
|
|
|
+ window.scrollBy(0, distance);
|
|
|
+ totalHeight += distance;
|
|
|
+
|
|
|
+ if(totalHeight >= scrollHeight || scroll_count > 30){
|
|
|
+ clearInterval(timer);
|
|
|
+ resolve();
|
|
|
+ }
|
|
|
+ scroll_count++;
|
|
|
+ }, 100);
|
|
|
+ })
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ async _setNextTime(next_crawl_time) {
|
|
|
+ // next_crawl_time = 0;
|
|
|
+ const where = {'task_id': this.task['task_id']};
|
|
|
+ // 修改最后更新时间
|
|
|
+ let last_crawl_time = php.date('Y-m-d H:i:s');
|
|
|
+
|
|
|
+ const objTask = new TableHelper('task', 'crawl');
|
|
|
+ await objTask.updateObject({last_crawl_time, next_crawl_time}, where);
|
|
|
+ }
|
|
|
+
|
|
|
+ async _getLogObject() {
|
|
|
+ if (!this.objCrawlLog) {
|
|
|
+ // let tableName = 'crawl_log_' + php.date('Ymd');
|
|
|
+ // let objCrawlLog = new TableHelper(tableName, 'crawl');
|
|
|
+ // let flag = await objCrawlLog.checkTableExist(tableName);
|
|
|
+ // if (!flag) {
|
|
|
+ // let sql = `CREATE TABLE ${tableName} LIKE _crawl_log`;
|
|
|
+ // await objCrawlLog.objMySql.update(sql);
|
|
|
+ // }
|
|
|
+ // this.objCrawlLog = objCrawlLog;
|
|
|
+
|
|
|
+ this.objCrawlLog = new TableHelper('crawl_log', 'crawl');
|
|
|
+ }
|
|
|
+
|
|
|
+ return this.objCrawlLog;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _logStart() {
|
|
|
+ const objCrawlLog = await this._getLogObject();
|
|
|
+ let log = await objCrawlLog.getRow({"task_id" : this.task['task_id'], "state" : this.STATE_EXECING});
|
|
|
+ if (log) {
|
|
|
+ await objCrawlLog.updateObject({'state' : this.STATE_TIMEOUT}, {"log_id" : log.log_id});
|
|
|
+ }
|
|
|
+
|
|
|
+ let logData = {
|
|
|
+ 'task_id' : this.task['task_id'],
|
|
|
+ 'task_key' : this.task['task_key'],
|
|
|
+ 'rule_id' : this.task['rule_id'],
|
|
|
+ 'url' : this.task['url'],
|
|
|
+ 'state' : this.STATE_EXECING,
|
|
|
+ 'rule_name' : this.rule['rule_name'],
|
|
|
+ 'create_time' : php.date('Y-m-d H:i:s'),
|
|
|
+ };
|
|
|
+
|
|
|
+ return await objCrawlLog.addObject(logData);
|
|
|
+ }
|
|
|
+
|
|
|
+ async _logEnd(log_id, raw_content, data, startTime) {
|
|
|
+ if (this.state === this.STATE_EXECING) {
|
|
|
+ this.state = this.STATE_SUCC;
|
|
|
+ }
|
|
|
+
|
|
|
+ Tool.log(`log id: ${log_id}, this.state: ${this.state}`);
|
|
|
+ Tool.log('Raw Content:\n' + raw_content);
|
|
|
+
|
|
|
+ let exec_timespan = ((new Date).getTime() - startTime) / 1000;
|
|
|
+ this.reportProxy(exec_timespan);
|
|
|
+
|
|
|
+ let logData = {
|
|
|
+ proxy : this.proxy,
|
|
|
+ state : this.state,
|
|
|
+ content : JSON.stringify(data),
|
|
|
+ 'exec_end_time' : php.date('Y-m-d H:i:s'),
|
|
|
+ 'exec_timespan' : exec_timespan,
|
|
|
+ 'exec_log' : this.getLogs().substr(0, 500000)
|
|
|
+ };
|
|
|
+
|
|
|
+ if(log_id) {
|
|
|
+ const objCrawlLog = await this._getLogObject();
|
|
|
+ let where = {log_id};
|
|
|
+ await objCrawlLog.updateObject(logData, where);
|
|
|
+
|
|
|
+ //通用告警
|
|
|
+ var errMsgList = this.getLogs().substr(0, 500000).match(/【error】([^\n]+)\n/);
|
|
|
+ if (errMsgList) {
|
|
|
+ var errMsg = '';
|
|
|
+ if (errMsgList && errMsgList[1]) {
|
|
|
+ errMsg = errMsgList[1];
|
|
|
+ errMsg = errMsg.substr(0, 50);
|
|
|
+ var result = await this.recordAmcMsg(errMsg);
|
|
|
+ Tool.log(`【AMC_Msg】` + JSON.stringify(result));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 成功后,才修改下一阶段的时间
|
|
|
+ if (this.state === this.STATE_SUCC || this.state === this.STATE_PART_SUCC) {
|
|
|
+ let interval = this.task['interval'] || this.rule['interval'];
|
|
|
+ let next_crawl_time = php.time() + interval;
|
|
|
+ await this._setNextTime(next_crawl_time);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //通用告警
|
|
|
+ async recordAmcMsg(errMsg) {
|
|
|
+ if (errMsg !== '' && errMsg.indexOf('waiting for selector') !== -1) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //过滤掉 400,401,404
|
|
|
+ if (errMsg.match(/4[\d]{2}\s-/)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //过滤掉 5xx
|
|
|
+ if (errMsg.match(/5[\d]{2}\s-/)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (this.http_code == 400 || this.http_code == 401 || this.http_code == 404) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //过滤掉超时,代理错误,成功的状态
|
|
|
+ if (this.state == this.STATE_TIMEOUT || this.state == this.STATE_PROXY_ERROR || this.state == this.STATE_SUCC || this.state == this.STATE_EXECING || this.state == this.STATE_PART_SUCC) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ let objAmcMsg = new AmcMsg;
|
|
|
+
|
|
|
+ var msg_code = -1;
|
|
|
+ if (!msg_code) {
|
|
|
+ msg_code = -1;
|
|
|
+ }
|
|
|
+ var creator = this.rule.creator;
|
|
|
+ if (!creator) {
|
|
|
+ creator = 'spider';
|
|
|
+ }
|
|
|
+ var rule_id = this.rule.rule_id;
|
|
|
+ var rule_name = this.rule.rule_name;
|
|
|
+ var msg_content = `【${creator}】的《${rule_name}》(${rule_id})有异常: ${errMsg}`;
|
|
|
+ let ret = await objAmcMsg.recordMsg(msg_code, msg_content);
|
|
|
+ if (ret.length == 0) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ reportProxy(exec_timespan) {
|
|
|
+ let score = 0;
|
|
|
+ if (this.state === this.STATE_SUCC || this.state === this.STATE_PART_SUCC) {
|
|
|
+ if (exec_timespan < 3) {
|
|
|
+ score = 2;
|
|
|
+ } else if (exec_timespan < 8) {
|
|
|
+ score = 1;
|
|
|
+ } else if (exec_timespan > 30) {
|
|
|
+ score = -1;
|
|
|
+ } else if (exec_timespan > 50) {
|
|
|
+ score = -2;
|
|
|
+ }
|
|
|
+ } else if (this.state === this.STATE_PROXY_ERROR) {
|
|
|
+ score = -20;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (score && this.proxy) {
|
|
|
+ const objProxyPool = new ProxyPool();
|
|
|
+ let p = URL.parse(this.task.url);
|
|
|
+ Tool.log(`reportProxy(p.host:${p.host}, this.proxy:${this.proxy}, score:${score})`);
|
|
|
+ objProxyPool.reportProxy(p.host, this.proxy, score);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ getLogs() {
|
|
|
+ return Tool.stopRecordLog().join('\n');
|
|
|
+ }
|
|
|
+
|
|
|
+ getHeaders(header) {
|
|
|
+ Tool.log(header);
|
|
|
+
|
|
|
+ let headers = {};
|
|
|
+ if (header && header.trim()) {
|
|
|
+ let parts = header.trim().split("\n");
|
|
|
+ for (let part of parts) {
|
|
|
+ let index = part.indexOf(':');
|
|
|
+ let key = part.substr(0, index);
|
|
|
+ let val = part.substr(index + 1);
|
|
|
+ headers[key.trim()] = val.trim();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ headers['Accept'] = headers['Accept'] || 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8';
|
|
|
+ headers['Accept-Encoding'] = headers['Accept-Encoding'] || 'gzip, deflate';
|
|
|
+ headers['Accept-Language'] = headers['Accept-Language'] || 'en-US,en;q=0.8,zh-CN;q=0.6,zh;q=0.4';
|
|
|
+
|
|
|
+ let index = php.rand(0, uaList.length - 1);
|
|
|
+ let ua = uaList[index];
|
|
|
+ if (this.rule.request_mode != 'headless' || !this.rule.wait_request_url) {
|
|
|
+ headers['User-Agent'] = headers['User-Agent'] || ua;
|
|
|
+ }
|
|
|
+
|
|
|
+ return headers;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _resetProxy() {
|
|
|
+ let p = URL.parse(this.task.url);
|
|
|
+ const objProxyPool = new ProxyPool();
|
|
|
+ let proxyList = await objProxyPool.getXProxyBest(p.host);
|
|
|
+
|
|
|
+ this.proxy = '';
|
|
|
+ if (!php.empty(proxyList)) {
|
|
|
+ this.proxy = proxyList[php.rand(0, proxyList.length - 1)];
|
|
|
+ Tool.log('select proxy:' + this.proxy);
|
|
|
+ } else {
|
|
|
+ Tool.log('没有代理,停止爬虫');
|
|
|
+ this.state = this.STATE_PROXY_ERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _getArgs() {
|
|
|
+ let args = [
|
|
|
+ '--no-sandbox',
|
|
|
+ '--disable-setuid-sandbox',
|
|
|
+ // '--proxy-server=211.138.60.25:80'
|
|
|
+ ];
|
|
|
+
|
|
|
+ if (this.proxy && this.rule.need_proxy) {
|
|
|
+ args.push(`--proxy-server=${this.proxy}`);
|
|
|
+ }
|
|
|
+
|
|
|
+ return args;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _waitRequired(page) {
|
|
|
+ let waitFor = this.rule['wait_for'];
|
|
|
+ if (waitFor) {
|
|
|
+ Tool.log('等待页面准备好. waitFor:' + waitFor);
|
|
|
+ let waitForInt = parseInt(waitFor);
|
|
|
+ if (waitForInt) {
|
|
|
+ waitFor = waitForInt;
|
|
|
+ }
|
|
|
+ await page.waitFor(waitFor);
|
|
|
+ } else {
|
|
|
+ //如果没有等待条件 wait_for 取 item 内一条必填的选择器当做等待
|
|
|
+ const objItem = new TableHelper('item', 'crawl');
|
|
|
+ let require_item = await objItem.getRow({'rule_id' : this.task.rule_id, 'enable' : 1, 'require' : 1});
|
|
|
+ if (require_item) {
|
|
|
+ Tool.log(`【timeout】wait for item selector(item.field_name=${require_item.field_name}):`+ require_item.selector);
|
|
|
+ await page.waitForSelector(require_item.selector);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _preprocess(content, page) {
|
|
|
+ if (this.rule.data_type === 'json') {
|
|
|
+ content = content.trim();
|
|
|
+ let lastChar = php.substr(content, -1);
|
|
|
+ if (lastChar === ')') {
|
|
|
+ let pos = content.indexOf('(');
|
|
|
+ content = content.substr(pos + 1, content.length - pos - 2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let preprocess = this.rule.preprocess && this.rule.preprocess.trim();
|
|
|
+ if (preprocess) {
|
|
|
+ let func = php.create_function('$html, $, page, _task, JTool, Tool', preprocess);
|
|
|
+
|
|
|
+ let $ = null;
|
|
|
+ let $html = null;
|
|
|
+ if (this.rule.data_type === 'html') {
|
|
|
+ $ = cheerio.load(content, { decodeEntities: false });
|
|
|
+ JTool.initJquery($);
|
|
|
+
|
|
|
+ $html = $('html');
|
|
|
+ let flag = func($html, $, page, this.task, JTool, Tool);
|
|
|
+ if (flag === false) {
|
|
|
+ this.skip = true;
|
|
|
+ }
|
|
|
+ return $('<div></div>').html($html).html();
|
|
|
+ } else if (this.rule.data_type === 'json') {
|
|
|
+ $html = content;
|
|
|
+ return func($html, $, page, JTool, Tool);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return content;
|
|
|
+ }
|
|
|
+
|
|
|
+ _getElemOnce(key, val, selector) {
|
|
|
+ let sections = [];
|
|
|
+ let parts = selector.split(key);
|
|
|
+ if (parts.length === 1) return [selector];
|
|
|
+
|
|
|
+ for (let part of parts) {
|
|
|
+ if (part.trim()) {
|
|
|
+ sections.push(part);
|
|
|
+ sections.push(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (val[1] === false) {
|
|
|
+ sections.pop();
|
|
|
+ }
|
|
|
+
|
|
|
+ return sections;
|
|
|
+ }
|
|
|
+
|
|
|
+ _getElem($, selector) {
|
|
|
+ // 蛋疼,cheerio不支持:first, :last, :nth(xx), :eq(xx)的选择器,需要自己实现
|
|
|
+ let map = {
|
|
|
+ ':first' : ['eq', 0],
|
|
|
+ ':last' : ['eq', -1],
|
|
|
+ ':nth(' : ['eq', false],
|
|
|
+ ':eq(' : ['eq', false]
|
|
|
+ };
|
|
|
+
|
|
|
+ let sections = [selector];
|
|
|
+ for (let key in map) {
|
|
|
+ for (let i = 0; i < sections.length; i++) {
|
|
|
+ let tempSections = this._getElemOnce(key, map[key], sections[i]);
|
|
|
+ if (tempSections.length > 1) {
|
|
|
+ sections = sections.slice(0, i).concat(tempSections).concat(sections.splice(i + 1));
|
|
|
+ // i += tempSections.length - 1;
|
|
|
+ i--;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ let $el = null;
|
|
|
+ for (let i = 0; i < sections.length; i++) {
|
|
|
+ let sl = sections[i];
|
|
|
+ if (!sl.trim()) continue;
|
|
|
+
|
|
|
+ let val = map[sl];
|
|
|
+ if (val) {
|
|
|
+ // 用函数代替选择器
|
|
|
+ if (val[1] === false) {
|
|
|
+ let sl2 = sections[i + 1];
|
|
|
+ let num = parseInt(sl2.trim());
|
|
|
+ sections[i + 1] = sl2.replace(/[\s]*[-\d]+[\s]*\)/, '');
|
|
|
+
|
|
|
+ $el = $el[val[0]](num);
|
|
|
+ } else {
|
|
|
+ $el = $el[val[0]](val[1]);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if ($el === null) {
|
|
|
+ $el = $(sl);
|
|
|
+ } else if (sl[0] === ' ') {
|
|
|
+ $el = $el.find(sl.trim());
|
|
|
+ } else {
|
|
|
+ $el = $el.filter(sl.trim());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return $el;
|
|
|
+ }
|
|
|
+
|
|
|
+ _handle(item, $, $el) {
|
|
|
+ if ($) {
|
|
|
+ // html模式才需要获取元素
|
|
|
+ $el = this._getElem($, item.selector);
|
|
|
+ if ($el.length === 0) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let value = this._fetchVal2($el, item, false, $);
|
|
|
+ let task_key = this._fetchVal2($el, item, true, $);
|
|
|
+
|
|
|
+ if (!task_key) {
|
|
|
+ task_key = value;
|
|
|
+ }
|
|
|
+
|
|
|
+ return {value, task_key};
|
|
|
+ }
|
|
|
+
|
|
|
+ _fetchVal2($el, item, iskey, $) {
|
|
|
+ // var key = iskey ? item.field_name + '-new_task_key' : item.field_name;
|
|
|
+ // var func = __crawPage[key];
|
|
|
+ let str = iskey ? item.new_task_key : item.fetch_value;
|
|
|
+ if (!str) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ let func = php.create_function('$el, $, _task, JTool, Tool', str);
|
|
|
+ if (!func) {
|
|
|
+ console.error('can not find:' + key);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (item.is_multi && this.rule.data_type === 'html') {
|
|
|
+ let value = [];
|
|
|
+ for (let i = 0; i < $el.length; i++) {
|
|
|
+ let val = func($($el[i]), $, this.task, JTool, Tool);
|
|
|
+ if (Array.isArray(val)) {
|
|
|
+ return val;
|
|
|
+ } else {
|
|
|
+ value[i] = val;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return value;
|
|
|
+ } else {
|
|
|
+ return func($el, $, this.task, JTool, Tool);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ _predoRuleId(item) {
|
|
|
+ if (!this.rule.parent_rule_id) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ let next_id = item.next_rule_id;
|
|
|
+ Tool.log('next_rule_id:' + next_id + ',rule_id:' + item.rule_id);
|
|
|
+ // 这里特殊处理,支持专区爬虫...
|
|
|
+ let matches = next_id.match(/ka:(\d+):/);
|
|
|
+ if (matches) {
|
|
|
+ let next_game_id = matches[1];
|
|
|
+ if (next_game_id === '0') {
|
|
|
+ matches = item.rule_id.match(/ka:(\d+):/);
|
|
|
+ if (matches && matches[1]) {
|
|
|
+ Tool.log('next_game_id:' + next_game_id + ' => game_id:' + matches[1]);
|
|
|
+ next_id = next_id.replace('ka:0:', `ka:${matches[1]}:`);
|
|
|
+ if (next_id.endsWith('_tpl')) {
|
|
|
+ next_id = next_id.substr(0, next_id.length - 4);
|
|
|
+ }
|
|
|
+ item.next_rule_id = next_id;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _addNewTask(item, value, task_key) {
|
|
|
+ let objTask = new TableHelper('task', 'crawl');
|
|
|
+ if (!Array.isArray(value)) {
|
|
|
+ value = [value];
|
|
|
+ task_key = [task_key];
|
|
|
+ }
|
|
|
+
|
|
|
+ // 预处理下一个规则id
|
|
|
+ this._predoRuleId(item);
|
|
|
+
|
|
|
+ let where = {
|
|
|
+ 'rule_id' : item.next_rule_id,
|
|
|
+ };
|
|
|
+
|
|
|
+ let demo_url = null;
|
|
|
+ let batchNum = 1000;
|
|
|
+ value = php.array_chunk(value, batchNum);
|
|
|
+ task_key = php.array_chunk(task_key, batchNum);
|
|
|
+ for (let batchIndex in value) {
|
|
|
+ let key = task_key[batchIndex];
|
|
|
+ where['task_key'] = key;
|
|
|
+
|
|
|
+ let task_keys = await objTask.getCol(where, {_field : 'task_key'});
|
|
|
+ let map = {};
|
|
|
+ for (let k of task_keys) {
|
|
|
+ map[k] = 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ let datas = [];
|
|
|
+ let now = php.date('Y-m-d H:i:s');
|
|
|
+ for (let i in key) {
|
|
|
+ let task_key = key[i];
|
|
|
+ let url = value[batchIndex][i];
|
|
|
+ if (task_key && url && !map[task_key]) {
|
|
|
+ map[task_key] = 1; // 防止自身就有重复链接
|
|
|
+ datas.push({
|
|
|
+ parent_task_id : this.task.task_id,
|
|
|
+ rule_id : item.next_rule_id,
|
|
|
+ url : url,
|
|
|
+ task_key : task_key,
|
|
|
+ create_time : now,
|
|
|
+ update_time : now,
|
|
|
+ });
|
|
|
+
|
|
|
+ demo_url = demo_url || value[batchIndex][i];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (datas.length) {
|
|
|
+ await objTask.addObjectsIfNotExist(datas);
|
|
|
+ }
|
|
|
+
|
|
|
+ const objRule = new TableHelper('rule', 'crawl');
|
|
|
+ let where2 = {
|
|
|
+ 'rule_id' : item.next_rule_id,
|
|
|
+ };
|
|
|
+ let nextRule = await objRule.getRow(where2);
|
|
|
+ if (demo_url && !nextRule['demo_url']) {
|
|
|
+ objRule.updateObject({demo_url}, where2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ _formatData(data) {
|
|
|
+ for (let tableName in data) {
|
|
|
+ let arrLen = 0;
|
|
|
+ let hasArr = false;
|
|
|
+ for (let key in data[tableName]) {
|
|
|
+ let value = data[tableName][key];
|
|
|
+ if (Array.isArray(value)) {
|
|
|
+ arrLen = Math.max(arrLen, value.length);
|
|
|
+ hasArr = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (hasArr) {
|
|
|
+ let newArr = [];
|
|
|
+ for (let i = 0; i < arrLen; i++) {
|
|
|
+ let item = {};
|
|
|
+ let value = data[tableName];
|
|
|
+ for (let key in value) {
|
|
|
+ if (Array.isArray(value[key])) {
|
|
|
+ item[key] = value[key][i];
|
|
|
+ } else {
|
|
|
+ item[key] = value[key];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ newArr[i] = item;
|
|
|
+ }
|
|
|
+ data[tableName] = newArr;
|
|
|
+ } else {
|
|
|
+ // 统一转化为数组
|
|
|
+ data[tableName] = [data[tableName]];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return data;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _queryTables(rule_id) {
|
|
|
+ const objTable = new TableHelper('data_db', 'crawl');
|
|
|
+ let sql = 'SELECT data_db.db_id, db_name, table_name, pri_key, notice_url, rule_id, is_default, update_mode ' +
|
|
|
+ 'FROM data_db JOIN rule_db_conf ON data_db.db_id = rule_db_conf.db_id ' +
|
|
|
+ `WHERE rule_id = '${rule_id}'`;
|
|
|
+
|
|
|
+ return await objTable.objMySql.getAll(sql);
|
|
|
+ }
|
|
|
+
|
|
|
+ async _getTables() {
|
|
|
+ let tables = await this._queryTables(this.rule.rule_id);
|
|
|
+ if (this.rule.parent_rule_id) {
|
|
|
+ let parentTables = await this._queryTables(this.rule.parent_rule_id);
|
|
|
+ let map = {};
|
|
|
+ for (let table of tables) {
|
|
|
+ let key = table.table_name + ':' + table.table_name;
|
|
|
+ map[key] = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (let table of parentTables) {
|
|
|
+ let key = table.table_name + ':' + table.table_name;
|
|
|
+ if (!map[key]) {
|
|
|
+ tables.push(table);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return tables;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _insertData(log_id, items, data) {
|
|
|
+ let tables = await this._getTables();
|
|
|
+ if (tables.length === 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ let defaultTable = tables[0].table_name;
|
|
|
+ for (let table of tables) {
|
|
|
+ if (table.is_default) {
|
|
|
+ defaultTable = table.table_name;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let onlyInsertData = {};
|
|
|
+ let onlyUpdateData = {};
|
|
|
+ let updateData = {};
|
|
|
+ // let data = {};
|
|
|
+ let items2 = {};
|
|
|
+ let saveasItems = {};
|
|
|
+ for (let item of items) {
|
|
|
+ let value = data[item.field_name];
|
|
|
+ if (value === null || value === undefined || item.next_rule_id){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断是否要翻译
|
|
|
+ if (item.map_key) {
|
|
|
+ let map = await MapData.getData(item.map_key, value);
|
|
|
+ if (Array.isArray(value)) {
|
|
|
+ for (let k in value) {
|
|
|
+ value[k] = map[value[k]];
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ value = map[value];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let table_name = defaultTable;
|
|
|
+ let field_names = item.field_name.split(',');
|
|
|
+
|
|
|
+ for (let field_name of field_names) {
|
|
|
+ field_name = field_name.trim();
|
|
|
+ let parts = field_name.split('.');
|
|
|
+ if (parts.length > 1) {
|
|
|
+ table_name = parts[0];
|
|
|
+ field_name = parts[1];
|
|
|
+ }
|
|
|
+
|
|
|
+ items2[table_name] = items2[table_name] || {};
|
|
|
+ if (!item.next_rule_id) {
|
|
|
+ if (item.insert_type === this.INSERT_TYPE_ONLY_INSERT) {
|
|
|
+ onlyInsertData[table_name] = onlyInsertData[table_name] || {};
|
|
|
+ onlyInsertData[table_name][field_name] = value;
|
|
|
+ } else if (item.insert_type === this.INSERT_TYPE_ONLY_UPDATE) {
|
|
|
+ onlyUpdateData[table_name] = onlyUpdateData[table_name] || {};
|
|
|
+ onlyUpdateData[table_name][field_name] = value;
|
|
|
+ } else if (item.insert_type === this.INSERT_TYPE_UPDATE) {
|
|
|
+ updateData[table_name] = updateData[table_name] || {};
|
|
|
+ updateData[table_name][field_name] = value;
|
|
|
+ } else {
|
|
|
+ Tool.error('unknown insert type.');
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ items2[table_name][field_name] = item;
|
|
|
+ if (item.save_as > 0) {
|
|
|
+ saveasItems[table_name] = saveasItems[table_name] || [];
|
|
|
+ saveasItems[table_name].push(field_name);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ this._formatData(onlyInsertData);
|
|
|
+ this._formatData(onlyUpdateData);
|
|
|
+ this._formatData(updateData);
|
|
|
+
|
|
|
+ for (let table of tables) {
|
|
|
+ let tableName = table.table_name;
|
|
|
+ let priKey = table.pri_key && table.pri_key.trim();
|
|
|
+ if (!priKey || php.empty(onlyInsertData[tableName]) && php.empty(onlyUpdateData[tableName]) && php.empty(updateData[tableName])) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let onlyInsertArr = onlyInsertData[tableName] || [];
|
|
|
+ let onlyUpdateArr = onlyUpdateData[tableName] || [];
|
|
|
+ let updateArr = updateData[tableName] || [];
|
|
|
+
|
|
|
+ let result = null;
|
|
|
+ let update_mode = table.update_mode ? table.update_mode : this.rule.update_mode;
|
|
|
+ if (update_mode === 'replace') {
|
|
|
+ result = await this._replaceBatch(table, items2, onlyInsertArr, updateArr, onlyUpdateArr);
|
|
|
+ } else {
|
|
|
+ result = await this._updateOneByOne(table, items2, onlyInsertArr, updateArr, onlyUpdateArr);
|
|
|
+ }
|
|
|
+
|
|
|
+ await this._notifyUrl(log_id, table.notice_url, result);
|
|
|
+
|
|
|
+ // 图片/视频 转存信息
|
|
|
+ await this._saveasData(table, items2, saveasItems, result.datas, result.temp_saveas_fields);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async _notifyUrl(log_id, notice_url, data) {
|
|
|
+ if (!notice_url) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (php.empty(data.datas)) {
|
|
|
+ Tool.log('没发生数据变化,不发送通知');
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ let objHttp = new dwHttp();
|
|
|
+ data.log_id = log_id;
|
|
|
+ data.oldDatas = JSON.stringify(data.oldDatas);
|
|
|
+ data.datas = JSON.stringify(data.datas);
|
|
|
+ let json = await objHttp.post2(notice_url, data);
|
|
|
+
|
|
|
+ Tool.log('发送数据变更通知:' + notice_url + ',返回:' + json);
|
|
|
+ }
|
|
|
+
|
|
|
+ async _replaceBatch(table, items, onlyInsertArr, updateArr, onlyUpdateArr) {
|
|
|
+ let objTable = new TableHelper(table.table_name, table.db_name);
|
|
|
+ let priKey = table.pri_key && table.pri_key.trim();
|
|
|
+
|
|
|
+ let len = onlyInsertArr.length || updateArr.length || onlyUpdateArr.length;
|
|
|
+ let allArr = [];
|
|
|
+ let oldDatas = {};
|
|
|
+ let datas = {};
|
|
|
+
|
|
|
+ Tool.log(`_replaceBatch, onlyInsertArr:${onlyInsertArr.length}, updateArr:${updateArr.length}, onlyUpdateArr:${onlyUpdateArr.length}, getTempData:false`);
|
|
|
+ let {allOldDatas} = await this._getOldDatas(table, onlyInsertArr, updateArr, onlyUpdateArr, false);
|
|
|
+
|
|
|
+ // let objTempData = new TableHelper('temp_data', 'crawl');
|
|
|
+ // let has_temp_data = await objTempData.getCount({db_id:table.db_id});
|
|
|
+
|
|
|
+ for (let i = 0; i < len; i++) {
|
|
|
+ onlyInsertArr[i] = onlyInsertArr[i] || {};
|
|
|
+ updateArr[i] = updateArr[i] || {};
|
|
|
+ onlyUpdateArr[i] = onlyUpdateArr[i] || {};
|
|
|
+
|
|
|
+ let allData = Object.assign({}, onlyInsertArr[i], updateArr[i], onlyUpdateArr[i]);
|
|
|
+ let where = this._getWhere(allData, priKey);
|
|
|
+ if (where === false) {
|
|
|
+ Tool.error('Replace的数据必须要含有主键');
|
|
|
+ this.state = this.STATE_PART_SUCC;
|
|
|
+ } else {
|
|
|
+ // let oldData = await objTable.getRow(where);
|
|
|
+ let whereStr = JSON.stringify(where);
|
|
|
+ let oldData = allOldDatas[whereStr];
|
|
|
+ if (oldData) {
|
|
|
+ // 存在不同的数据才更新
|
|
|
+ for (let key in allData) {
|
|
|
+ if (allData[key] != oldData[key]) {
|
|
|
+ allArr.push(allData);
|
|
|
+ oldDatas[whereStr] = oldData;
|
|
|
+ datas[whereStr] = allData;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // replace方式,暂时不支持暂存数据
|
|
|
+ // if (tempDatas) {
|
|
|
+ // let whereStr2 = table.db_id + php.md5(whereStr);
|
|
|
+ // if (tempDatas[whereStr2]) {
|
|
|
+ // let temp_value = JSON.parse(tempDatas[whereStr2]);
|
|
|
+ // allData = Object.assign(temp_value, allData);
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ allArr.push(allData);
|
|
|
+ datas[whereStr] = allData;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (allArr.length) {
|
|
|
+ await objTable.replaceObjects2(allArr);
|
|
|
+ } else {
|
|
|
+ Tool.log('没有新数据,不需要replaceObjects2');
|
|
|
+ }
|
|
|
+ } catch (ex) {
|
|
|
+ this.state = this.STATE_ERROR;
|
|
|
+ Tool.err(ex.stack);
|
|
|
+ }
|
|
|
+
|
|
|
+ return {oldDatas, datas};
|
|
|
+ }
|
|
|
+
|
|
|
+ async _saveasData(table, items2, saveasItems, datas, temp_saveas_fields) {
|
|
|
+ let table_name = table.table_name;
|
|
|
+ if ((!saveasItems[table_name] && php.empty(temp_saveas_fields)) || php.empty(datas)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ let saveasArr = [];
|
|
|
+ let insert_saveas_data_ids = [];
|
|
|
+ for (let whereStr in datas) {
|
|
|
+ let allData = datas[whereStr];
|
|
|
+ // 判断是否存在转存字段
|
|
|
+ if (saveasItems[table_name]) {
|
|
|
+ for (let field_name of saveasItems[table_name]) {
|
|
|
+ if (!allData[field_name]) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let item = items2[table_name][field_name];
|
|
|
+ if (insert_saveas_data_ids.indexOf(table.db_id + '|' + whereStr + '|' + field_name) == '-1') {
|
|
|
+ saveasArr.push({
|
|
|
+ saveas_data_id : php.md5(table.db_id + '|' + whereStr + '|' + field_name),
|
|
|
+ db_id : table.db_id,
|
|
|
+ key_value : whereStr,
|
|
|
+ save_as : item['save_as'],
|
|
|
+ save_as_referer : item['save_as_referer'],
|
|
|
+ field_name
|
|
|
+ });
|
|
|
+
|
|
|
+ insert_saveas_data_ids.push(table.db_id + '|' + whereStr + '|' + field_name);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ //临时表合并过来的数据需要检查是否插入转存
|
|
|
+ if (temp_saveas_fields[whereStr]) {
|
|
|
+ let temp_data = temp_saveas_fields[whereStr];
|
|
|
+ for (let temp_field in temp_data) {
|
|
|
+ if (!allData[temp_field]) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (insert_saveas_data_ids.indexOf(table.db_id + '|' + whereStr + '|' + temp_field) == '-1') {
|
|
|
+ let saveas_db_id = table.db_id;
|
|
|
+ if (temp_data[temp_field].db_id) {
|
|
|
+ saveas_db_id = temp_data[temp_field].db_id;
|
|
|
+ }
|
|
|
+ saveasArr.push({
|
|
|
+ saveas_data_id : php.md5(table.db_id + '|' + whereStr + '|' + temp_field),
|
|
|
+ db_id : saveas_db_id,
|
|
|
+ key_value : whereStr,
|
|
|
+ save_as : temp_data[temp_field].save_as,
|
|
|
+ save_as_referer : temp_data[temp_field].save_as_referer,
|
|
|
+ field_name : temp_field
|
|
|
+ });
|
|
|
+
|
|
|
+ insert_saveas_data_ids.push(table.db_id + '|' + whereStr + '|' + temp_field);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (saveasArr.length) {
|
|
|
+ let objTable = new TableHelper('saveas_data', 'crawl');
|
|
|
+ await objTable.replaceObjects2(saveasArr);
|
|
|
+ }
|
|
|
+ } catch (ex) {
|
|
|
+ this.state = this.STATE_ERROR;
|
|
|
+ Tool.err(ex.stack);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _getOldDatas(table, onlyInsertArr, updateArr, onlyUpdateArr, getTempData) {
|
|
|
+ let objTable = new TableHelper(table.table_name, table.db_name);
|
|
|
+ let priKey = table.pri_key && table.pri_key.trim();
|
|
|
+
|
|
|
+ let mergeWhere = this._getMergeWhere(priKey, onlyInsertArr, updateArr, onlyUpdateArr);
|
|
|
+ let allOldDatas = {}, tempDatas = {};
|
|
|
+ let len = onlyInsertArr.length || updateArr.length || onlyUpdateArr.length;
|
|
|
+ if (len <= 0) {
|
|
|
+ return {allOldDatas, tempDatas};
|
|
|
+ }
|
|
|
+
|
|
|
+ let row = Object.assign({}, onlyInsertArr[0], updateArr[0], onlyUpdateArr[0]);
|
|
|
+ let keys = Object.keys(row);
|
|
|
+
|
|
|
+ let has_temp_data = false;
|
|
|
+ let objTempData = new TableHelper('temp_data', 'crawl');
|
|
|
+ if (getTempData) {
|
|
|
+ let dateStr = php.date('Y-m-d', php.time() - 86400 * 3);
|
|
|
+ let _where = `create_time > '${dateStr}'`;
|
|
|
+ // 判断是否有临时数据
|
|
|
+ has_temp_data = await objTempData.getCount({db_id: table.db_id}, {_where});
|
|
|
+ }
|
|
|
+
|
|
|
+ let _field = '`' + keys.join('`,`') + '`';
|
|
|
+ let temp_keys = [];
|
|
|
+ if (!mergeWhere) {
|
|
|
+ for (let i = 0; i < len; i++) {
|
|
|
+ let allData = Object.assign({}, onlyInsertArr[i], updateArr[i], onlyUpdateArr[i]);
|
|
|
+ let where = this._getWhere(allData, priKey);
|
|
|
+ if (where === false) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let key = JSON.stringify(where);
|
|
|
+ has_temp_data && temp_keys.push(php.md5(key));
|
|
|
+ allOldDatas[key] = await objTable.getRow(where, {_field});
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ let datas = await objTable.getAll(mergeWhere, {_field});
|
|
|
+ for (let data of datas) {
|
|
|
+ let where = this._getWhere(data, priKey);
|
|
|
+ let key = JSON.stringify(where);
|
|
|
+ allOldDatas[key] = data;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (has_temp_data) {
|
|
|
+ // 构造临时数据的条件
|
|
|
+ for (let i = 0; i < len; i++) {
|
|
|
+ let allData = Object.assign({}, onlyInsertArr[i], updateArr[i], onlyUpdateArr[i]);
|
|
|
+ let where = this._getWhere(allData, priKey);
|
|
|
+ if (where === false) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let key = JSON.stringify(where);
|
|
|
+ temp_keys.push(php.md5(key));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (has_temp_data && temp_keys.length) {
|
|
|
+ let where = {
|
|
|
+ db_id: table.db_id,
|
|
|
+ temp_key: temp_keys,
|
|
|
+ };
|
|
|
+
|
|
|
+ let datas = await objTempData.getAll(where);
|
|
|
+ for (let data of datas) {
|
|
|
+ tempDatas[data.db_id + data.temp_key] = {"temp_value": data.temp_value, "temp_saveas_data": data.temp_saveas_data, "db_id": table.db_id};
|
|
|
+ }
|
|
|
+
|
|
|
+ objTempData.updateObject({'create_time': '2000-01-01'}, where);
|
|
|
+ }
|
|
|
+
|
|
|
+ return {allOldDatas, tempDatas};
|
|
|
+ }
|
|
|
+
|
|
|
+ _getMergeWhere(priKey, onlyInsertArr, updateArr, onlyUpdateArr) {
|
|
|
+ let mergeWhere = {};
|
|
|
+ let len = onlyInsertArr.length || updateArr.length || onlyUpdateArr.length;
|
|
|
+ for (let i = 0; i < len; i++) {
|
|
|
+ let allData = Object.assign({}, onlyInsertArr[i], updateArr[i], onlyUpdateArr[i]);
|
|
|
+ let where = this._getWhere(allData, priKey);
|
|
|
+ if (where === false) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let arrNum = 0;
|
|
|
+ for (let key in where) {
|
|
|
+ let val = where[key];
|
|
|
+ if (!mergeWhere[key]) {
|
|
|
+ // 第一次赋值
|
|
|
+ mergeWhere[key] = val;
|
|
|
+ } else if (mergeWhere[key] !== val) {
|
|
|
+ if (typeof mergeWhere[key] === 'object') {
|
|
|
+ mergeWhere[key].push(val);
|
|
|
+ } else {
|
|
|
+ // 转化为数组
|
|
|
+ arrNum++;
|
|
|
+ if (arrNum > 1) {
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
+ let preValue = mergeWhere[key];
|
|
|
+ mergeWhere[key] = [];
|
|
|
+ mergeWhere[key].push(preValue);
|
|
|
+ mergeWhere[key].push(val);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return mergeWhere;
|
|
|
+ }
|
|
|
+
|
|
|
+ async _updateOneByOne(table, items, onlyInsertArr, updateArr, onlyUpdateArr) {
|
|
|
+ let len = onlyInsertArr.length || updateArr.length || onlyUpdateArr.length;
|
|
|
+ let priKey = table.pri_key && table.pri_key.trim();
|
|
|
+ let oldDatas = {};
|
|
|
+
|
|
|
+ let datas = {};
|
|
|
+ let getTempData = onlyInsertArr.length || updateArr.length;
|
|
|
+ Tool.log(`_updateOneByOne, onlyInsertArr:${onlyInsertArr.length}, updateArr:${updateArr.length}, onlyUpdateArr:${onlyUpdateArr.length}, getTempData:${getTempData}`);
|
|
|
+ let {allOldDatas, tempDatas} = await this._getOldDatas(table, onlyInsertArr, updateArr, onlyUpdateArr, getTempData);
|
|
|
+ let prefix = `db_name:${table.db_name}, table:${table.table_name}: `;
|
|
|
+
|
|
|
+ let objTempData = new TableHelper('temp_data', 'crawl');
|
|
|
+ let need_saveas_data = {};
|
|
|
+ for (let key in items[table.table_name]) {
|
|
|
+ if (items[table.table_name][key]['save_as'] > 0) {
|
|
|
+ need_saveas_data[key] = {'save_as': items[table.table_name][key]['save_as'], 'save_as_referer': items[table.table_name][key]['save_as_referer']};
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ let onlyUpdataDatas = [];
|
|
|
+ let temp_saveas_fields = {};
|
|
|
+ try {
|
|
|
+ for (let i = 0; i < len; i++) {
|
|
|
+ onlyInsertArr[i] = onlyInsertArr[i] || {};
|
|
|
+ updateArr[i] = updateArr[i] || {};
|
|
|
+ onlyUpdateArr[i] = onlyUpdateArr[i] || {};
|
|
|
+
|
|
|
+ let allData = Object.assign({}, onlyInsertArr[i], updateArr[i], onlyUpdateArr[i]);
|
|
|
+
|
|
|
+ let where = this._getWhere(allData, priKey);
|
|
|
+ if (where === false) {
|
|
|
+ Tool.err('数据没包含主键:' + priKey);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ let whereStr = JSON.stringify(where);
|
|
|
+ let objTable = new TableHelper(table.table_name, table.db_name);
|
|
|
+ // let oldData = await objTable.getRow(where);
|
|
|
+ let oldData = allOldDatas[whereStr];
|
|
|
+ if (!oldData) {
|
|
|
+ allData = Object.assign({}, onlyInsertArr[i], updateArr[i]);
|
|
|
+ let newData = {};
|
|
|
+ // 去掉null的数据
|
|
|
+ for (let key in allData) {
|
|
|
+ if (allData[key] !== null && allData[key] !== undefined) {
|
|
|
+ newData[key] = allData[key];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!php.empty(newData)) {
|
|
|
+ // 判断是否有暂存数据,如果有则合并
|
|
|
+ if (tempDatas) {
|
|
|
+ if (tempDatas[table.db_id + php.md5(whereStr)] != undefined) {
|
|
|
+ let temp_value = tempDatas[table.db_id + php.md5(whereStr)].temp_value;
|
|
|
+ temp_value = JSON.parse(temp_value);
|
|
|
+ newData = Object.assign(temp_value, newData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ await objTable.addObject(newData);
|
|
|
+ if (tempDatas[table.db_id + php.md5(whereStr)] != undefined) {
|
|
|
+ let temp_saveas_data = tempDatas[table.db_id + php.md5(whereStr)].temp_saveas_data;
|
|
|
+ temp_saveas_data = JSON.parse(temp_saveas_data);
|
|
|
+ temp_saveas_fields[whereStr] = temp_saveas_data;
|
|
|
+ }
|
|
|
+
|
|
|
+ datas[whereStr] = newData;
|
|
|
+ } else if (!php.empty(onlyUpdateArr)) {
|
|
|
+ // 处理暂存的数据
|
|
|
+ let db_id = table.db_id;
|
|
|
+ let temp_key = php.md5(whereStr);
|
|
|
+ let temp_value = JSON.stringify(onlyUpdateArr[i]);
|
|
|
+ let temp_saveas_data = {}
|
|
|
+ for (let field in onlyUpdateArr[i]) {
|
|
|
+ if (need_saveas_data[field]) {
|
|
|
+ temp_saveas_data[field] = need_saveas_data[field];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ temp_saveas_data = JSON.stringify(temp_saveas_data);
|
|
|
+ onlyUpdataDatas.push({db_id, temp_key, temp_value, temp_saveas_data});
|
|
|
+ } else {
|
|
|
+ Tool.log(prefix + "没有'只插入'或'更新'的数据");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ allData = Object.assign({}, updateArr[i], onlyUpdateArr[i]);
|
|
|
+ if (!php.empty(allData)) {
|
|
|
+ let newData = {};
|
|
|
+ // 存在老数据,只更新updateData
|
|
|
+ for (let key in allData) {
|
|
|
+ if (allData[key] === null && allData[key] === undefined) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 只填充的字段,只在更新模式有限
|
|
|
+ if (items[table.table_name][key]['only_fill']) {
|
|
|
+ // 老数据不存在时才填充
|
|
|
+ if (!oldData[key] && allData[key]) {
|
|
|
+ newData[key] = allData[key];
|
|
|
+ } else {
|
|
|
+ // Tool.log(`只填充字段:${key}, 已存在老数据:${oldData[key]}, 新数据:${allData[key]}`);
|
|
|
+ Tool.log(`只填充字段:${key}, 已存在老数据,不需要填充`);
|
|
|
+ }
|
|
|
+ } else if (allData[key] != oldData[key]) {
|
|
|
+ newData[key] = allData[key];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!php.empty(newData)) {
|
|
|
+ await objTable.updateObject(newData, where);
|
|
|
+ oldDatas[whereStr] = oldData;
|
|
|
+ datas[whereStr] = newData;
|
|
|
+ } else {
|
|
|
+ Tool.log(prefix + "newData和oldData数据一样,不需要更新");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Tool.log(prefix + "没有'只更新'或'更新'的数据");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 暂存的数据
|
|
|
+ if (onlyUpdataDatas.length) {
|
|
|
+ await objTempData.replaceObjects2(onlyUpdataDatas);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (ex) {
|
|
|
+ this.state = this.STATE_ERROR;
|
|
|
+ Tool.err(ex.stack);
|
|
|
+ }
|
|
|
+
|
|
|
+ return {oldDatas, datas, temp_saveas_fields};
|
|
|
+ }
|
|
|
+
|
|
|
+ _getWhere(allData, priKey) {
|
|
|
+ let where = {};
|
|
|
+ let parts = priKey.split(',');
|
|
|
+ for (let part of parts) {
|
|
|
+ priKey = part.trim();
|
|
|
+ if (allData[priKey] === undefined) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ where[priKey] = allData[priKey] + '';
|
|
|
+ }
|
|
|
+
|
|
|
+ return where;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+module.exports = Spider;
|