Base.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. <?php
  2. /**
  3. * 同布表格数据基类
  4. * User: ben
  5. */
  6. class Sync_Base extends Singleton {
  7. protected $objRedis;
  8. protected $objTable;
  9. /**
  10. * 表名
  11. * @var string
  12. */
  13. protected $code;
  14. /**
  15. * 表名
  16. * @var string
  17. */
  18. protected $tableName;
  19. /**
  20. * 合约表名,默认跟tableName一样
  21. * @var string
  22. */
  23. protected $codeTable;
  24. /**
  25. * 缓存的key
  26. * @var string
  27. */
  28. protected $cacheKey = 'default';
  29. /**
  30. * 数据库key
  31. * @var string
  32. */
  33. protected $dbKey = 'default';
  34. /**
  35. * 主键的字段名
  36. * @var string
  37. */
  38. protected $priKey;
  39. /**
  40. * 关键数据字段(无需传主键)
  41. * @var array
  42. */
  43. protected $dataField = [];
  44. /**
  45. * 索引的字段名
  46. * @var string
  47. */
  48. protected $indexName;
  49. /**
  50. * 索引,Index number, 1 - primary (first), 2 - secondary index (in order defined by multi_index), 3 - third index, etc.
  51. * @var int
  52. */
  53. protected $index = 1;
  54. /**
  55. * 索引类型,The key type of --index, primary only supports (i64), all others support (i64, i128, i256, float64, float128, ripemd160, sha256).
  56. * @var string
  57. */
  58. protected $keyType = 'i64';
  59. /**
  60. * 频率,几秒同步一次
  61. * @var int
  62. */
  63. protected $frequency = 10;
  64. /**
  65. * 数据清除间隔
  66. * @var int
  67. */
  68. protected $clearTtl = -1;
  69. /**
  70. * 清除数据的动作名
  71. * @var null
  72. */
  73. protected $clearAction = null;
  74. /**
  75. * 执行最大次数
  76. * @var int
  77. */
  78. protected $maxTimes = 1000;
  79. public function __construct($tableName, $priKey = 'id', $indexName = 'id', $codeTable = null, $code = null) {
  80. $this->tableName = $tableName;
  81. $this->codeTable = $codeTable ?: $tableName;
  82. $this->priKey = $priKey;
  83. $this->indexName = $indexName;
  84. $this->code = $code ?: $GLOBALS['codeMee'];
  85. $this->objRedis = dwRedis::init($this->cacheKey);
  86. $this->objTable = new TableHelper($this->tableName, $this->dbKey);
  87. }
  88. private static function getPreKey($tableName) {
  89. return "globals:syncEosTable:{$tableName}:";
  90. }
  91. private static function _getClearKey($tableName) {
  92. return "globals:clearEosTable:{$tableName}";
  93. }
  94. protected static function getRequestKey($tableName) {
  95. return self::getPreKey($tableName) . 'request';
  96. }
  97. protected static function getResponseKey($tableName) {
  98. return self::getPreKey($tableName) . 'response';
  99. }
  100. public function isTimeout($time) {
  101. $responseKey = self::getResponseKey($this->tableName);
  102. $runTime = $this->objRedis->get($responseKey);
  103. $span = $time - $runTime;
  104. if ($span >= $this->frequency) {
  105. $msg = "Sync {$this->code}.{$this->codeTable} => {$this->dbKey}.{$this->tableName} timeout, time:{$time}, runTime:{$runTime}, span:{$span} > frequency:{$this->frequency}";
  106. Eos::log($msg);
  107. if ($runTime && $span > $this->frequency * 5 && $span > 10) {
  108. alermErrorMsg($msg);
  109. }
  110. return true;
  111. } else {
  112. return false;
  113. }
  114. }
  115. public function pubSubscribe($waitNewer = true, $all = false) {
  116. $time = microtime(true);
  117. $requestKey = self::getRequestKey($this->tableName);
  118. $this->objRedis->setex($requestKey, $this->frequency, $time);
  119. // Eos::log("time:{$time}");
  120. Eos::pubEvent($requestKey, compact('time', 'all'));
  121. if ($waitNewer) {
  122. // 最多等10秒
  123. for ($i = 0; $i < 100; $i++) {
  124. $responseKey = self::getResponseKey($this->tableName);
  125. $runTime = $this->objRedis->get($responseKey);
  126. if ($runTime > $time) {
  127. // Eos::log("runTime:{$runTime} > time:{$time}");
  128. return true;
  129. } else {
  130. // Eos::log("sleep, runTime:{$runTime} < time:{$time}");
  131. // 休息 100ms
  132. Tool::usleep(100 * 1000);
  133. }
  134. }
  135. }
  136. return false;
  137. }
  138. public function getNewestDatas($lower = null, $limit = 1000) {
  139. $cmd = "cleos -u {$GLOBALS['eosUrl']} get table {$this->code} {$this->code} {$this->codeTable} --index {$this->index} --key-type {$this->keyType} -l {$limit} ";
  140. if ($lower) {
  141. $cmd .= " -L '{$lower}' ";
  142. }
  143. $json = Eos::execCmd($cmd);
  144. $datas = json_decode($json, true);
  145. Eos::log("get {$this->code} {$this->codeTable} lower:{$lower}, count:" . count($datas['rows']));
  146. return $datas;
  147. }
  148. protected function updateRow($row, $priValue, $dbRow) {
  149. if ($this->dataField) {
  150. $newRow = arrayFilter($row, $this->dataField);
  151. $needUpdate = false;
  152. foreach ($newRow as $k => $v) {
  153. if ($dbRow[$k] != $v) {
  154. $needUpdate = true;
  155. break;
  156. }
  157. }
  158. // 不需要更新,则跳过
  159. if (!$needUpdate) {
  160. // Eos::log("skip same row, priKey:{$dbRow[$this->priKey]}, dbRow:" . json_encode($dbRow));
  161. return;
  162. }
  163. }
  164. $this->objTable->updateObject($row, [$this->priKey => $priValue]);
  165. Eos::log("update {$this->tableName}, dbRow:" . json_encode($dbRow));
  166. Eos::log("update {$this->tableName}, new row:" . json_encode($row));
  167. }
  168. protected function addNewRows($newRows) {
  169. // 批量添加
  170. Eos::log("addObjectsIfNoExist {$this->tableName} count:" . count($newRows));
  171. foreach ($newRows as $newRow) {
  172. Eos::log("addNewRow {$this->tableName}: new row:" . json_encode($newRow));
  173. }
  174. $this->objTable->addObjectsIfNoExist($newRows);
  175. }
  176. protected function tryClearData() {
  177. if ($this->clearTtl > 0) {
  178. $key = $this->_getClearKey($this->tableName);
  179. $flag = $this->objRedis->exists($key);
  180. Eos::log("exists ClearKey:{$flag}");
  181. if (!$flag) {
  182. $this->clearData();
  183. $this->objRedis->setex($key, $this->clearTtl, time());
  184. }
  185. }
  186. }
  187. /**
  188. * 清除数据
  189. */
  190. protected function clearData() {
  191. if ($this->clearAction) {
  192. // 取倒数第20条
  193. $_sortKey = "update_time_int DESC";
  194. $last = $this->objTable->getRow(compact('_sortKey'));
  195. $update_time_int = $last['update_time_int'];
  196. $maxUpdateTime = 1000000 * (time() - 180);
  197. $update_time_int = max($update_time_int, $maxUpdateTime);
  198. // 清除数据
  199. $cmd = "cleos -u {$GLOBALS['eosUrl']} push action {$this->code} {$this->clearAction} '[ \"{$update_time_int}\" ]' -j -p {$this->code}";
  200. $json = Eos::execCmd($cmd);
  201. $ret = json_decode($json, true);
  202. Eos::log($cmd);
  203. Eos::log("transaction_id: {$ret['transaction_id']}");
  204. return $ret['transaction_id'];
  205. } else {
  206. throw new Exception("{$this->tableName} have not clearAction");
  207. }
  208. }
  209. /**
  210. * 更新数据库,返回新增数据 和 更新的数据
  211. *
  212. * @param $rows
  213. *
  214. * @return array
  215. */
  216. protected function updateDb($rows) {
  217. if (!$rows) {
  218. return [];
  219. }
  220. $_field = "`{$this->priKey}`";
  221. if ($this->dataField) {
  222. $_field .= ", `" . join("`, `", $this->dataField) . "`";
  223. }
  224. // 考虑到有可能会丢数据,所以不能只拿合约上的数据
  225. $ids = array_column($rows, $this->priKey);
  226. // $existMap = $this->objTable->getAll(, compact('_field'));
  227. // 获取线上时间最小的时间
  228. $maxTime = time() * 1000000;
  229. foreach ($rows as $row) {
  230. $maxTime = max($row[$this->indexName], $maxTime);
  231. }
  232. $_where = "`{$this->indexName}` >= '{$maxTime}' ";
  233. if ($ids) {
  234. $_where .= " OR `{$this->priKey}` IN ('" . join("', '", $ids) . "') ";
  235. }
  236. $existMap = $this->objTable->getAll(compact('_field', '_where'));
  237. $existMap = arrayFormatKey($existMap, $this->priKey);
  238. $existMap2 = $existMap;
  239. $newDatas = [];
  240. foreach ($rows as $row) {
  241. $id = $row[$this->priKey];
  242. $dbRow = $existMap[$id];
  243. if ($dbRow) {
  244. // 匹配到,则删除
  245. unset($existMap2[$id]);
  246. // 去掉主键
  247. unset($row[$this->priKey]);
  248. $this->updateRow($row, $id, $dbRow);
  249. } else {
  250. $newDatas[] = $row;
  251. }
  252. }
  253. if ($newDatas) {
  254. // 转化回数组
  255. $this->addNewRows($newDatas);
  256. }
  257. if ($existMap2) {
  258. $objLostLog = new TableHelper('lost_log', 'dw_eos');
  259. // 要删除的多余数据...,这个是因为交易丢失造成的脏数据
  260. $lostLogs = [];
  261. $lostIds = [];
  262. $timeout = (time() - 15) * 1000000;
  263. foreach ($existMap2 as $id => $lostRow) {
  264. // 超过15秒的数据,还出现丢失,说明有问题
  265. if ($lostRow['update_time_int'] < $timeout) {
  266. $lostLogs[] = [
  267. 'db_name' => $this->dbKey,
  268. 'table_name' => $this->tableName,
  269. 'pri_key' => $id,
  270. 'data' => json_encode($lostRow),
  271. ];
  272. $lostIds[] = $id;
  273. }
  274. }
  275. if ($lostIds) {
  276. // 删除数据,并记录log
  277. $objLostLog->addObjects2($lostLogs);
  278. // 先注释删除
  279. // $this->objTable->delObject([$this->priKey => $lostIds]);
  280. // 来个告警
  281. alermErrorMsg("同步{$this->tableName}, 发现脏数据, id:" . join(',', $lostIds));
  282. }
  283. }
  284. // 添加数据的时候,尝试删除老数据
  285. $this->tryClearData();
  286. return compact('existMap', 'newDatas');
  287. }
  288. /**
  289. * 同步一次
  290. */
  291. public function syncOnce($all = false) {
  292. if ($all) {
  293. $maxIndex = null;
  294. } else {
  295. // 注意,这里最新的数据可能是不靠谱的,需要过了3分钟的不可逆的数据才靠谱
  296. $maxIndex = $this->objTable->getOne([
  297. '_field' => "`{$this->indexName}`",
  298. '_sortKey' => "`{$this->indexName}` DESC"
  299. ]);
  300. // 获取3分钟内的数据
  301. $maxIndex = min($maxIndex, (time() - 180) * 1000000);
  302. }
  303. $ret = $this->getNewestDatas($maxIndex);
  304. $this->updateDb($ret['rows']);
  305. return $ret;
  306. }
  307. /**
  308. * 守卫进程,定时同步
  309. */
  310. public function syncDaemon() {
  311. $this->sync();
  312. // 订阅事件
  313. $requestKey = self::getRequestKey($this->tableName);
  314. try {
  315. $objSubRedis = dwRedis::initNewOne($this->cacheKey);
  316. $objSubRedis->setOption(Redis::OPT_READ_TIMEOUT, $this->frequency * $this->maxTimes);
  317. $objSubRedis->subscribe([$requestKey], function($objRedis, $channel, $json) use($requestKey) {
  318. $data = json_decode($json, true);
  319. $max = (int) ($this->objRedis->get($requestKey) * 1000);
  320. $time = (int) ($data['time'] * 1000);
  321. if ($time >= $max) {
  322. // Eos::log("subscribe channel: {$channel}, all:{$data['all']}, msg: {$data['time']} >= max: {$max}");
  323. $this->sync($data['all']);
  324. } else {
  325. Eos::log("skip, msg:{$data['time']} < max:{$max}.");
  326. }
  327. });
  328. } catch (RedisException $ex) {
  329. Eos::log("timeout, ex:" . $ex->getMessage());
  330. }
  331. }
  332. /**
  333. * 守卫进程,定时同步
  334. */
  335. private function sync($all = false, $times = 0) {
  336. if ($times++ > $this->maxTimes) {
  337. return;
  338. }
  339. $runTime = microtime(true);
  340. $this->syncOnce($all);
  341. $responseKey = self::getResponseKey($this->tableName);
  342. $this->objRedis->setex($responseKey, $this->frequency * 10, $runTime);
  343. $requestKey = self::getRequestKey($this->tableName);
  344. $requestTime = $this->objRedis->get($requestKey);
  345. if ($requestTime >= $runTime) {
  346. Eos::log("find requestTime: {$requestTime}, syncDaemon at once.");
  347. $clientNum = Eos::pubEvent($responseKey, $runTime);
  348. Eos::log("pubEvent: {$responseKey}, clientNum: {$clientNum}");
  349. $this->afterSync();
  350. $this->sync($all, $times);
  351. } else {
  352. $this->afterSync();
  353. }
  354. $msg = 'sync done, memory used: ' . memory_get_usage() . ' bytes';
  355. CallLog::logSelfCall(CODE_SUCCESS, $msg);
  356. // 记录日志后,重置时间,才能统计每次循环所用的时间
  357. global $startTime;
  358. $startTime = microtime(true);
  359. }
  360. protected function afterSync() {
  361. }
  362. protected function formatDateTime($str) {
  363. if (strpos($str, 'T') > 0) {
  364. return date('Y-m-d H:i:s', strtotime($str) + 8 * 3600);
  365. } else {
  366. return date('Y-m-d H:i:s', strtotime($str));
  367. }
  368. }
  369. }