Base.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. <?php
  2. /**
  3. * 同布表格数据基类
  4. * User: ben
  5. */
  6. class Sync_Base extends Singleton {
  7. protected $objRedis;
  8. protected $objTable;
  9. /**
  10. * 表名
  11. * @var string
  12. */
  13. protected $code;
  14. /**
  15. * 表名
  16. * @var string
  17. */
  18. protected $tableName;
  19. /**
  20. * 合约表名,默认跟tableName一样
  21. * @var string
  22. */
  23. protected $codeTable;
  24. /**
  25. * 缓存的key
  26. * @var string
  27. */
  28. protected $cacheKey = 'default';
  29. /**
  30. * 数据库key
  31. * @var string
  32. */
  33. protected $dbKey = 'default';
  34. /**
  35. * 主键的字段名
  36. * @var string
  37. */
  38. protected $priKey;
  39. /**
  40. * 关键数据字段(无需传主键)
  41. * @var array
  42. */
  43. protected $dataField = [];
  44. /**
  45. * 索引的字段名
  46. * @var string
  47. */
  48. protected $indexName;
  49. /**
  50. * 索引,Index number, 1 - primary (first), 2 - secondary index (in order defined by multi_index), 3 - third index, etc.
  51. * @var int
  52. */
  53. protected $index = 1;
  54. /**
  55. * 索引类型,The key type of --index, primary only supports (i64), all others support (i64, i128, i256, float64, float128, ripemd160, sha256).
  56. * @var string
  57. */
  58. protected $keyType = 'i64';
  59. /**
  60. * 频率,几秒同步一次
  61. * @var int
  62. */
  63. protected $frequency = 10;
  64. /**
  65. * 数据清除间隔
  66. * @var int
  67. */
  68. protected $clearTtl = -1;
  69. /**
  70. * 清除数据的动作名
  71. * @var null
  72. */
  73. protected $clearAction = null;
  74. /**
  75. * 执行最大次数
  76. * @var int
  77. */
  78. protected $maxTimes = 1000;
  79. /**
  80. * 数据表查询条件
  81. * @var array
  82. */
  83. protected $dataTableWhere;
  84. public function __construct($tableName, $priKey = 'id', $indexName = 'id', $codeTable = null, $code = null, $dataTableWhere = []) {
  85. $this->tableName = $tableName;
  86. $this->codeTable = $codeTable ?: $tableName;
  87. $this->priKey = $priKey;
  88. $this->indexName = $indexName;
  89. $this->code = $code ?: $GLOBALS['codeMee'];
  90. $this->dataTableWhere = $dataTableWhere ?: [];
  91. $this->objRedis = dwRedis::init($this->cacheKey);
  92. $this->objTable = new TableHelper($this->tableName, $this->dbKey);
  93. }
  94. private static function getPreKey($tableName) {
  95. return "globals:syncEosTable:{$tableName}:";
  96. }
  97. private static function _getClearKey($tableName) {
  98. return "globals:clearEosTable:{$tableName}";
  99. }
  100. protected static function getRequestKey($tableName) {
  101. return self::getPreKey($tableName) . 'request';
  102. }
  103. protected static function getResponseKey($tableName) {
  104. return self::getPreKey($tableName) . 'response';
  105. }
  106. public function isTimeout($time) {
  107. $responseKey = self::getResponseKey($this->tableName);
  108. $runTime = $this->objRedis->get($responseKey);
  109. $span = $time - $runTime;
  110. if ($span >= $this->frequency) {
  111. $msg = "Sync {$this->code}.{$this->codeTable} => {$this->dbKey}.{$this->tableName} timeout, time:{$time}, runTime:{$runTime}, span:{$span} > frequency:{$this->frequency}";
  112. Eos::log($msg);
  113. if ($runTime && $span > $this->frequency * 5 && $span > 10) {
  114. alermErrorMsg($msg);
  115. }
  116. return true;
  117. } else {
  118. return false;
  119. }
  120. }
  121. public function pubSubscribe($waitNewer = true, $all = false) {
  122. $time = microtime(true);
  123. $requestKey = self::getRequestKey($this->tableName);
  124. $this->objRedis->setex($requestKey, $this->frequency, $time);
  125. // Eos::log("time:{$time}");
  126. Eos::pubEvent($requestKey, compact('time', 'all'));
  127. if ($waitNewer) {
  128. // 最多等10秒
  129. for ($i = 0; $i < 100; $i++) {
  130. $responseKey = self::getResponseKey($this->tableName);
  131. $runTime = $this->objRedis->get($responseKey);
  132. if ($runTime > $time) {
  133. // Eos::log("runTime:{$runTime} > time:{$time}");
  134. return true;
  135. } else {
  136. // Eos::log("sleep, runTime:{$runTime} < time:{$time}");
  137. // 休息 100ms
  138. Tool::usleep(100 * 1000);
  139. }
  140. }
  141. }
  142. return false;
  143. }
  144. public function getNewestDatas($lower = null, $limit = 1000) {
  145. $url = "{$GLOBALS['eosUrl']}v1/chain/get_table_rows";
  146. $objHttp = new dwHttp();
  147. $data = [
  148. 'scope' => $this->code,
  149. 'code' => $this->code,
  150. 'table' => $this->codeTable,
  151. 'json' => true,
  152. 'limit' => $limit,
  153. 'key_type' => $this->keyType,
  154. 'index_position' => $this->index,
  155. ];
  156. if ($lower) {
  157. $data['lower_bound'] = $lower;
  158. }
  159. // 第一次2秒就超时
  160. $json = $objHttp->post2($url, json_encode($data), 5, 2);
  161. $datas = json_decode($json, true);
  162. Eos::log("get {$this->code} {$this->codeTable} lower:{$lower}, count:" . count($datas['rows']));
  163. return $datas;
  164. }
  165. protected function updateRow($row, $priValue, $dbRow) {
  166. if ($this->dataField) {
  167. $newRow = arrayFilter($row, $this->dataField);
  168. $needUpdate = false;
  169. foreach ($newRow as $k => $v) {
  170. if ($dbRow[$k] != $v) {
  171. $needUpdate = true;
  172. break;
  173. }
  174. }
  175. // 不需要更新,则跳过
  176. if (!$needUpdate) {
  177. // Eos::log("skip same row, priKey:{$dbRow[$this->priKey]}, dbRow:" . json_encode($dbRow));
  178. return;
  179. }
  180. }
  181. $this->objTable->updateObject($row, [$this->priKey => $priValue]);
  182. Eos::log("update {$this->tableName}, dbRow:" . json_encode($dbRow));
  183. Eos::log("update {$this->tableName}, new row:" . json_encode($row));
  184. }
  185. protected function addNewRows($newRows) {
  186. // 批量添加
  187. Eos::log("addObjectsIfNoExist {$this->tableName} count:" . count($newRows));
  188. foreach ($newRows as $newRow) {
  189. Eos::log("addNewRow {$this->tableName}: new row:" . json_encode($newRow));
  190. }
  191. $this->objTable->addObjectsIfNoExist($newRows);
  192. }
  193. protected function tryClearData() {
  194. if ($this->clearTtl > 0) {
  195. $key = $this->_getClearKey($this->tableName);
  196. $flag = $this->objRedis->exists($key);
  197. Eos::log("exists ClearKey:{$flag}");
  198. if (!$flag) {
  199. $this->clearData();
  200. $this->objRedis->setex($key, $this->clearTtl, time());
  201. }
  202. }
  203. }
  204. /**
  205. * 清除数据
  206. */
  207. protected function clearData() {
  208. if ($this->clearAction) {
  209. // 取倒数第20条
  210. $_sortKey = "update_time_int DESC";
  211. $last = $this->objTable->getRow(compact('_sortKey'));
  212. $update_time_int = $last['update_time_int'];
  213. $maxUpdateTime = 1000000 * (time() - 180);
  214. $update_time_int = max($update_time_int, $maxUpdateTime);
  215. // 清除数据
  216. $cmd = "cleos -u {$GLOBALS['eosUrl']} push action {$this->code} {$this->clearAction} '[ \"{$update_time_int}\" ]' -j -p {$this->code}";
  217. $json = Eos::execCmd($cmd);
  218. $ret = json_decode($json, true);
  219. Eos::log($cmd);
  220. Eos::log("transaction_id: {$ret['transaction_id']}");
  221. return $ret['transaction_id'];
  222. } else {
  223. throw new Exception("{$this->tableName} have not clearAction");
  224. }
  225. }
  226. /**
  227. * 更新数据库,返回新增数据 和 更新的数据
  228. *
  229. * @param $rows
  230. *
  231. * @return array
  232. */
  233. protected function updateDb($rows) {
  234. if (!$rows) {
  235. return [];
  236. }
  237. $_field = "`{$this->priKey}`";
  238. if ($this->dataField) {
  239. $_field .= ", `" . join("`, `", $this->dataField) . "`";
  240. }
  241. // 考虑到有可能会丢数据,所以不能只拿合约上的数据
  242. $ids = array_column($rows, $this->priKey);
  243. // $existMap = $this->objTable->getAll(, compact('_field'));
  244. // 获取线上时间最小的时间
  245. $maxTime = time() * 1000000;
  246. foreach ($rows as $row) {
  247. $maxTime = max($row[$this->indexName], $maxTime);
  248. }
  249. $_where = "`{$this->indexName}` >= '{$maxTime}' ";
  250. if ($ids) {
  251. $_where .= " OR `{$this->priKey}` IN ('" . join("', '", $ids) . "') ";
  252. }
  253. $existMap = $this->objTable->getAll(compact('_field', '_where'));
  254. $existMap = arrayFormatKey($existMap, $this->priKey);
  255. $existMap2 = $existMap;
  256. $newDatas = [];
  257. foreach ($rows as $row) {
  258. $id = $row[$this->priKey];
  259. $dbRow = $existMap[$id];
  260. if ($dbRow) {
  261. // 匹配到,则删除
  262. unset($existMap2[$id]);
  263. // 去掉主键
  264. unset($row[$this->priKey]);
  265. $this->updateRow($row, $id, $dbRow);
  266. } else {
  267. $newDatas[] = $row;
  268. }
  269. }
  270. if ($newDatas) {
  271. // 转化回数组
  272. $this->addNewRows($newDatas);
  273. }
  274. if ($existMap2) {
  275. $objLostLog = new TableHelper('lost_log', 'dw_eos');
  276. // 要删除的多余数据...,这个是因为交易丢失造成的脏数据
  277. $lostLogs = [];
  278. $lostIds = [];
  279. $timeout = (time() - 15) * 1000000;
  280. foreach ($existMap2 as $id => $lostRow) {
  281. // 超过15秒的数据,还出现丢失,说明有问题
  282. if ($lostRow['update_time_int'] < $timeout) {
  283. $lostLogs[] = [
  284. 'db_name' => $this->dbKey,
  285. 'table_name' => $this->tableName,
  286. 'pri_key' => $id,
  287. 'data' => json_encode($lostRow),
  288. ];
  289. $lostIds[] = $id;
  290. }
  291. }
  292. if ($lostIds) {
  293. // 删除数据,并记录log
  294. $objLostLog->addObjects2($lostLogs);
  295. // 先注释删除
  296. // $this->objTable->delObject([$this->priKey => $lostIds]);
  297. // 来个告警
  298. alermErrorMsg("同步{$this->tableName}, 发现脏数据, id:" . join(',', $lostIds));
  299. }
  300. }
  301. // 添加数据的时候,尝试删除老数据
  302. $this->tryClearData();
  303. return compact('existMap', 'newDatas');
  304. }
  305. /**
  306. * 同步一次
  307. */
  308. public function syncOnce($all = false) {
  309. if ($all) {
  310. $maxIndex = null;
  311. } else {
  312. // 注意,这里最新的数据可能是不靠谱的,需要过了3分钟的不可逆的数据才靠谱
  313. $maxIndex = $this->objTable->getOne($this->dataTableWhere, [
  314. '_field' => "`{$this->indexName}`",
  315. '_sortKey' => "`{$this->indexName}` DESC"
  316. ]);
  317. // 获取3分钟内的数据
  318. $maxIndex = min($maxIndex, (time() - 180) * 1000000);
  319. }
  320. $ret = $this->getNewestDatas($maxIndex);
  321. $this->updateDb($ret['rows']);
  322. return $ret;
  323. }
  324. /**
  325. * 守卫进程,定时同步
  326. */
  327. public function syncDaemon() {
  328. $this->sync();
  329. // 订阅事件
  330. $requestKey = self::getRequestKey($this->tableName);
  331. try {
  332. $objSubRedis = dwRedis::initNewOne($this->cacheKey);
  333. $objSubRedis->setOption(Redis::OPT_READ_TIMEOUT, $this->frequency * $this->maxTimes);
  334. $objSubRedis->subscribe([$requestKey], function($objRedis, $channel, $json) use($requestKey) {
  335. $data = json_decode($json, true);
  336. $max = (int) ($this->objRedis->get($requestKey) * 1000);
  337. $time = (int) ($data['time'] * 1000);
  338. if ($time >= $max) {
  339. // Eos::log("subscribe channel: {$channel}, all:{$data['all']}, msg: {$data['time']} >= max: {$max}");
  340. $this->sync($data['all']);
  341. } else {
  342. Eos::log("skip, msg:{$data['time']} < max:{$max}.");
  343. }
  344. });
  345. } catch (RedisException $ex) {
  346. Eos::log("timeout, ex:" . $ex->getMessage());
  347. }
  348. }
  349. /**
  350. * 守卫进程,定时同步
  351. */
  352. private function sync($all = false, $times = 0) {
  353. if ($times++ > $this->maxTimes) {
  354. return;
  355. }
  356. $runTime = microtime(true);
  357. $this->syncOnce($all);
  358. $responseKey = self::getResponseKey($this->tableName);
  359. $this->objRedis->setex($responseKey, $this->frequency * 10, $runTime);
  360. $requestKey = self::getRequestKey($this->tableName);
  361. $requestTime = $this->objRedis->get($requestKey);
  362. if ($requestTime >= $runTime) {
  363. Eos::log("find requestTime: {$requestTime}, syncDaemon at once.");
  364. $clientNum = Eos::pubEvent($responseKey, $runTime);
  365. Eos::log("pubEvent: {$responseKey}, clientNum: {$clientNum}");
  366. $this->afterSync();
  367. $this->sync($all, $times);
  368. } else {
  369. $this->afterSync();
  370. }
  371. $msg = 'sync done, memory used: ' . memory_get_usage() . ' bytes';
  372. CallLog::logSelfCall(CODE_SUCCESS, $msg);
  373. // 记录日志后,重置时间,才能统计每次循环所用的时间
  374. global $startTime;
  375. $startTime = microtime(true);
  376. }
  377. protected function afterSync() {
  378. }
  379. protected function formatDateTime($str) {
  380. if (strpos($str, 'T') > 0) {
  381. return date('Y-m-d H:i:s', strtotime($str) + 8 * 3600);
  382. } else {
  383. return date('Y-m-d H:i:s', strtotime($str));
  384. }
  385. }
  386. }