let Promise
let EventEmitter
try {
Promise = require('bluebird')
} catch (err) {
Promise = global.Promise
}
try {
EventEmitter = require('eventemitter3')
} catch (err) {
EventEmitter = require('events')
}
const path = require('path')
const { Collection, delay } = require('../util')
/**
* Shard cluster manager
* @prop {Collection} clusters Collection of clusters
*/
class Crystal extends EventEmitter {
/**
* Creates a new Crystal instance
* @arg {String} file Relative or absolute path to file to run
* @arg {Number} [count] Number of clusters to create, defaults to number of CPU cores
*/
constructor (file, count = require('os').cpus().length) {
super()
this.clusters = new Collection()
this._count = count
this._file = path.isAbsolute(file) ? file : path.join(process.cwd(), file)
}
/** Spawns new clusters */
async createClusters () {
for (let i = 0; i < this._count; i++) {
this.createCluster(i)
await delay(6000)
}
}
/**
* Spawns a cluster
* @arg {Number} id Cluster ID
*/
createCluster (id) {
const cluster = new (require('../structures')).Cluster(this._file, id)
const worker = cluster.worker
worker.on('exit', () => this.onExit(worker))
worker.on('message', (msg) => this.onMessage(worker, msg))
this.clusters.set(cluster.id, cluster)
this.emit('clusterCreate', cluster.id)
}
/**
* Fetches a cluster
* @arg {Number} pid Process ID to find
*/
getCluster (pid) {
return this.clusters.find(s => s.pid === pid)
}
onExit (worker) {
const cluster = this.getCluster(worker.pid)
if (!cluster) return
this.emit('clusterExit', worker.pid, cluster.id)
this.clusters.delete(cluster.id)
this.createCluster(cluster.id)
}
onMessage (worker, message) {
if (!message.op) return
if (message.op === 'resp') return
if (this[message.op]) {
return this[message.op](message)
}
this.awaitResponse(worker, message)
}
awaitResponse (worker, message) {
const promises = []
for (const cluster of this.clusters.values()) {
promises.push(cluster.awaitResponse(message))
}
Promise.all(promises)
.then(results => worker.send({ op: 'resp', d: results, code: message.code }))
.catch(err => worker.send({ op: 'error', d: err, code: message.code }))
}
/**
* Broadcast a message to all clusters
* @arg {Object} message Message to send
* @arg {String} message.op Message topic
* @arg {String} message.d Message data
*/
broadcast (message) {
if (message.op === 'broadcast') {
message = message.d
}
for (const cluster of this.clusters.values()) {
cluster.worker.send(message)
}
}
/**
* Restarts all clusters, or a specific one
* @arg {Object} [message] The message sent
* @arg {Number} [message.d] The cluster ID to restart
*/
async restart (message = {}) {
if (typeof message.d === 'number') {
const cluster = this.clusters.get(message.d)
if (!cluster) return
cluster.worker.kill()
} else {
for (let cluster of this.clusters.values()) {
cluster.worker.kill()
await Promise.delay(6000)
}
}
}
}
module.exports = Crystal