Helper.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. "use strict"
  2. var r2mConsts = require('./Header.js');
  3. var AppErrors = require('../AppErrors.js');
  /**
   * Socket helper for the redis2mysql (R2M) client: frames outgoing commands,
   * reassembles incoming frames and dispatches each decoded response to the
   * emitter registered for its request id in `this._events`.
   *
   * Frame layout (all integers big-endian), as written by cmd() and read by
   * parse():
   *   bytes  0-1   protocol version
   *   bytes  2-3   command
   *   bytes  4-5   command type
   *   bytes  6-7   code type
   *   bytes  8-9   content type
   *   bytes 10-13  request id
   *   bytes 14-17  content length in bytes
   *   bytes 18-    UTF-8 encoded content
   */
  class R2m_Helper {
    /** Size in bytes of the fixed header that precedes the content of a frame. */
    static get HEADER_SIZE() {
      return 18;
    }

    /**
     * Opens the socket and starts listening for responses.
     *
     * @param {Object} objSocket    Object whose open() returns the socket to use.
     * @param {Object} objR2mClient Owning client; its `_objHelper` property is
     *                              reset to null when the socket ends.
     */
    constructor(objSocket, objR2mClient) {
      this._ver = r2mConsts.R2M_VERSION;
      this.objR2mClient = objR2mClient;
      this._events = [];
      this.objClient = objSocket.open();
      this.listen();
    }

    /**
     * Registers the 'data', 'end' and 'error' handlers on the socket.
     *
     * Incoming chunks are accumulated until parse() can extract whole frames;
     * each decoded frame is emitted as a 'receive' event on the emitter stored
     * in `this._events[frame.id]`, after which that slot is cleared.
     */
    listen() {
      // Bytes received so far that do not yet form a complete frame.
      let pending = '';

      this.objClient.on('data', (data) => {
        // Merge the new chunk with whatever was left over from the previous one.
        pending = pending.length === 0
          ? data
          : Buffer.concat([pending, data], pending.length + data.length);

        const response = this.parse(pending);
        // Keep only the unparsed tail for the next chunk.
        pending = response.buf;

        for (const result of response.results) {
          const emitter = this._events[result.id];
          // A frame whose id has no registered emitter (never registered, or
          // already answered and cleared) is skipped instead of crashing the
          // data handler with a TypeError.
          if (!emitter) {
            continue;
          }
          // Fire the data-received event, then release the slot.
          emitter.emit('receive', result);
          this._events[result.id] = null;
        }
      });

      this.objClient.on('end', () => {
        this.objR2mClient._objHelper = null;
        console.log('client disconnected');
      });

      this.objClient.on('error', (data) => {
        throw new AppErrors.R2MError(`socket error ${data}`);
      });
    }

    /**
     * Sends a command flagged as a request.
     *
     * @param {*} conf        Optional configuration forwarded to cmd().
     * @param {number} cmd    Command code.
     * @param {*} data        Command payload.
     * @param {number} [id=0] Request id.
     * @returns {*} Whatever the socket's write() returns.
     */
    request(conf, cmd, data, id) {
      return this.cmd(conf, cmd, data, id || 0, r2mConsts.CMD_TYPE_REQUEST);
    }

    /**
     * Extracts every complete frame from `buf`.
     *
     * @param {Buffer} buf Accumulated bytes received from the socket.
     * @returns {{buf: (Buffer|string), results: Object[]}} `results` holds the
     *   decoded frames (`ver`, `cmd`, `cmdType`, `codeType`, `contentType`,
     *   `id`, `content`); `buf` holds the trailing bytes of an incomplete frame,
     *   or '' when nothing is left over.
     * @throws {SyntaxError} When a frame flagged as JSON carries invalid JSON.
     */
    parse(buf) {
      const headerSize = R2m_Helper.HEADER_SIZE;
      const response = {buf: '', results: []};
      const buflen = buf.length;
      let start = 0;

      while (start < buflen) {
        const remaining = buflen - start;

        // A zero version field ends parsing; the rest of the buffer is dropped.
        if (remaining >= 2 && buf.readUInt16BE(start) === 0) {
          break;
        }
        // The header itself is split across chunks: keep it for the next one
        // rather than reading past the end of the buffer.
        if (remaining < headerSize) {
          response.buf = buf.slice(start, buflen);
          break;
        }

        const ver = buf.readUInt16BE(start);
        // Loose comparison kept on purpose: the reference value comes from
        // Header.js, whose exact type is not visible here.
        if (ver != this._ver) {
          this.errMsg = `no match redis2mysql service version. client:${ver}, server:${this._ver}`;
        }
        const cmd = buf.readUInt16BE(start + 2);
        const cmdType = buf.readUInt16BE(start + 4);
        const codeType = buf.readUInt16BE(start + 6);
        const contentType = buf.readUInt16BE(start + 8);
        const id = buf.readUInt32BE(start + 10);
        const length = buf.readUInt32BE(start + 14);

        const bodyStart = start + headerSize;
        const bodyEnd = bodyStart + length;
        // The body is split across chunks: compare byte offsets directly so the
        // decision does not depend on how the content decodes.
        if (bodyEnd > buflen) {
          response.buf = buf.slice(start, buflen);
          break;
        }

        let content = buf.toString('utf8', bodyStart, bodyEnd);
        // Loose comparison kept for the same reason as the version check above.
        if (contentType == r2mConsts.CONTENT_TYPE_JSON) {
          content = JSON.parse(content);
        }

        response.results.push({ver, cmd, cmdType, codeType, contentType, id, content});
        start = bodyEnd;
      }

      return response;
    }

    /**
     * Builds one frame and writes it to the socket.
     *
     * Objects (including null) are JSON-encoded and flagged as JSON content;
     * anything else is sent as text. When `conf` is truthy the content becomes
     * the JSON encoding of `{conf, data}`, with `data` already encoded as above.
     * A missing payload is sent as an empty content section.
     *
     * @param {*} conf            Optional configuration wrapped with the payload.
     * @param {number} cmd        Command code.
     * @param {*} data            Command payload.
     * @param {number} [id=0]     Request id.
     * @param {number} [cmdType]  Defaults to r2mConsts.CMD_TYPE_REQUEST.
     * @param {number} [codeType] Defaults to r2mConsts.CODE_TYPE_BINARY.
     * @returns {*} Whatever the socket's write() returns.
     */
    cmd(conf, cmd, data, id, cmdType, codeType) {
      const headerSize = R2m_Helper.HEADER_SIZE;
      const requestId = id || 0;
      const type = cmdType || r2mConsts.CMD_TYPE_REQUEST;
      const code = codeType || r2mConsts.CODE_TYPE_BINARY;

      let contentType = r2mConsts.CONTENT_TYPE_TEXT;
      let payload = data;
      if (typeof payload === 'object') {
        contentType = r2mConsts.CONTENT_TYPE_JSON;
        payload = JSON.stringify(payload);
      }

      let args;
      if (conf) {
        args = JSON.stringify({conf, data: payload});
      } else if (payload === undefined) {
        args = '';
      } else {
        args = String(payload);
      }

      const strLength = this.mbStringLength(args);
      // Buffer.alloc zero-fills; `new Buffer(size)` is deprecated and exposes
      // uninitialised memory.
      const buf = Buffer.alloc(headerSize + strLength);
      buf.writeUInt16BE(this._ver, 0);
      buf.writeUInt16BE(cmd, 2);
      buf.writeUInt16BE(type, 4);
      buf.writeUInt16BE(code, 6);
      buf.writeUInt16BE(contentType, 8);
      buf.writeUInt32BE(requestId, 10);
      buf.writeUInt32BE(strLength, 14);
      buf.write(args, headerSize);
      return this.objClient.write(buf);
    }

    /**
     * Returns the number of bytes `s` occupies once encoded as UTF-8.
     *
     * Delegates to Buffer.byteLength so that U+007F counts as one byte and a
     * surrogate pair counts as four, matching what Buffer#write produces.
     *
     * @param {string} s String to measure.
     * @returns {number} UTF-8 byte length of `s`.
     */
    mbStringLength(s) {
      return Buffer.byteLength(s, 'utf8');
    }
  }
  122. module.exports = R2m_Helper;