'use strict' let index = process.argv[2] || 0; let fileName = __filename; let basePort = 8390; if (fileName.indexOf('/new.') !== -1 || fileName.indexOf('/new-') !== -1 || fileName.indexOf('_new/') !== -1) { // 预发布 basePort = 8590; } global['WS_PORTS'] = basePort + parseInt(index); // 加载扩展函数 require('./extensions/function_extend.js'); require('./conf/config.inc.js'); require('./conf/code.inc.js'); const WebSocket = require('ws'); const OujRedis = require('./framework/lib/OujRedis.js'); const Log = require('./framework/lib/CallLog.js'); const Qs = require("querystring"); const Url = require("url"); const CallLog = new Log(); const wss = new WebSocket.Server({ port: global['WS_PORTS'], }); const serverIp = getServerIp(); let groupClients = {}; // groupId => [client ... ] let clientMap = {}; let wsInterval = 0; let times = 0; initRedis(); // Broadcast to group. wss.broadcastToGroup = function (groupId, data) { let i = 1; let startTime = (new Date).getTime(); if (!groupClients[groupId]) { groupClients[groupId] = []; } let msg = `ServerIp: ${serverIp}, No: ${index}th ws, group: ${groupId}, broadcast: ${groupClients[groupId].length} clients.\n`; groupClients[groupId].forEach(function each(client) { let client_id = client.client_id; if (client.readyState === WebSocket.OPEN) { client.send(data); let endTime = (new Date).getTime(); let span = endTime - startTime; startTime = endTime; msg += `${i}th child:${client_id}, use ${span} ms.\n`; } else { msg += `${i}th error child:${client_id}, state: ${client.readyState}.\n`; } i++; }); CallLog.setFromUrl('/ws_group/broadcast'); CallLog.logSelfCall(0, msg); _log(msg); }; wss.on('close', function close(ws) { let clientId = ws.client_id; let groupId = clientMap[clientId] || 0; delete clientMap[clientId]; delete groupClients[groupId]; }); wss.on('connection', function connection(ws, params) { initWs(ws); let query = {}; try { query = Qs.parse(Url.parse(params.url).query); } catch (e) {} let group_id = query['group_id']; CallLog.setFromUrl('/ws_group/connection'); ws.client_id = ws._socket._handle.fd; let msg = ''; if (group_id) { msg = 'connect new group_id:' + group_id + ', url:' + params.url; if (!groupClients[group_id]) { groupClients[group_id] = []; } groupClients[group_id].push(ws); clientMap[ws.client_id] = group_id; CallLog.logSelfCall(CODE_SUCCESS, msg); } else { msg = 'connect unknown group_id:' + group_id + ', url:' + params.url; CallLog.logSelfCall(CODE_UNKNOW_ERROT, msg); ws.terminate(); } _log(msg); }); function initWs(ws) { ws.isAlive = true; ws.on('pong', () => { ws.isAlive = true; }); // 每30秒检测一次客户端是否正常 if (wsInterval === 0) { wsInterval = setInterval(() => { wss.clients.forEach(function each(ws) { if (ws.isAlive === false) { _log('ws.terminate():'); return ws.terminate(); } ws.isAlive = false; ws.ping(); }); }, 30000); } } function initRedis() { _log('initRedis, times:' + times++); let objRedis = OujRedis.init('dw_chat'); let pattern = 'chat:group:*'; objRedis.punsubscribe(pattern); objRedis.psubscribe(pattern, function (err, count) { // Now we are subscribed to both the 'news' and 'music' channels. // `count` represents the number of channels we are currently subscribed to. if (!err) { _log('psubscribe success, count:' + count); } }); let last_msg = null; objRedis.on('pmessage', function (pattern, channel, message) { let data = JSON.parse(message); let msg = channel + ' => ' + message; // 防止重复消息推送 if (last_msg !== msg) { CallLog.setFromUrl('/ws_group/redisMessage'); // chat:group [timestamp, type, group_id, data] let groupId = data.group_id; if (groupId > 0) { let sendData = JSON.stringify({channel, data}); wss.broadcastToGroup(groupId, sendData); msg = `No: ${index}th ws, objRedis broadcast: ` + msg; CallLog.logSelfCall(CODE_SUCCESS, msg); } else { CallLog.logSelfCall(CODE_UNKNOW_ERROT, `No: ${index}th ws, objRedis miss groupId: ` + msg); } last_msg = msg; } else { _log('【error】objRedis, repeat message:' + last_msg); } _log(msg); }); // 点一次要初始化定时器 checkRedis(objRedis); return objRedis; } function checkRedis(objRedis) { // 每秒检测redis是否正常 objRedis.ping(function (err, result) { if (err) { console.error(err); initRedis(); } else { //_log(result); setTimeout(function() { objRedis = checkRedis(objRedis); }, 1000); } }); return objRedis; } function _log(msg) { let time = new Date().toLocaleString(); console.log(`【${time}】${msg}`); }