123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- <?php
- /**
- * 同布表格数据基类
- * User: ben
- */
- class Sync_Base extends Singleton {
- protected $objRedis;
- protected $objTable;
- /**
- * 表名
- * @var string
- */
- protected $code;
- /**
- * 表名
- * @var string
- */
- protected $tableName;
- /**
- * 合约表名,默认跟tableName一样
- * @var string
- */
- protected $codeTable;
- /**
- * 缓存的key
- * @var string
- */
- protected $cacheKey = 'default';
- /**
- * 数据库key
- * @var string
- */
- protected $dbKey = 'default';
- /**
- * 主键的字段名
- * @var string
- */
- protected $priKey;
- /**
- * 关键数据字段(无需传主键)
- * @var array
- */
- protected $dataField = [];
- /**
- * 索引的字段名
- * @var string
- */
- protected $indexName;
- /**
- * 索引,Index number, 1 - primary (first), 2 - secondary index (in order defined by multi_index), 3 - third index, etc.
- * @var int
- */
- protected $index = 1;
- /**
- * 索引类型,The key type of --index, primary only supports (i64), all others support (i64, i128, i256, float64, float128, ripemd160, sha256).
- * @var string
- */
- protected $keyType = 'i64';
- /**
- * 频率,几秒同步一次
- * @var int
- */
- protected $frequency = 10;
- /**
- * 数据清除间隔
- * @var int
- */
- protected $clearTtl = -1;
- /**
- * 清除数据的动作名
- * @var null
- */
- protected $clearAction = null;
- /**
- * 执行最大次数
- * @var int
- */
- protected $maxTimes = 1000;
- /**
- * 数据表查询条件
- * @var array
- */
- protected $dataTableWhere;
- public function __construct($tableName, $priKey = 'id', $indexName = 'id', $codeTable = null, $code = null, $dataTableWhere = []) {
- $this->tableName = $tableName;
- $this->codeTable = $codeTable ?: $tableName;
- $this->priKey = $priKey;
- $this->indexName = $indexName;
- $this->code = $code ?: $GLOBALS['codeMee'];
- $this->dataTableWhere = $dataTableWhere ?: [];
- $this->objRedis = dwRedis::init($this->cacheKey);
- $this->objTable = new TableHelper($this->tableName, $this->dbKey);
- }
- private static function getPreKey($tableName) {
- return "globals:syncEosTable:{$tableName}:";
- }
- private static function _getClearKey($tableName) {
- return "globals:clearEosTable:{$tableName}";
- }
- protected static function getRequestKey($tableName) {
- return self::getPreKey($tableName) . 'request';
- }
- protected static function getResponseKey($tableName) {
- return self::getPreKey($tableName) . 'response';
- }
- public function isTimeout($time) {
- $responseKey = self::getResponseKey($this->tableName);
- $runTime = $this->objRedis->get($responseKey);
- $span = $time - $runTime;
- if ($span >= $this->frequency) {
- $msg = "Sync {$this->code}.{$this->codeTable} => {$this->dbKey}.{$this->tableName} timeout, time:{$time}, runTime:{$runTime}, span:{$span} > frequency:{$this->frequency}";
- Eos::log($msg);
- if ($runTime && $span > $this->frequency * 5 && $span > 10) {
- alermErrorMsg($msg);
- }
- return true;
- } else {
- return false;
- }
- }
- public function pubSubscribe($waitNewer = true, $all = false) {
- $time = microtime(true);
- $requestKey = self::getRequestKey($this->tableName);
- $this->objRedis->setex($requestKey, $this->frequency, $time);
- // Eos::log("time:{$time}");
- Eos::pubEvent($requestKey, compact('time', 'all'));
- if ($waitNewer) {
- // 最多等10秒
- for ($i = 0; $i < 100; $i++) {
- $responseKey = self::getResponseKey($this->tableName);
- $runTime = $this->objRedis->get($responseKey);
- if ($runTime > $time) {
- // Eos::log("runTime:{$runTime} > time:{$time}");
- return true;
- } else {
- // Eos::log("sleep, runTime:{$runTime} < time:{$time}");
- // 休息 100ms
- Tool::usleep(100 * 1000);
- }
- }
- }
- return false;
- }
- public function getNewestDatas($lower = null, $limit = 1000) {
- $url = "{$GLOBALS['eosUrl']}v1/chain/get_table_rows";
- $objHttp = new dwHttp();
- $data = [
- 'scope' => $this->code,
- 'code' => $this->code,
- 'table' => $this->codeTable,
- 'json' => true,
- 'limit' => $limit,
- 'key_type' => $this->keyType,
- 'index_position' => $this->index,
- ];
- if ($lower) {
- $data['lower_bound'] = $lower;
- }
- // 第一次2秒就超时
- $json = $objHttp->post2($url, json_encode($data), 5, 2);
- $datas = json_decode($json, true);
- Eos::log("get {$this->code} {$this->codeTable} lower:{$lower}, count:" . count($datas['rows']));
- return $datas;
- }
- protected function updateRow($row, $priValue, $dbRow) {
- if ($this->dataField) {
- $newRow = arrayFilter($row, $this->dataField);
- $needUpdate = false;
- foreach ($newRow as $k => $v) {
- if ($dbRow[$k] != $v) {
- $needUpdate = true;
- break;
- }
- }
- // 不需要更新,则跳过
- if (!$needUpdate) {
- // Eos::log("skip same row, priKey:{$dbRow[$this->priKey]}, dbRow:" . json_encode($dbRow));
- return;
- }
- }
- $this->objTable->updateObject($row, [$this->priKey => $priValue]);
- Eos::log("update {$this->tableName}, dbRow:" . json_encode($dbRow));
- Eos::log("update {$this->tableName}, new row:" . json_encode($row));
- }
- protected function addNewRows($newRows) {
- // 批量添加
- Eos::log("addObjectsIfNoExist {$this->tableName} count:" . count($newRows));
- foreach ($newRows as $newRow) {
- Eos::log("addNewRow {$this->tableName}: new row:" . json_encode($newRow));
- }
- $this->objTable->addObjectsIfNoExist($newRows);
- }
- protected function tryClearData() {
- if ($this->clearTtl > 0) {
- $key = $this->_getClearKey($this->tableName);
- $flag = $this->objRedis->exists($key);
- Eos::log("exists ClearKey:{$flag}");
- if (!$flag) {
- $this->clearData();
- $this->objRedis->setex($key, $this->clearTtl, time());
- }
- }
- }
- /**
- * 清除数据
- */
- protected function clearData() {
- if ($this->clearAction) {
- // 取倒数第20条
- $_sortKey = "update_time_int DESC";
- $last = $this->objTable->getRow(compact('_sortKey'));
- $update_time_int = $last['update_time_int'];
- $maxUpdateTime = 1000000 * (time() - 180);
- $update_time_int = max($update_time_int, $maxUpdateTime);
- // 清除数据
- $cmd = "cleos -u {$GLOBALS['eosUrl']} push action {$this->code} {$this->clearAction} '[ \"{$update_time_int}\" ]' -j -p {$this->code}";
- $json = Eos::execCmd($cmd);
- $ret = json_decode($json, true);
- Eos::log($cmd);
- Eos::log("transaction_id: {$ret['transaction_id']}");
- return $ret['transaction_id'];
- } else {
- throw new Exception("{$this->tableName} have not clearAction");
- }
- }
- /**
- * 更新数据库,返回新增数据 和 更新的数据
- *
- * @param $rows
- *
- * @return array
- */
- protected function updateDb($rows) {
- if (!$rows) {
- return [];
- }
- $_field = "`{$this->priKey}`";
- if ($this->dataField) {
- $_field .= ", `" . join("`, `", $this->dataField) . "`";
- }
- // 考虑到有可能会丢数据,所以不能只拿合约上的数据
- $ids = array_column($rows, $this->priKey);
- // $existMap = $this->objTable->getAll(, compact('_field'));
- // 获取线上时间最小的时间
- $maxTime = time() * 1000000;
- foreach ($rows as $row) {
- $maxTime = max($row[$this->indexName], $maxTime);
- }
- $_where = "`{$this->indexName}` >= '{$maxTime}' ";
- if ($ids) {
- $_where .= " OR `{$this->priKey}` IN ('" . join("', '", $ids) . "') ";
- }
- $existMap = $this->objTable->getAll(compact('_field', '_where'));
- $existMap = arrayFormatKey($existMap, $this->priKey);
- $existMap2 = $existMap;
- $newDatas = [];
- foreach ($rows as $row) {
- $id = $row[$this->priKey];
- $dbRow = $existMap[$id];
- if ($dbRow) {
- // 匹配到,则删除
- unset($existMap2[$id]);
- // 去掉主键
- unset($row[$this->priKey]);
- $this->updateRow($row, $id, $dbRow);
- } else {
- $newDatas[] = $row;
- }
- }
- if ($newDatas) {
- // 转化回数组
- $this->addNewRows($newDatas);
- }
- if ($existMap2) {
- $objLostLog = new TableHelper('lost_log', 'dw_eos');
- // 要删除的多余数据...,这个是因为交易丢失造成的脏数据
- $lostLogs = [];
- $lostIds = [];
- $timeout = (time() - 15) * 1000000;
- foreach ($existMap2 as $id => $lostRow) {
- // 超过15秒的数据,还出现丢失,说明有问题
- if ($lostRow['update_time_int'] < $timeout) {
- $lostLogs[] = [
- 'db_name' => $this->dbKey,
- 'table_name' => $this->tableName,
- 'pri_key' => $id,
- 'data' => json_encode($lostRow),
- ];
- $lostIds[] = $id;
- }
- }
- if ($lostIds) {
- // 删除数据,并记录log
- $objLostLog->addObjects2($lostLogs);
- // 先注释删除
- // $this->objTable->delObject([$this->priKey => $lostIds]);
- // 来个告警
- alermErrorMsg("同步{$this->tableName}, 发现脏数据, id:" . join(',', $lostIds));
- }
- }
- // 添加数据的时候,尝试删除老数据
- $this->tryClearData();
- return compact('existMap', 'newDatas');
- }
- /**
- * 同步一次
- */
- public function syncOnce($all = false) {
- if ($all) {
- $maxIndex = null;
- } else {
- // 注意,这里最新的数据可能是不靠谱的,需要过了3分钟的不可逆的数据才靠谱
- $maxIndex = $this->objTable->getOne($this->dataTableWhere, [
- '_field' => "`{$this->indexName}`",
- '_sortKey' => "`{$this->indexName}` DESC"
- ]);
- // 获取3分钟内的数据
- $maxIndex = min($maxIndex, (time() - 180) * 1000000);
- }
- $ret = $this->getNewestDatas($maxIndex);
- $this->updateDb($ret['rows']);
- return $ret;
- }
- /**
- * 守卫进程,定时同步
- */
- public function syncDaemon() {
- $this->sync();
- // 订阅事件
- $requestKey = self::getRequestKey($this->tableName);
- try {
- $objSubRedis = dwRedis::initNewOne($this->cacheKey);
- $objSubRedis->setOption(Redis::OPT_READ_TIMEOUT, $this->frequency * $this->maxTimes);
- $objSubRedis->subscribe([$requestKey], function($objRedis, $channel, $json) use($requestKey) {
- $data = json_decode($json, true);
- $max = (int) ($this->objRedis->get($requestKey) * 1000);
- $time = (int) ($data['time'] * 1000);
- if ($time >= $max) {
- // Eos::log("subscribe channel: {$channel}, all:{$data['all']}, msg: {$data['time']} >= max: {$max}");
- $this->sync($data['all']);
- } else {
- Eos::log("skip, msg:{$data['time']} < max:{$max}.");
- }
- });
- } catch (RedisException $ex) {
- Eos::log("timeout, ex:" . $ex->getMessage());
- }
- }
- /**
- * 守卫进程,定时同步
- */
- private function sync($all = false, $times = 0) {
- if ($times++ > $this->maxTimes) {
- return;
- }
- $runTime = microtime(true);
- $this->syncOnce($all);
- $responseKey = self::getResponseKey($this->tableName);
- $this->objRedis->setex($responseKey, $this->frequency * 10, $runTime);
- $requestKey = self::getRequestKey($this->tableName);
- $requestTime = $this->objRedis->get($requestKey);
- if ($requestTime >= $runTime) {
- Eos::log("find requestTime: {$requestTime}, syncDaemon at once.");
- $clientNum = Eos::pubEvent($responseKey, $runTime);
- Eos::log("pubEvent: {$responseKey}, clientNum: {$clientNum}");
- $this->afterSync();
- $this->sync($all, $times);
- } else {
- $this->afterSync();
- }
- $msg = 'sync done, memory used: ' . memory_get_usage() . ' bytes';
- CallLog::logSelfCall(CODE_SUCCESS, $msg);
- // 记录日志后,重置时间,才能统计每次循环所用的时间
- global $startTime;
- $startTime = microtime(true);
- }
- protected function afterSync() {
- }
- protected function formatDateTime($str) {
- if (strpos($str, 'T') > 0) {
- return date('Y-m-d H:i:s', strtotime($str) + 8 * 3600);
- } else {
- return date('Y-m-d H:i:s', strtotime($str));
- }
- }
- }
|