"use strict" var r2mConsts = require('./Header.js'); var AppErrors = require('../AppErrors.js'); class R2m_Helper { constructor(objSocket, objR2mClient) { this._ver = r2mConsts.R2M_VERSION; this.objR2mClient = objR2mClient; this._events = []; this.objClient = objSocket.open(); this.listen(); } listen() { let bufData = ''; this.objClient.on('data', (data) => { if (bufData.length == 0) { bufData = data; } else { // 合并包数据 bufData = Buffer.concat([bufData, data], bufData.length + data.length); } let response = this.parse(bufData); // 重置buf bufData = response.buf; for (let i = 0; i < response.results.length; i++) { let result = response.results[i]; // 触发数据接收事件 this._events[result['id']].emit('receive', result); this._events[result['id']] = null; } }); //this.objClient.setTimeout(10000, () => { // this.objClient.end(); //}); this.objClient.on('end', () => { this.objR2mClient._objHelper = null; console.log('client disconnected'); }); this.objClient.on('error', (data) => { throw new AppErrors.R2MError(`socket error ${data}`); }); } request(conf, cmd, data, id) { id = id || 0; return this.cmd(conf, cmd, data, id, r2mConsts.CMD_TYPE_REQUEST); } parse(buf) { let response = {buf:'',results:[]}; let start = 0; let buflen = buf.length; do { let ver = buf.readUInt16BE(start + 0); if (!ver) { break; } if (ver != this._ver) { this.errMsg = `no match redis2mysql service version. client:${ver}, server:${this._ver}`; } let cmd = buf.readUInt16BE(start + 2); let cmdType = buf.readUInt16BE(start + 4); let codeType = buf.readUInt16BE(start + 6); let contentType = buf.readUInt16BE(start + 8); let id = buf.readUInt32BE(start + 10); let length = buf.readUInt32BE(start + 14); let content = buf.toString('utf8', start + 18, length + start + 18); // 出现分包 if (length > this.mbStringLength(content)) { response['buf'] = buf.slice(start, buflen); break; } start = length + start + 18; if (contentType == r2mConsts.CONTENT_TYPE_JSON) { content = JSON.parse(content); } let result = {ver, cmd, cmdType, codeType, contentType, id, content}; response.results.push(result); } while(start < buflen); return response; } cmd(conf, cmd, data, id, cmdType, codeType) { id = id || 0; cmdType = cmdType || r2mConsts.CMD_TYPE_REQUEST; codeType = codeType || r2mConsts.CODE_TYPE_BINARY; let contentType = r2mConsts.CONTENT_TYPE_TEXT; if (typeof data === "object") { contentType = r2mConsts.CONTENT_TYPE_JSON; data = JSON.stringify(data); } if (conf) { var args = JSON.stringify({conf, data}); } else { var args = data; } let strLength = this.mbStringLength(args); let buf = new Buffer(18 + strLength); buf.writeUInt16BE(this._ver, 0); buf.writeUInt16BE(cmd, 2); buf.writeUInt16BE(cmdType, 4); buf.writeUInt16BE(codeType, 6); buf.writeUInt16BE(contentType, 8); buf.writeUInt32BE(id, 10); buf.writeUInt32BE(strLength, 14); buf.write(args, 18); return this.objClient.write(buf); } mbStringLength(s) { let totalLength = 0; let i; let charCode; for (i = 0; i < s.length; i++) { charCode = s.charCodeAt(i); if (charCode < 0x007f) { totalLength = totalLength + 1; } else if ((0x0080 <= charCode) && (charCode <= 0x07ff)) { totalLength += 2; } else if ((0x0800 <= charCode) && (charCode <= 0xffff)) { totalLength += 3; } } return totalLength; } } module.exports = R2m_Helper;