tableName = $tableName; $this->codeTable = $codeTable ?: $tableName; $this->priKey = $priKey; $this->indexName = $indexName; $this->code = $code ?: $GLOBALS['codeMee']; $this->dataTableWhere = $dataTableWhere ?: []; $this->objRedis = dwRedis::init($this->cacheKey); $this->objTable = new TableHelper($this->tableName, $this->dbKey); } private static function getPreKey($tableName) { return "globals:syncEosTable:{$tableName}:"; } private static function _getClearKey($tableName) { return "globals:clearEosTable:{$tableName}"; } protected static function getRequestKey($tableName) { return self::getPreKey($tableName) . 'request'; } protected static function getResponseKey($tableName) { return self::getPreKey($tableName) . 'response'; } public function isTimeout($time) { $responseKey = self::getResponseKey($this->tableName); $runTime = $this->objRedis->get($responseKey); $span = $time - $runTime; if ($span >= $this->frequency) { $msg = "Sync {$this->code}.{$this->codeTable} => {$this->dbKey}.{$this->tableName} timeout, time:{$time}, runTime:{$runTime}, span:{$span} > frequency:{$this->frequency}"; Eos::log($msg); if ($runTime && $span > $this->frequency * 5 && $span > 10) { alermErrorMsg($msg); } return true; } else { return false; } } public function pubSubscribe($waitNewer = true, $all = false) { $time = microtime(true); $requestKey = self::getRequestKey($this->tableName); $this->objRedis->setex($requestKey, $this->frequency, $time); // Eos::log("time:{$time}"); Eos::pubEvent($requestKey, compact('time', 'all')); if ($waitNewer) { // 最多等10秒 for ($i = 0; $i < 100; $i++) { $responseKey = self::getResponseKey($this->tableName); $runTime = $this->objRedis->get($responseKey); if ($runTime > $time) { // Eos::log("runTime:{$runTime} > time:{$time}"); return true; } else { // Eos::log("sleep, runTime:{$runTime} < time:{$time}"); // 休息 100ms Tool::usleep(100 * 1000); } } } return false; } public function getNewestDatas($lower = null, $limit = 1000) { $url = "{$GLOBALS['eosUrl']}v1/chain/get_table_rows"; $objHttp = new dwHttp(); $data = [ 'scope' => $this->code, 'code' => $this->code, 'table' => $this->codeTable, 'json' => true, 'limit' => $limit, 'key_type' => $this->keyType, 'index_position' => $this->index, ]; if ($lower) { $data['lower_bound'] = $lower; } // 第一次2秒就超时 $json = $objHttp->post2($url, json_encode($data), 5, 2); $datas = json_decode($json, true); Eos::log("get {$this->code} {$this->codeTable} lower:{$lower}, count:" . count($datas['rows'])); return $datas; } protected function updateRow($row, $priValue, $dbRow) { if ($this->dataField) { $newRow = arrayFilter($row, $this->dataField); $needUpdate = false; foreach ($newRow as $k => $v) { if ($dbRow[$k] != $v) { $needUpdate = true; break; } } // 不需要更新,则跳过 if (!$needUpdate) { // Eos::log("skip same row, priKey:{$dbRow[$this->priKey]}, dbRow:" . json_encode($dbRow)); return; } } $this->objTable->updateObject($row, [$this->priKey => $priValue]); Eos::log("update {$this->tableName}, dbRow:" . json_encode($dbRow)); Eos::log("update {$this->tableName}, new row:" . json_encode($row)); } protected function addNewRows($newRows) { // 批量添加 Eos::log("addObjectsIfNoExist {$this->tableName} count:" . count($newRows)); foreach ($newRows as $newRow) { Eos::log("addNewRow {$this->tableName}: new row:" . json_encode($newRow)); } $this->objTable->addObjectsIfNoExist($newRows); } protected function tryClearData() { if ($this->clearTtl > 0) { $key = $this->_getClearKey($this->tableName); $flag = $this->objRedis->exists($key); Eos::log("exists ClearKey:{$flag}"); if (!$flag) { $this->clearData(); $this->objRedis->setex($key, $this->clearTtl, time()); } } } /** * 清除数据 */ protected function clearData() { if ($this->clearAction) { // 取倒数第20条 $_sortKey = "update_time_int DESC"; $last = $this->objTable->getRow(compact('_sortKey')); $update_time_int = $last['update_time_int']; $maxUpdateTime = 1000000 * (time() - 180); $update_time_int = max($update_time_int, $maxUpdateTime); // 清除数据 $cmd = "cleos -u {$GLOBALS['eosUrl']} push action {$this->code} {$this->clearAction} '[ \"{$update_time_int}\" ]' -j -p {$this->code}"; $json = Eos::execCmd($cmd); $ret = json_decode($json, true); Eos::log($cmd); Eos::log("transaction_id: {$ret['transaction_id']}"); return $ret['transaction_id']; } else { throw new Exception("{$this->tableName} have not clearAction"); } } /** * 更新数据库,返回新增数据 和 更新的数据 * * @param $rows * * @return array */ protected function updateDb($rows) { if (!$rows) { return []; } $_field = "`{$this->priKey}`"; if ($this->dataField) { $_field .= ", `" . join("`, `", $this->dataField) . "`"; } // 考虑到有可能会丢数据,所以不能只拿合约上的数据 $ids = array_column($rows, $this->priKey); // $existMap = $this->objTable->getAll(, compact('_field')); // 获取线上时间最小的时间 $maxTime = time() * 1000000; foreach ($rows as $row) { $maxTime = max($row[$this->indexName], $maxTime); } $_where = "`{$this->indexName}` >= '{$maxTime}' "; if ($ids) { $_where .= " OR `{$this->priKey}` IN ('" . join("', '", $ids) . "') "; } $existMap = $this->objTable->getAll(compact('_field', '_where')); $existMap = arrayFormatKey($existMap, $this->priKey); $existMap2 = $existMap; $newDatas = []; foreach ($rows as $row) { $id = $row[$this->priKey]; $dbRow = $existMap[$id]; if ($dbRow) { // 匹配到,则删除 unset($existMap2[$id]); // 去掉主键 unset($row[$this->priKey]); $this->updateRow($row, $id, $dbRow); } else { $newDatas[] = $row; } } if ($newDatas) { // 转化回数组 $this->addNewRows($newDatas); } if ($existMap2) { $objLostLog = new TableHelper('lost_log', 'dw_eos'); // 要删除的多余数据...,这个是因为交易丢失造成的脏数据 $lostLogs = []; $lostIds = []; $timeout = (time() - 15) * 1000000; foreach ($existMap2 as $id => $lostRow) { // 超过15秒的数据,还出现丢失,说明有问题 if ($lostRow['update_time_int'] < $timeout) { $lostLogs[] = [ 'db_name' => $this->dbKey, 'table_name' => $this->tableName, 'pri_key' => $id, 'data' => json_encode($lostRow), ]; $lostIds[] = $id; } } if ($lostIds) { // 删除数据,并记录log $objLostLog->addObjects2($lostLogs); // 先注释删除 // $this->objTable->delObject([$this->priKey => $lostIds]); // 来个告警 alermErrorMsg("同步{$this->tableName}, 发现脏数据, id:" . join(',', $lostIds)); } } // 添加数据的时候,尝试删除老数据 $this->tryClearData(); return compact('existMap', 'newDatas'); } /** * 同步一次 */ public function syncOnce($all = false) { if ($all) { $maxIndex = null; } else { // 注意,这里最新的数据可能是不靠谱的,需要过了3分钟的不可逆的数据才靠谱 $maxIndex = $this->objTable->getOne($this->dataTableWhere, [ '_field' => "`{$this->indexName}`", '_sortKey' => "`{$this->indexName}` DESC" ]); // 获取3分钟内的数据 $maxIndex = min($maxIndex, (time() - 180) * 1000000); } $ret = $this->getNewestDatas($maxIndex); $this->updateDb($ret['rows']); return $ret; } /** * 守卫进程,定时同步 */ public function syncDaemon() { $this->sync(); // 订阅事件 $requestKey = self::getRequestKey($this->tableName); try { $objSubRedis = dwRedis::initNewOne($this->cacheKey); $objSubRedis->setOption(Redis::OPT_READ_TIMEOUT, $this->frequency * $this->maxTimes); $objSubRedis->subscribe([$requestKey], function($objRedis, $channel, $json) use($requestKey) { $data = json_decode($json, true); $max = (int) ($this->objRedis->get($requestKey) * 1000); $time = (int) ($data['time'] * 1000); if ($time >= $max) { // Eos::log("subscribe channel: {$channel}, all:{$data['all']}, msg: {$data['time']} >= max: {$max}"); $this->sync($data['all']); } else { Eos::log("skip, msg:{$data['time']} < max:{$max}."); } }); } catch (RedisException $ex) { Eos::log("timeout, ex:" . $ex->getMessage()); } } /** * 守卫进程,定时同步 */ private function sync($all = false, $times = 0) { if ($times++ > $this->maxTimes) { return; } $runTime = microtime(true); $this->syncOnce($all); $responseKey = self::getResponseKey($this->tableName); $this->objRedis->setex($responseKey, $this->frequency * 10, $runTime); $requestKey = self::getRequestKey($this->tableName); $requestTime = $this->objRedis->get($requestKey); if ($requestTime >= $runTime) { Eos::log("find requestTime: {$requestTime}, syncDaemon at once."); $clientNum = Eos::pubEvent($responseKey, $runTime); Eos::log("pubEvent: {$responseKey}, clientNum: {$clientNum}"); $this->afterSync(); $this->sync($all, $times); } else { $this->afterSync(); } $msg = 'sync done, memory used: ' . memory_get_usage() . ' bytes'; CallLog::logSelfCall(CODE_SUCCESS, $msg); // 记录日志后,重置时间,才能统计每次循环所用的时间 global $startTime; $startTime = microtime(true); } protected function afterSync() { } protected function formatDateTime($str) { if (strpos($str, 'T') > 0) { return date('Y-m-d H:i:s', strtotime($str) + 8 * 3600); } else { return date('Y-m-d H:i:s', strtotime($str)); } } }