
Private messaging - Part IV

This guide has four distinct parts:

  • Part I: initial implementation
  • Part II: persistent user ID
  • Part III: persistent messages
  • Part IV (current): scaling up

Here's where we were at the end of Part III:

[Screenshot: the chat application]

We will now see how to scale to multiple Socket.IO servers, for high availability and load-balancing purposes.

Installation

Let's check out the branch for Part IV:

git checkout examples/private-messaging-part-4

Here's what you should see in the current directory:

├── babel.config.js
├── package.json
├── public
│   ├── favicon.ico
│   ├── fonts
│   │   └── Lato-Regular.ttf
│   └── index.html
├── README.md
├── server
│   ├── cluster.js (created)
│   ├── docker-compose.yml (created)
│   ├── index.js (updated)
│   ├── messageStore.js (updated)
│   ├── package.json (updated)
│   └── sessionStore.js (updated)
└── src
    ├── App.vue
    ├── components
    │   ├── Chat.vue
    │   ├── MessagePanel.vue
    │   ├── SelectUsername.vue
    │   ├── StatusIcon.vue
    │   └── User.vue
    ├── main.js
    └── socket.js

The complete diff can be found here.

Updating the server

For this last part, we need 3 additional dependencies on the server-side:

  • @socket.io/sticky, to route the requests from a given client to the same worker
  • socket.io-redis, the Redis adapter
  • ioredis, the Redis client

We also need a Redis instance. For your convenience, a docker-compose.yml file is provided:

cd server
docker-compose up -d
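
For reference, a minimal compose file for a single Redis instance might look like this (the actual file in the repository may differ):

version: "3"

services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

Then install the dependencies and start the server: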

npm install
npm start

The npm start command will create 4 Node.js workers, each running the same index.js file.

No change is needed on the client-side; we will focus on the server-side here.

How it works

Creating multiple servers

When creating multiple Socket.IO servers, there are two things to do:

  • you need to enable sticky-session (please see here for the complete explanation)
  • you need to replace the default in-memory adapter with the Redis adapter (or another compatible adapter)

In our example, the @socket.io/sticky module is used to ensure that requests from a given client are always routed to the same Socket.IO server. This is what is called "sticky-session":

[Diagram: sticky session, requests from a given client routed to the same server]

Note: we could also have created several processes listening on different ports (or used multiple hosts), and added a reverse-proxy in front of them. Enabling sticky-session for common reverse-proxy solutions like nginx or HAProxy is covered in the documentation.

The cluster is created in the server/cluster.js file:

const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");

const WORKERS_COUNT = 4;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  for (let i = 0; i < WORKERS_COUNT; i++) {
    cluster.fork();
  }

  cluster.on("exit", (worker) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork();
  });

  const httpServer = http.createServer();
  setupMaster(httpServer, {
    loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
  });
  const PORT = process.env.PORT || 3000;

  httpServer.listen(PORT, () =>
    console.log(`server listening at http://localhost:${PORT}`)
  );
} else {
  console.log(`Worker ${process.pid} started`);
  require("./index");
}

In our existing server/index.js file, there is a single change: the HTTP server created by the worker process does not actually listen on a given port; the requests are handled by the master process and then forwarded to the right worker.

Before:

httpServer.listen(PORT, () =>
  console.log(`server listening at http://localhost:${PORT}`)
);

After:

setupWorker(io);

The setupWorker method provided by @socket.io/sticky takes care of the synchronization between the master and the worker.
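
Put together, the end of the worker's server/index.js now looks roughly like this (abridged; the middleware and event handlers from the previous parts are unchanged):

const httpServer = require("http").createServer();
const { setupWorker } = require("@socket.io/sticky");
const io = require("socket.io")(httpServer, {
  cors: {
    origin: "http://localhost:8080",
  },
});

// ... middleware and event handlers from the previous parts ...

// no httpServer.listen() here: the master process accepts the
// connections and forwards them to this worker
setupWorker(io);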

Sessions & messages

Now that sticky-session is enabled, we need to share sessions and messages across the Socket.IO servers.

We create a new SessionStore based on Redis. We will store each session in a Redis hash, with the HSET command:

class RedisSessionStore extends SessionStore {
  // ...
  saveSession(id, { userID, username, connected }) {
    this.redisClient
      .multi()
      .hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
      .expire(`session:${id}`, SESSION_TTL)
      .exec();
  }
  // ...
}

We also set an expiry on the key in order to clean up old sessions.
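
The SESSION_TTL constant is a plain number of seconds; the exact value below is an assumption, tune it to your needs:

// a session expires after 24 hours of inactivity (assumed value)
const SESSION_TTL = 24 * 60 * 60;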

Fetching the session is quite straightforward, with the HMGET command:

const mapSession = ([userID, username, connected]) =>
  userID ? { userID, username, connected: connected === "true" } : undefined;

class RedisSessionStore extends SessionStore {
  // ...
  findSession(id) {
    return this.redisClient
      .hmget(`session:${id}`, "userID", "username", "connected")
      .then(mapSession);
  }
  // ...
}

Fetching all sessions is a bit more complex:

class RedisSessionStore extends SessionStore {
  // ...
  async findAllSessions() {
    // first, we fetch all the keys with the SCAN command
    const keys = new Set();
    let nextIndex = 0;
    do {
      const [nextIndexAsStr, results] = await this.redisClient.scan(
        nextIndex,
        "MATCH",
        "session:*",
        "COUNT",
        "100"
      );
      nextIndex = parseInt(nextIndexAsStr, 10);
      results.forEach((s) => keys.add(s));
    } while (nextIndex !== 0);

    // and then we retrieve the session details with multiple HMGET commands
    const commands = [];
    keys.forEach((key) => {
      commands.push(["hmget", key, "userID", "username", "connected"]);
    });
    return this.redisClient
      .multi(commands)
      .exec()
      .then((results) => {
        return results
          .map(([err, session]) => (err ? undefined : mapSession(session)))
          .filter((v) => !!v);
      });
  }
}
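
Note that the SCAN command is used rather than KEYS: it iterates over the keyspace incrementally, cursor by cursor, without blocking the Redis server, and the loop above runs until the cursor returned by Redis is 0 again.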

Similarly, we create a new MessageStore based on Redis. We will store all the messages linked to a given user in a Redis list, with the RPUSH command:

class RedisMessageStore extends MessageStore {
  // ...
  saveMessage(message) {
    const value = JSON.stringify(message);
    this.redisClient
      .multi()
      .rpush(`messages:${message.from}`, value)
      .rpush(`messages:${message.to}`, value)
      .expire(`messages:${message.from}`, CONVERSATION_TTL)
      .expire(`messages:${message.to}`, CONVERSATION_TTL)
      .exec();
  }
  // ...
}
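
Like SESSION_TTL above, CONVERSATION_TTL is a number of seconds (again, the value below is only an assumption):

// a conversation is kept for one week after the last message (assumed value)
const CONVERSATION_TTL = 7 * 24 * 60 * 60;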

Retrieving the messages is done with the LRANGE command:

class RedisMessageStore extends MessageStore {
  // ...
  findMessagesForUser(userID) {
    return this.redisClient
      .lrange(`messages:${userID}`, 0, -1)
      .then((results) => {
        return results.map((result) => JSON.parse(result));
      });
  }
}
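
For completeness, here is a sketch of how these Redis-backed stores could be wired into server/index.js (the exports and variable names are assumptions, matching the previous parts):

const Redis = require("ioredis");
const redisClient = new Redis();

const { RedisSessionStore } = require("./sessionStore");
const { RedisMessageStore } = require("./messageStore");

// both stores share the same Redis connection
const sessionStore = new RedisSessionStore(redisClient);
const messageStore = new RedisMessageStore(redisClient);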

Forwarding messages

There is one last modification that is needed: we need to make sure that messages actually reach the recipient, even if this recipient is not connected on the same Socket.IO server:

[Diagram: broadcasting with the Redis adapter]

This is the duty of the Redis adapter, which relies on the Redis pub/sub mechanism to broadcast messages between the Socket.IO servers and eventually reach all clients.

const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
  cors: {
    origin: "http://localhost:8080",
  },
  adapter: require("socket.io-redis")({
    pubClient: redisClient,
    subClient: redisClient.duplicate(),
  }),
});
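
Note that the subscriber client needs a dedicated connection (hence redisClient.duplicate()), because a Redis connection in subscriber mode cannot be used to send regular commands.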

And that's it! If you have a Redis CLI on your machine, you can check the messages that are sent on the wire:

$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."


Note: with the Redis adapter, the allSockets() method, which is used in the "disconnect" handler, automatically returns the Socket IDs across all Socket.IO servers, so there is nothing to update.
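
As a reminder, here is roughly what this handler looked like at the end of Part III (unchanged in this part):

io.on("connection", (socket) => {
  // ...
  socket.on("disconnect", async () => {
    // with the Redis adapter, this returns the Socket IDs of the
    // given user across all Socket.IO servers
    const matchingSockets = await io.in(socket.userID).allSockets();
    const isDisconnected = matchingSockets.size === 0;
    if (isDisconnected) {
      // notify other users
      socket.broadcast.emit("user disconnected", socket.userID);
      // update the connection status of the session
      sessionStore.saveSession(socket.sessionID, {
        userID: socket.userID,
        username: socket.username,
        connected: false,
      });
    }
  });
});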

Review

OK, so let's sum it up: we have created a fully functional chat (yes, once again!), robust and ready to scale horizontally, which allowed us to introduce some useful Socket.IO features:

  • sticky sessions, with the @socket.io/sticky module
  • the Redis adapter, to broadcast events across Socket.IO servers

Thanks for reading!