'use strict'; const EventEmitter = require('events'); const { createHash } = require('crypto'); const { createServer, STATUS_CODES } = require('http'); const PerMessageDeflate = require('./permessage-deflate'); const WebSocket = require('./websocket'); const { format, parse } = require('./extension'); const { GUID } = require('./constants'); const keyRegex = /^[+/0-9A-Za-z]{22}==$/; const kUsedByWebSocketServer = Symbol('kUsedByWebSocketServer'); /** * Class representing a WebSocket server. * * @extends EventEmitter */ class WebSocketServer extends EventEmitter { /** * Create a `WebSocketServer` instance. * * @param {Object} options Configuration options * @param {Number} options.backlog The maximum length of the queue of pending * connections * @param {Boolean} options.clientTracking Specifies whether or not to track * clients * @param {Function} options.handleProtocols A hook to handle protocols * @param {String} options.host The hostname where to bind the server * @param {Number} options.maxPayload The maximum allowed message size * @param {Boolean} options.noServer Enable no server mode * @param {String} options.path Accept only connections matching this path * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable * permessage-deflate * @param {Number} options.port The port where to bind the server * @param {http.Server} options.server A pre-created HTTP/S server to use * @param {Function} options.verifyClient A hook to reject connections * @param {Function} callback A listener for the `listening` event */ constructor(options, callback) { super(); options = { maxPayload: 100 * 1024 * 1024, perMessageDeflate: false, handleProtocols: null, clientTracking: true, verifyClient: null, noServer: false, backlog: null, // use default (511 as implemented in net.js) server: null, host: null, path: null, port: null, ...options }; if (options.port == null && !options.server && !options.noServer) { throw new TypeError( 'One of the "port", "server", or "noServer" options must be specified' ); } if (options.port != null) { this._server = createServer((req, res) => { const body = STATUS_CODES[426]; res.writeHead(426, { 'Content-Length': body.length, 'Content-Type': 'text/plain' }); res.end(body); }); this._server.listen( options.port, options.host, options.backlog, callback ); } else if (options.server) { if (options.server[kUsedByWebSocketServer]) { throw new Error( 'The HTTP/S server is already being used by another WebSocket server' ); } options.server[kUsedByWebSocketServer] = true; this._server = options.server; } if (this._server) { this._removeListeners = addListeners(this._server, { listening: this.emit.bind(this, 'listening'), error: this.emit.bind(this, 'error'), upgrade: (req, socket, head) => { this.handleUpgrade(req, socket, head, (ws) => { this.emit('connection', ws, req); }); } }); } if (options.perMessageDeflate === true) options.perMessageDeflate = {}; if (options.clientTracking) this.clients = new Set(); this.options = options; } /** * Returns the bound address, the address family name, and port of the server * as reported by the operating system if listening on an IP socket. * If the server is listening on a pipe or UNIX domain socket, the name is * returned as a string. * * @return {(Object|String|null)} The address of the server * @public */ address() { if (this.options.noServer) { throw new Error('The server is operating in "noServer" mode'); } if (!this._server) return null; return this._server.address(); } /** * Close the server. * * @param {Function} cb Callback * @public */ close(cb) { if (cb) this.once('close', cb); // // Terminate all associated clients. // if (this.clients) { for (const client of this.clients) client.terminate(); } const server = this._server; if (server) { this._removeListeners(); this._removeListeners = this._server = null; // // Close the http server if it was internally created. // if (this.options.port != null) { server.close(() => this.emit('close')); return; } delete server[kUsedByWebSocketServer]; } process.nextTick(emitClose, this); } /** * See if a given request should be handled by this server instance. * * @param {http.IncomingMessage} req Request object to inspect * @return {Boolean} `true` if the request is valid, else `false` * @public */ shouldHandle(req) { if (this.options.path) { const index = req.url.indexOf('?'); const pathname = index !== -1 ? req.url.slice(0, index) : req.url; if (pathname !== this.options.path) return false; } return true; } /** * Handle a HTTP Upgrade request. * * @param {http.IncomingMessage} req The request object * @param {net.Socket} socket The network socket between the server and client * @param {Buffer} head The first packet of the upgraded stream * @param {Function} cb Callback * @public */ handleUpgrade(req, socket, head, cb) { socket.on('error', socketOnError); const key = req.headers['sec-websocket-key'] !== undefined ? req.headers['sec-websocket-key'].trim() : false; const version = +req.headers['sec-websocket-version']; const extensions = {}; if ( req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket' || !key || !keyRegex.test(key) || (version !== 8 && version !== 13) || !this.shouldHandle(req) ) { return abortHandshake(socket, 400); } if (this.options.perMessageDeflate) { const perMessageDeflate = new PerMessageDeflate( this.options.perMessageDeflate, true, this.options.maxPayload ); try { const offers = parse(req.headers['sec-websocket-extensions']); if (offers[PerMessageDeflate.extensionName]) { perMessageDeflate.accept(offers[PerMessageDeflate.extensionName]); extensions[PerMessageDeflate.extensionName] = perMessageDeflate; } } catch (err) { return abortHandshake(socket, 400); } } // // Optionally call external client verification handler. // if (this.options.verifyClient) { const info = { origin: req.headers[`${version === 8 ? 'sec-websocket-origin' : 'origin'}`], secure: !!(req.connection.authorized || req.connection.encrypted), req }; if (this.options.verifyClient.length === 2) { this.options.verifyClient(info, (verified, code, message, headers) => { if (!verified) { return abortHandshake(socket, code || 401, message, headers); } this.completeUpgrade(key, extensions, req, socket, head, cb); }); return; } if (!this.options.verifyClient(info)) return abortHandshake(socket, 401); } this.completeUpgrade(key, extensions, req, socket, head, cb); } /** * Upgrade the connection to WebSocket. * * @param {String} key The value of the `Sec-WebSocket-Key` header * @param {Object} extensions The accepted extensions * @param {http.IncomingMessage} req The request object * @param {net.Socket} socket The network socket between the server and client * @param {Buffer} head The first packet of the upgraded stream * @param {Function} cb Callback * @private */ completeUpgrade(key, extensions, req, socket, head, cb) { // // Destroy the socket if the client has already sent a FIN packet. // if (!socket.readable || !socket.writable) return socket.destroy(); const digest = createHash('sha1') .update(key + GUID) .digest('base64'); const headers = [ 'HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', `Sec-WebSocket-Accept: ${digest}` ]; const ws = new WebSocket(null); let protocol = req.headers['sec-websocket-protocol']; if (protocol) { protocol = protocol.trim().split(/ *, */); // // Optionally call external protocol selection handler. // if (this.options.handleProtocols) { protocol = this.options.handleProtocols(protocol, req); } else { protocol = protocol[0]; } if (protocol) { headers.push(`Sec-WebSocket-Protocol: ${protocol}`); ws.protocol = protocol; } } if (extensions[PerMessageDeflate.extensionName]) { const params = extensions[PerMessageDeflate.extensionName].params; const value = format({ [PerMessageDeflate.extensionName]: [params] }); headers.push(`Sec-WebSocket-Extensions: ${value}`); ws._extensions = extensions; } // // Allow external modification/inspection of handshake headers. // this.emit('headers', headers, req); socket.write(headers.concat('\r\n').join('\r\n')); socket.removeListener('error', socketOnError); ws.setSocket(socket, head, this.options.maxPayload); if (this.clients) { this.clients.add(ws); ws.on('close', () => this.clients.delete(ws)); } cb(ws); } } module.exports = WebSocketServer; /** * Add event listeners on an `EventEmitter` using a map of <event, listener> * pairs. * * @param {EventEmitter} server The event emitter * @param {Object.<String, Function>} map The listeners to add * @return {Function} A function that will remove the added listeners when called * @private */ function addListeners(server, map) { for (const event of Object.keys(map)) server.on(event, map[event]); return function removeListeners() { for (const event of Object.keys(map)) { server.removeListener(event, map[event]); } }; } /** * Emit a `'close'` event on an `EventEmitter`. * * @param {EventEmitter} server The event emitter * @private */ function emitClose(server) { server.emit('close'); } /** * Handle premature socket errors. * * @private */ function socketOnError() { this.destroy(); } /** * Close the connection when preconditions are not fulfilled. * * @param {net.Socket} socket The socket of the upgrade request * @param {Number} code The HTTP response status code * @param {String} [message] The HTTP response body * @param {Object} [headers] Additional HTTP response headers * @private */ function abortHandshake(socket, code, message, headers) { if (socket.writable) { message = message || STATUS_CODES[code]; headers = { Connection: 'close', 'Content-type': 'text/html', 'Content-Length': Buffer.byteLength(message), ...headers }; socket.write( `HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n` + Object.keys(headers) .map((h) => `${h}: ${headers[h]}`) .join('\r\n') + '\r\n\r\n' + message ); } socket.removeListener('error', socketOnError); socket.destroy(); }