123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- "use strict"
- var r2mConsts = require('./Header.js');
- var AppErrors = require('../AppErrors.js');
- class R2m_Helper {
- constructor(objSocket, objR2mClient) {
- this._ver = r2mConsts.R2M_VERSION;
- this.objR2mClient = objR2mClient;
- this._events = [];
- this.objClient = objSocket.open();
- this.listen();
- }
- listen() {
- let bufData = '';
- this.objClient.on('data', (data) => {
- if (bufData.length == 0) {
- bufData = data;
- } else {
- // 合并包数据
- bufData = Buffer.concat([bufData, data], bufData.length + data.length);
- }
- let response = this.parse(bufData);
- // 重置buf
- bufData = response.buf;
- for (let i = 0; i < response.results.length; i++) {
- let result = response.results[i];
- // 触发数据接收事件
- this._events[result['id']].emit('receive', result);
- this._events[result['id']] = null;
- }
- });
- //this.objClient.setTimeout(10000, () => {
- // this.objClient.end();
- //});
- this.objClient.on('end', () => {
- this.objR2mClient._objHelper = null;
- console.log('client disconnected');
- });
- this.objClient.on('error', (data) => {
- throw new AppErrors.R2MError(`socket error ${data}`);
- });
- }
- request(conf, cmd, data, id) {
- id = id || 0;
- return this.cmd(conf, cmd, data, id, r2mConsts.CMD_TYPE_REQUEST);
- }
- parse(buf) {
- let response = {buf:'',results:[]};
- let start = 0;
- let buflen = buf.length;
- do {
- let ver = buf.readUInt16BE(start + 0);
- if (!ver) {
- break;
- }
- if (ver != this._ver) {
- this.errMsg = `no match redis2mysql service version. client:${ver}, server:${this._ver}`;
- }
- let cmd = buf.readUInt16BE(start + 2);
- let cmdType = buf.readUInt16BE(start + 4);
- let codeType = buf.readUInt16BE(start + 6);
- let contentType = buf.readUInt16BE(start + 8);
- let id = buf.readUInt32BE(start + 10);
- let length = buf.readUInt32BE(start + 14);
- let content = buf.toString('utf8', start + 18, length + start + 18);
- // 出现分包
- if (length > this.mbStringLength(content)) {
- response['buf'] = buf.slice(start, buflen);
- break;
- }
- start = length + start + 18;
- if (contentType == r2mConsts.CONTENT_TYPE_JSON) {
- content = JSON.parse(content);
- }
- let result = {ver, cmd, cmdType, codeType, contentType, id, content};
- response.results.push(result);
- } while(start < buflen);
- return response;
- }
- cmd(conf, cmd, data, id, cmdType, codeType) {
- id = id || 0;
- cmdType = cmdType || r2mConsts.CMD_TYPE_REQUEST;
- codeType = codeType || r2mConsts.CODE_TYPE_BINARY;
- let contentType = r2mConsts.CONTENT_TYPE_TEXT;
- if (typeof data === "object") {
- contentType = r2mConsts.CONTENT_TYPE_JSON;
- data = JSON.stringify(data);
- }
- if (conf) {
- var args = JSON.stringify({conf, data});
- } else {
- var args = data;
- }
- let strLength = this.mbStringLength(args);
- let buf = new Buffer(18 + strLength);
- buf.writeUInt16BE(this._ver, 0);
- buf.writeUInt16BE(cmd, 2);
- buf.writeUInt16BE(cmdType, 4);
- buf.writeUInt16BE(codeType, 6);
- buf.writeUInt16BE(contentType, 8);
- buf.writeUInt32BE(id, 10);
- buf.writeUInt32BE(strLength, 14);
- buf.write(args, 18);
- return this.objClient.write(buf);
- }
- mbStringLength(s) {
- let totalLength = 0;
- let i;
- let charCode;
- for (i = 0; i < s.length; i++) {
- charCode = s.charCodeAt(i);
- if (charCode < 0x007f) {
- totalLength = totalLength + 1;
- } else if ((0x0080 <= charCode) && (charCode <= 0x07ff)) {
- totalLength += 2;
- } else if ((0x0800 <= charCode) && (charCode <= 0xffff)) {
- totalLength += 3;
- }
- }
- return totalLength;
- }
- }
- module.exports = R2m_Helper;
|