
September 2024

client health checks

  • how does NATS know that a listener has gone offline? we already configured this in the nats-depl file
  • -hbi 5s is the heartbeat interval: NATS sends a heartbeat to each client every 5s
  • -hbt 5s is how long the client has to respond to a heartbeat
  • -hbf 2 is how many heartbeats the client can fail before NATS closes its connection and drops its subscriptions
  • this is also why we've added 2 signal listeners, SIGINT and SIGTERM
  • upon receiving either of these signals, we trigger the shutdown process
process.on("SIGINT", () => stan.close());
process.on("SIGTERM", () => stan.close());
  • but if we force quit the terminal or something, these handlers never run and the server still thinks the listener is active, which is why the heartbeat checks above act as a fallback

NOTE ABOUT SCALABILITY

  • scaling vertically (upwards) - increasing the specs of a machine to make it faster
  • scaling horizontally - increasing the number of replicas/instances running

event redelivery

  • when a listener goes down, or when a new listener is added to the queue group, it will have missed some events
  • one way to bring it up to date with all the events published so far is to chain another option, setDeliverAllAvailable()
  • this option delivers every event that was ever published on the channel
  • on its own this isn't feasible: with 1000s of events or more, replaying everything on each restart could overwhelm our service
  • to fix this we use something called a durable subscription, which is basically a subscription with an id
  • to create one, we add another option called setDurableName() and pass in an identifier
  • internally, NATS keeps track of all the durable subscriptions we have
  • for every event that is published, NATS records against the subscription's id whether it was successfully processed (acked)
  • combining the two gives us a subscription that gets the full history the first time, and afterwards only the events it hasn't acked yet
const options = stan
  .subscriptionOptions()
  .setManualAckMode(true)
  .setDeliverAllAvailable()
  .setDurableName("orders-service");

const subscription = stan.subscribe(
  "ticket:created",
  "orders-service-queue-group",
  options
);
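
To make that bookkeeping concrete, here is a toy in-memory model of what the bullets above describe. It's only a sketch: ToyChannel, deliverTo and ack are made-up names, not part of node-nats-streaming, and the real server tracks a lot more than this.

```typescript
// Toy model of a durable subscription; NOT the real NATS Streaming implementation.
// All names here (ToyChannel, deliverTo, ack) are hypothetical.
type ToyEvent = { seq: number; data: string };

class ToyChannel {
  private events: ToyEvent[] = [];
  // durable name -> sequence numbers that subscription has acked
  private acked = new Map<string, Set<number>>();

  publish(data: string): void {
    this.events.push({ seq: this.events.length + 1, data });
  }

  // Everything the durable subscription has not acked yet.
  // A brand-new durable name gets the full history (the setDeliverAllAvailable part).
  deliverTo(durableName: string): ToyEvent[] {
    const done = this.acked.get(durableName) ?? new Set<number>();
    return this.events.filter((e) => !done.has(e.seq));
  }

  ack(durableName: string, seq: number): void {
    const done = this.acked.get(durableName) ?? new Set<number>();
    done.add(seq);
    this.acked.set(durableName, done);
  }
}

const channel = new ToyChannel();
channel.publish("ticket 1");
channel.publish("ticket 2");

// the listener processes and acks the first event, then goes offline
const firstBatch = channel.deliverTo("orders-service");
channel.ack("orders-service", firstBatch[0].seq);

channel.publish("ticket 3");

// when it comes back, only the un-acked events are redelivered
const afterRestart = channel.deliverTo("orders-service").map((e) => e.data);
console.log(afterRestart); // [ 'ticket 2', 'ticket 3' ]
```

The point of the model is just that the durable name is the key the server uses to remember progress; without it, every reconnect would look like a brand-new subscriber.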

connecting to nats from node js

  • rn the listener and publisher files are fairly big, and we can't copy-paste all of this because we'll have many listeners and we need to reuse code as much as we can
  • so we'll modularise it and put it in the common npm package
  • so..

listener

  • create an abstract class called Listener that automates a lot of the setup we just did by hand
  • why abstract? an abstract class can't be instantiated directly, so we create subclasses like TicketCreatedListener that hold the logic specific to that listener while inheriting the general purpose code
  • like this
Listener.ts
abstract class Listener {
  abstract subject: string;
  abstract queueGroupName: string;
  abstract onMessage(data: any, msg: Message): void;
  private client: nats.Stan;
  protected ackWait = 5 * 1000;

  constructor(client: nats.Stan) {
    this.client = client;
  }

  subscriptionOptions() {
    return this.client
      .subscriptionOptions()
      .setDeliverAllAvailable()
      .setManualAckMode(true)
      .setAckWait(this.ackWait)
      .setDurableName(this.queueGroupName);
  }

  listen() {
    const subscription = this.client.subscribe(
      this.subject,
      this.queueGroupName,
      this.subscriptionOptions()
    );

    subscription.on("message", (msg: Message) => {
      console.log(`Message received: ${this.subject} / ${this.queueGroupName}`);

      const parsedData = this.parseMessage(msg);
      this.onMessage(parsedData, msg);
    });
  }

  parseMessage(msg: Message) {
    const data = msg.getData();
    return typeof data === "string"
      ? JSON.parse(data)
      : JSON.parse(data.toString("utf8"));
  }
}
  • and how do we integrate this class?
const stan = nats.connect("ticketing", randomBytes(4).toString("hex"), {
  url: "http://localhost:4222",
});

stan.on("connect", () => {
  console.log("🚀 Listener connected to NATS");

  stan.on("close", () => {
    console.log("🚀 NATS connection closed!");
    process.exit();
  });

  new TicketCreatedListener(stan).listen();
});

abstract class Listener {
  // code
}

class TicketCreatedListener extends Listener {
  subject: string = "ticket:created";
  queueGroupName: string = "payments-service";
  onMessage(data: any, msg: Message) {
    console.log(data);
    msg.ack();
  }
}
  • but the big issue still remains: how do we remember what props an event has, and what happens if we accidentally mistype the name of the subject?
  • because rn there's no type validation at all
  • so what we did is this

type validations

  • now what are some fields that we know EVERY event will have? a subject and some data right?
  • so we can make that into an interface called Event
interface Event {
  subject: string;
  data: any;
}
  • but its entirely possible that we'll make mistakes while typing the names of the subjects
  • so lets eliminate that error and make an enum of all the channel names
enum Subjects {
  TicketCreated = "ticket:created",
}
  • and now, we can simply modify the event interface to have subject of type Subjects
interface Event {
  subject: Subjects;
  data: any;
}
  • now that that's done, in the base listener we need to enforce a relationship between the subject and the data: if the subject is TicketCreated, the data must have one set of attributes, and if it's TicketUpdated, it must have another
  • we can establish that dependency between the subject and the data by making the base Listener generic, like this
interface Event {
  subject: Subjects;
  data: any;
}

export abstract class Listener<T extends Event> {
  abstract subject: T["subject"];
  abstract queueGroupName: string;
  abstract onMessage(data: T["data"], msg: Message): void;
  private client: nats.Stan;
  protected ackWait = 5 * 1000;

  constructor(client: nats.Stan) {
    this.client = client;
  }

  subscriptionOptions() {
    return this.client
      .subscriptionOptions()
      .setDeliverAllAvailable()
      .setManualAckMode(true)
      .setAckWait(this.ackWait)
      .setDurableName(this.queueGroupName);
  }

  listen() {
    const subscription = this.client.subscribe(
      this.subject,
      this.queueGroupName,
      this.subscriptionOptions()
    );

    subscription.on("message", (msg: Message) => {
      console.log(`Message received: ${this.subject} / ${this.queueGroupName}`);

      const parsedData = this.parseMessage(msg);
      this.onMessage(parsedData, msg);
    });
  }

  parseMessage(msg: Message) {
    const data = msg.getData();
    return typeof data === "string"
      ? JSON.parse(data)
      : JSON.parse(data.toString("utf8"));
  }
}
  • now the class takes a type parameter T that must satisfy Event, so the subject is of type T['subject'] and the data is of type T['data'], whatever form that takes
  • and now how do we enforce this?
  • in a separate file, types.ts let's create the interface for how we'd like the data for TicketCreated
types.ts
export interface TicketCreatedEvent {
  subject: Subjects.TicketCreated;
  data: {
    id: string;
    title: string;
    price: number;
  };
}
  • and now when we define TicketCreatedListener, we extend Listener and pass TicketCreatedEvent as the type argument
export class TicketCreatedListener extends Listener<TicketCreatedEvent> {
  readonly subject: Subjects.TicketCreated = Subjects.TicketCreated;
  queueGroupName: string = "payments-service";

  onMessage(data: TicketCreatedEvent["data"], msg: Message) {
    console.log(data);

    msg.ack();
  }
}
  • we don't want anyone to modify the subject, which is why we've added readonly to the subject property
  • without an annotation TS can infer the broader Subjects type and throw an error, so we annotate the property as Subjects.TicketCreated and assign it the same value
  • and then the rest is the same

publisher

  • we perform the same steps for a publisher as well, with the generics, and this time we have a function called publish() that publishes the data we pass in to the publisher's subject
interface Event {
  subject: Subjects;
  data: any;
}

export abstract class Publisher<T extends Event> {
  abstract subject: T["subject"];
  private client: nats.Stan;

  constructor(client: nats.Stan) {
    this.client = client;
  }

  publish(data: T["data"]) {
    this.client.publish(this.subject, JSON.stringify(data), (err) => {
      if (err) {
        return console.log("🚨 Error publishing message", err);
      }
      console.log("🚀 Message published");
    });
  }
}
  • we define TicketCreatedPublisher like this, passing TicketCreatedEvent as the type argument
export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
  subject: Subjects.TicketCreated = Subjects.TicketCreated;
}
  • and we can simply use this class in our publisher.ts file, like this
stan.on("connect", () => {
  console.log("🚀 Publisher connected to NATS");

  new TicketCreatedPublisher(stan).publish({
    id: "123",
    title: "concert 11",
    price: 20,
  });
});
  • event publishing is supposed to be an async affair, so let's annotate the return type of publish() in Publisher as Promise<void>. on its own this throws a TS error because we aren't returning a promise yet. to fix that we wrap the body in a new Promise, reject it if the publish callback reports an error, and resolve it if everything worked
publish(data: T["data"]): Promise<void> {
    return new Promise((resolve, reject) => {
      this.client.publish(this.subject, JSON.stringify(data), (err) => {
        if (err) {
          return reject(err);
        }
        console.log("🚀 Message published to subject", this.subject);
        resolve();
      });
    });
  }
  • and this way, we can mark the connect callback in publisher.ts as async and await the publish call
stan.on("connect", async () => {
  console.log("🚀 Publisher connected to NATS");
  const publisher = new TicketCreatedPublisher(stan);
  await publisher.publish({
    id: "123",
    title: "concert 11",
    price: 20,
  });
});
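
Here's the same callback-to-Promise wrapping as a standalone sketch that runs without a NATS server. FakeClient, CallbackClient and publishAsync are hypothetical names made up for the example; only the shape of the publish callback mirrors the code above.

```typescript
// Sketch: wrapping a callback-style publish in a Promise.
// FakeClient is a hypothetical stand-in so the example runs without a NATS server.
type PublishCallback = (err?: Error) => void;

interface CallbackClient {
  publish(subject: string, data: string, cb: PublishCallback): void;
}

class FakeClient implements CallbackClient {
  published: string[] = [];
  private shouldFail: boolean;

  constructor(shouldFail = false) {
    this.shouldFail = shouldFail;
  }

  publish(subject: string, data: string, cb: PublishCallback): void {
    if (this.shouldFail) {
      cb(new Error("connection lost"));
      return;
    }
    this.published.push(`${subject} ${data}`);
    cb();
  }
}

function publishAsync(
  client: CallbackClient,
  subject: string,
  data: unknown
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    client.publish(subject, JSON.stringify(data), (err) => {
      if (err) {
        return reject(err); // the caller's await will throw
      }
      resolve();
    });
  });
}

// Resolves with the error message of the failing publish.
async function demo(): Promise<string> {
  await publishAsync(new FakeClient(), "ticket:created", { id: "123" });
  try {
    await publishAsync(new FakeClient(true), "ticket:created", { id: "456" });
    return "unexpected success";
  } catch (err) {
    return (err as Error).message;
  }
}
```

Because the error now surfaces as a rejected promise, the caller can decide what to do with it (retry, log, fail the request) instead of the publisher swallowing it in a console.log.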
  • coming to testing publishers and listeners: publishers are very similar to network reqs and there's not much we can test in that aspect
  • listeners, on the other hand, contain all the business logic for our application and are very similar to event handlers, so we can fully test these out
  • so we take our custom listener, publisher, subjects and our types file and paste them into a folder called events inside of our common folder
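
A rough idea of what "testing a listener like an event handler" can look like: call onMessage directly with hand-built data and a fake message, then check the side effects. This is a sketch with made-up names (FakeMessage, TicketCreatedHandler); it doesn't use the real Listener base class or the Message type from node-nats-streaming.

```typescript
// Sketch: exercising a listener's onMessage without a NATS connection.
// FakeMessage and TicketCreatedHandler are hypothetical stand-ins.
interface AckableMessage {
  ack(): void;
}

class FakeMessage implements AckableMessage {
  ackCount = 0;
  ack(): void {
    this.ackCount += 1;
  }
}

interface TicketCreatedData {
  id: string;
  title: string;
  price: number;
}

class TicketCreatedHandler {
  saved: TicketCreatedData[] = [];

  onMessage(data: TicketCreatedData, msg: AckableMessage): void {
    this.saved.push(data); // stand-in for writing to a database
    msg.ack(); // ack only after the work is done
  }
}

const handler = new TicketCreatedHandler();
const fakeMsg = new FakeMessage();
handler.onMessage({ id: "123", title: "concert 11", price: 20 }, fakeMsg);

console.log(handler.saved.length, fakeMsg.ackCount); // 1 1
```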

managing a nats client

  • we've now pushed the npm package and updated it in the ticketing module
  • so let's do this: as soon as someone creates a new ticket, we publish an event
  • fairly straightforward no?
ticket-created-publisher.ts
import { Publisher, Subjects, TicketCreatedEvent } from "@brktickets/common";

export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
  subject: Subjects.TicketCreated = Subjects.TicketCreated;
}
new.ts
import { requireAuth, validateRequest } from "@brktickets/common";
import express, { NextFunction, Request, Response } from "express";
import { body } from "express-validator";
import { Ticket } from "../models/ticket";
import { TicketCreatedPublisher } from "../events/publishers/ticket-created-publisher";

const router = express.Router();

router.post(
  "/api/tickets",
  requireAuth,
  [
    body("title").not().isEmpty().withMessage("Title is required"),
    body("price")
      .isFloat({ gt: 0 })
      .withMessage("Price must be greater than 0"),
  ],
  validateRequest,
  async (req: Request, res: Response, next: NextFunction) => {
    const { title, price } = req.body;

    const ticket = Ticket.build({ title, price, userId: req.currentUser!.id });
    await ticket.save();

    // where is client? do we have to create another connection instance?
    const publisher = new TicketCreatedPublisher(client);
    await publisher.publish({
      id: ticket.id,
      title: ticket.title,
      price: ticket.price,
      userId: ticket.userId,
    });

    res.status(201).send(ticket);
  }
);

export { router as newTicketRouter };
  • but there's an error, as mentioned in the comment in the snippet above: we have no NATS client to initialise the publisher with
  • if we look at the code we wrote in the sample app, we stored the NATS connection in a variable called stan
  • and we passed this stan as the client param while publishing and listening to events, yes?
  • now let's look at how mongoose works, because we aren't storing the mongoose connection in a variable or anything, we just say await mongoose.connect(uri) and it works
  • and we can import mongoose from anywhere and just use it because it internally keeps track of that connection
  • we could initialise the nats client in index.ts and import it everywhere, but if you notice, all of our main logic is in app.ts (and the routes it imports), and index imports app
  • so the route handlers would be importing the client FROM index while index imports them (through app), i.e. imports would flow in both directions
  • that is a CYCLIC dependency, which we want to avoid
  • so let's see how mongoose works internally: it must have some object that keeps track of the connection, and whenever we import mongoose we get that same object back with all the info it needs
  • so let's try and replicate it
  • if this isn't clear, then hopefully the code does a better job
  • let's start off with a new file called nats-wrapper.ts
nats-wrapper.ts
import nats from "node-nats-streaming";

class NatsWrapper {
  // optional because initially before connection, it maybe undefined
  private _client?: nats.Stan;

  // since _client is priv, we're going to use getters to get the client and then return some err if its undefined
  get client() {
    if (!this._client) {
      throw new Error("Cannot access NATS client before initialisation");
    }

    return this._client;
  }

  connect(clusterId: string, clientId: string, url: string) {
    this._client = nats.connect(clusterId, clientId, { url });

    return new Promise<void>((res, rej) => {
      this.client!.on("connect", () => {
        console.log("Connected to NATS 🚀");
        res();
      });
      this.client!.on("error", (err) => {
        console.error("❌ Error with NATS", err);
        rej(err);
      });
    });
  }
}

export const natsWrapper = new NatsWrapper();
  • what is this class doing?
  • we have a private var called _client that we expose through the client getter only once it's available
  • and then we have a connect fn which is very similar to mongoose's, but takes a few params, such as the clusterId, which is ticketing
  • how is the clusterId ticketing? well if you look at the nats-depl yaml file, you'll see a flag called -cid and then underneath it, you'll see ticketing
  • and obv cid is cluster id
  • then we have the client id, for which we can give any placeholder for now, but it has to be unique so we should try and keep it random
  • and then we have the url, which is nats-srv:4222 or something
  • and note how we export an instance of this class rather than the class itself, so every file that imports it shares the same client
  • this is index
index.ts
// other imports
import { natsWrapper } from "./nats-wrapper";

const main = async () => {
  if (!process.env.MONGO_URI) {
    throw new Error("MONGO_URI must be defined");
  }
  if (!process.env.JWT_KEY) {
    throw new Error("JWT_KEY must be defined");
  }
  try {
    await natsWrapper.connect("ticketing", "sdhjsdh", "http://nats-srv:4222");
    natsWrapper.client.on("close", () => {
      console.error("NATS connection closed");
      process.exit();
    });
    process.on("SIGINT", () => natsWrapper.client.close());
    process.on("SIGTERM", () => natsWrapper.client.close());

    await mongoose.connect(process.env.MONGO_URI);
    console.log("Connected to MongoDB");
  } catch (err) {
    console.error(err);
    throw new DatabaseConnectionError();
  }

  app.listen(3000, () => {
    console.log("Ticketing listening on port 3000 🚀");
  });
};
  • notice how we just imported the wrapper and then ran the connect fn
  • and as we had in the listener file, we need to implement a graceful shutdown process, so we have the code for that here
  • and now that we can access this client from any file, we can use it in the route handler and publish an event whenever a new ticket is created, like this
import { natsWrapper } from "../nats-wrapper";

// existing code
const publisher = new TicketCreatedPublisher(natsWrapper.client);
await publisher.publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
  • and the same thing for when a ticket is updated
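
The wrapper idea boils down to two things: a getter that refuses to hand out the client before connect() has run, and one shared instance. Here's a stripped-down sketch of just that behaviour; ToyClient and ClientWrapper are hypothetical names, and connect() is synchronous here only to keep the example short (the real one returns a Promise).

```typescript
// Sketch of the "wrapper around a lazily-created client" idea.
// ToyClient and ClientWrapper are hypothetical; the real wrapper holds a nats.Stan.
interface ToyClient {
  clusterId: string;
}

class ClientWrapper {
  private _client?: ToyClient;

  get client(): ToyClient {
    if (!this._client) {
      throw new Error("Cannot access client before initialisation");
    }
    return this._client;
  }

  connect(clusterId: string): void {
    this._client = { clusterId };
  }
}

// One shared instance: a module that exports this is evaluated once,
// so every importer sees the same object.
const wrapper = new ClientWrapper();

let errorBeforeConnect = "";
try {
  void wrapper.client; // too early, the getter throws
} catch (err) {
  errorBeforeConnect = (err as Error).message;
}

wrapper.connect("ticketing");
console.log(errorBeforeConnect); // Cannot access client before initialisation
console.log(wrapper.client.clusterId); // ticketing
```

Throwing from the getter is a deliberate choice: a route handler that runs before the connection exists fails loudly instead of silently publishing nothing.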

how to handle failed events

  • let's say you deposited 7k into an account in this sort of architecture
  • the deposit was saved and reflected in the db, but just before the event was sent, the NATS connection was terminated
  • how would you handle this?
  • it's not about using await / no-await
  • one approach is to have 2 collections in the db, one for all the transactions that occurred and one for all the events. each record in the events collection would have 2 properties: the event itself and a flag called isSent or something that tracks whether the event was sent
  • if the nats server goes down and comes back after some time, we can just publish all the events that have isSent as false and be done
  • and if there's some issue in connecting to the db, then we fail both writes, to the transactions and the events collections
  • we need to ensure that when we save a transaction, we HAVE to save the event as well
  • if either write fails, we abort both and roll back
  • which is exactly what the db feature called a database transaction is for
  • note: we arent going to work on this but its imp to know how to solve it in case we ever come across a situation like this
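
Even though we aren't building it, the approach above can be sketched in memory. Everything here is hypothetical (ToyDb, Deposit, OutboxEvent, the account:deposit subject), and the manual rollback is only a stand-in for a real database transaction.

```typescript
// Sketch of the "save the event next to the transaction" idea, in memory.
// ToyDb, Deposit and OutboxEvent are hypothetical; a real version would use
// a database transaction instead of the manual rollback below.
interface Deposit {
  id: string;
  amount: number;
}

interface OutboxEvent {
  subject: string;
  payload: Deposit;
  isSent: boolean;
}

class ToyDb {
  deposits: Deposit[] = [];
  events: OutboxEvent[] = [];

  // Write both records or neither. failOnEvent simulates the second write failing.
  saveDepositWithEvent(deposit: Deposit, failOnEvent = false): void {
    this.deposits.push(deposit);
    try {
      if (failOnEvent) throw new Error("could not write event");
      this.events.push({ subject: "account:deposit", payload: deposit, isSent: false });
    } catch (err) {
      this.deposits.pop(); // roll back the deposit
      throw err;
    }
  }
}

// Publish everything still marked as unsent; returns how many were sent.
function flushOutbox(db: ToyDb, publish: (e: OutboxEvent) => boolean): number {
  let sent = 0;
  for (const event of db.events) {
    if (!event.isSent && publish(event)) {
      event.isSent = true;
      sent += 1;
    }
  }
  return sent;
}

const db = new ToyDb();
db.saveDepositWithEvent({ id: "d1", amount: 7000 });

const sentWhileDown = flushOutbox(db, () => false); // broker unreachable
const sentAfterRecovery = flushOutbox(db, () => true); // broker is back
console.log(sentWhileDown, sentAfterRecovery); // 0 1
```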

testing

  • if you ran the tests now, they would fail because the test setup doesn't include code to connect to a nats server
  • we could copy-paste the connection code from index, but we can't assume that we'll always have a copy of nats running, right?
  • which is why we're going to mock the natsWrapper and put in a fake one
  • so in the src folder, we create a folder called __mocks__ and in it a file with the same name as the file we're trying to mock, which in this case is nats-wrapper.ts
  • if you look at the real nats-wrapper, you'll see that it basically exports an object
  • so we'll also export an object with a property called client, and this client property has a function called publish, which we will mock
export const natsWrapper = {
  client: {
    publish: jest
      .fn()
      .mockImplementation(
        (subject: string, data: string, callback: () => void) => {
          callback();
        }
      ),
  },
};
  • once this is done, we tell jest to use the mock in our tests/setup.ts file
jest.mock("../nats-wrapper");
  • and then we clear this mock fn before every test, just so we don't carry forward any call data between tests
beforeEach(async () => {
  jest.clearAllMocks();
  const collections = await mongoose.connection.db!.collections();

  for (const collection of collections) {
    await collection.deleteMany({});
  }
});

env vars

  • for connecting to nats we used hard coded strings, so let's switch them to env vars, just like jwt_key and mongo_uri
  • we also want the client id to be unique, right? so instead of making it random, which would totally work, how about we give it the same name as the pod? that would make it super easy to debug our logs
  • and to do that, we modify the ticketing-depl file
# all the other config
env:
# all other envs
    - name: NATS_CLIENT_ID
      valueFrom:
        fieldRef:
            fieldPath: metadata.name
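
On the code side, reading those values could look roughly like the sketch below. NATS_CLIENT_ID comes from the deployment file above; NATS_URL and NATS_CLUSTER_ID are names assumed for the example, and readNatsConfig is a hypothetical helper, not something from the project.

```typescript
// Sketch: reading NATS settings from environment variables.
// NATS_CLIENT_ID comes from the deployment file above; NATS_URL and
// NATS_CLUSTER_ID are assumed names, readNatsConfig is a hypothetical helper.
interface NatsConfig {
  clusterId: string;
  clientId: string;
  url: string;
}

function readNatsConfig(env: Record<string, string | undefined>): NatsConfig {
  // Same style of check as MONGO_URI / JWT_KEY: fail fast if a var is missing.
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`${name} must be defined`);
    }
    return value;
  };

  return {
    clusterId: required("NATS_CLUSTER_ID"),
    clientId: required("NATS_CLIENT_ID"),
    url: required("NATS_URL"),
  };
}

const natsConfig = readNatsConfig({
  NATS_CLUSTER_ID: "ticketing",
  NATS_CLIENT_ID: "ticketing-depl-abc123", // in the cluster this would be the pod name
  NATS_URL: "http://nats-srv:4222",
});
console.log(natsConfig.clientId); // ticketing-depl-abc123
```

In index.ts the argument would be process.env instead of the inline object, and the three values would be passed to natsWrapper.connect().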

creating the orders services

  • keeps track of who wants to buy which ticket
  • also has the logic to lock down a ticket so others can't buy it, and to expire the order if you don't pay within a certain time period
  • properties
prop        desc
userId      user who wants to buy the ticket
status      whether the order is paid, expired or pending
expiresAt   time at which this order expires (15mins to pay)
ticketId    the ticket the user is trying to buy
  • and since we're not using a monolith, the orders service will also need its own tickets collection where we keep the id, title, price and version
  • that means the orders service has to subscribe to the ticket:created and ticket:updated channels
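
As a rough sketch, the order properties listed above could be typed like this. The status values follow the table; buildOrder, isExpired and EXPIRATION_WINDOW_MS are hypothetical names for the example.

```typescript
// Sketch of the order shape described above; helper names are illustrative.
type OrderStatus = "pending" | "paid" | "expired";

interface Order {
  userId: string;
  ticketId: string;
  status: OrderStatus;
  expiresAt: Date;
}

const EXPIRATION_WINDOW_MS = 15 * 60 * 1000; // 15 mins to pay

function buildOrder(userId: string, ticketId: string, now: Date): Order {
  return {
    userId,
    ticketId,
    status: "pending",
    expiresAt: new Date(now.getTime() + EXPIRATION_WINDOW_MS),
  };
}

function isExpired(order: Order, now: Date): boolean {
  return order.expiresAt.getTime() <= now.getTime();
}

const createdAt = new Date("2024-09-01T10:00:00Z");
const order = buildOrder("user-1", "ticket-1", createdAt);
console.log(order.expiresAt.toISOString()); // 2024-09-01T10:15:00.000Z
```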

listening for events and handling concurrency issues

creating the listeners in the orders service

  • the orders service needs to know when a new ticket is created or updated right
  • so in other words, it listens to ticket:created and ticket:updated channels
  • so lets create listeners for those events in the orders service
  • we create listeners for TicketCreated and TicketUpdated and we write the logic for saving and updating tickets in the db inside of orders
  • and then we create instances of them inside of index.ts and listen for events

dealing with concurrency

  • if we ran a script that fired a set of ticket creations and updates at multiple replicas in PARALLEL about 200 times, we might expect to see some concurrency issues
  • for eg: create a ticket with a price of 5, change the price to 10, and then to 15
  • so in an ideal world we should end up with about 200 tix with a price of 15, right?
  • in practice the number of tickets left with the wrong price may or may not be 0, because the update events can be processed out of order
  • which is why we need to use versioning
  • in other words, whenever we create a ticket etc, we also set a field called version to 1
  • whenever we update the ticket, we also increment this version
  • so when we process events, we only process the event whose version is currVersion + 1 and ignore (don't ack) the rest, because NATS is going to send them again anyway if we don't ack
  • mongoose already adds a version field (__v) to every document by default
  • so we don't have to build all of this from scratch
  • this approach is called RECORD UPDATES WITH OPTIMISTIC CONCURRENCY CONTROL
  • the approach itself is not a mongodb feature, it's agnostic to any db and can be applied anywhere
  • but for mongo we're going to use a package called mongoose-update-if-current
  • this is a plugin that will automatically update the version for us (which is technically cheating because we aren't doing it ourselves)
  • how do we wire this up?
import { updateIfCurrentPlugin } from "mongoose-update-if-current";
// ...
ticketSchema.set("versionKey", "version");
ticketSchema.plugin(updateIfCurrentPlugin);
  • here we rename the version key from __v to version
  • and we tell mongoose to apply the plugin to this schema
  • but what if we're using some other db? then how do we implement this logic?
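
One database-agnostic way to picture the version check is the in-memory sketch below. TicketStore and applyUpdate are hypothetical names; a real implementation would do the same comparison inside the database query.

```typescript
// Sketch of version-gated updates, independent of any database.
// TicketStore and applyUpdate are hypothetical names.
interface TicketRecord {
  id: string;
  price: number;
  version: number;
}

interface TicketUpdate {
  id: string;
  price: number;
  version: number;
}

class TicketStore {
  private tickets = new Map<string, TicketRecord>();

  create(id: string, price: number): void {
    this.tickets.set(id, { id, price, version: 1 });
  }

  get(id: string): TicketRecord | undefined {
    return this.tickets.get(id);
  }

  // Apply the event only if it is the direct successor of what is stored.
  // Returning false means "don't ack": the broker will redeliver it later.
  applyUpdate(event: TicketUpdate): boolean {
    const current = this.tickets.get(event.id);
    if (!current || event.version !== current.version + 1) {
      return false;
    }
    this.tickets.set(event.id, { ...event });
    return true;
  }
}

const store = new TicketStore();
store.create("t1", 5);

// events arrive out of order: version 3 shows up before version 2
const results = [
  store.applyUpdate({ id: "t1", price: 15, version: 3 }), // too early, skipped
  store.applyUpdate({ id: "t1", price: 10, version: 2 }), // applied
  store.applyUpdate({ id: "t1", price: 15, version: 3 }), // redelivered, applied
];
console.log(results, store.get("t1")?.price); // [ false, true, true ] 15
```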

versioning

  • what is the functionality of this package?
  1. to update the version no on records before they are saved
  2. to customise the find-and-update operation(save) to look for the correct version
  • and lets solve this step by step

step 1

  • in the listener, when dealing with ticket:updated events, we currently find the ticket by id and change the data using ticket.set({})
  • to this set fn we're passing in title and price, but we could also pass in the version from the event, and that's it! we've basically accomplished the task

step 2

  • the equivalent of this package is the $where attribute of mongoose which allows us to chain additional conditions to a query when calling the save fn
  • so what we can do is, we can have a presave middleware that is called before every save()
ticketSchema.pre('save', function(done) {
    // @ts-ignore
    this.$where = {
        // only match the stored doc whose version is one behind the one being saved
        version: this.get('version') - 1
    };

    done();
})
  • the ts server throws an error saying $where doesn't exist, but we know from the docs that it exists, so we can ignore it
  • what this does is let the save go through only when the version stored in the db is one less than the version of the record being saved
  • but what if the versions were separated by 100? or were timestamps?
  • all those changes would be done here
  • how to access the db inside the pod?
  • k exec -it <podname> mongo
  • then, run show dbs
  • then use <db name>
  • and then normal commands like db.<collection_name>.find({}) etc

creating the expiration service

  • the only job of this service is to listen for an event and then publish another event when the timer is up
  • no route handlers, no mongoose, none of that
  • its only a listener and a publisher
  • question is how do we emit an event after a particular time interval?
  • you may notice that the expiresAt field inside of an order is a timestamp and its set to the future
  • so the only job is to basically check if this timestamp is now in the past

options

  1. as soon as an order:created event is picked up, we start a setTimeout
    • setTimeouts are stored in memory, so if this service restarted, all the timers would be lost
  2. when an order:created event arrives, check whether the order has expired yet. if not, don't ack it, because nats will send it again anyway. if it has expired, publish an expiration:complete event
  3. (VERY GOOD IMPLEMENTATION BUT NOT SUPPORTED BY NATS-STREAMING) as soon as we get an order:created event we immediately dispatch an expiration:complete but we tell the broker to wait for 15mins before publishing. this is called a scheduled message or scheduled delivery
  4. (WHAT WE'RE DOING) create a new service and use a pkg called Bull JS, which helps us set long timers and notifs. we can just tell this pkg 'remind me to do something in 15mins' and bull is going to store all this in a redis instance

setup

  • we create the expiration-redis-depl file where we create a redis instance
  • we also create the expiration-depl file. we can get rid of the clusterIP service because no service is going to 'interact' with this service
  • its going to listen and pub events, thats all

bull js

  • what do people use bull for
  • so its basically a worker, very similar to celery (py-worker)
  • not intended for large jobs
  • were going to create a queue of jobs
  • what is our code going to look like: we have a queue that has 2 functions
    • code to enqueue a job
    • code to dequeue a job
  • the enqueue fn adds jobs to the queue
  • these jobs are then stored in the redis server
  • once the 15mins are up, the job is sent to the dequeue fn, where we have some code to process it
  • then we emit an expiration:complete event
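
The enqueue / dequeue split can be pictured with a toy delayed queue. To be clear, this is NOT Bull's API: ToyDelayQueue is a made-up in-memory class with the clock passed in by hand, whereas Bull keeps the jobs in Redis so they survive a restart.

```typescript
// Toy delayed-job queue with an injected clock; NOT Bull's API.
// Bull keeps jobs in Redis so they survive restarts; this version only
// illustrates the enqueue / dequeue split.
interface ExpirationJob {
  orderId: string;
}

class ToyDelayQueue {
  private jobs: { runAt: number; payload: ExpirationJob }[] = [];

  // enqueue: remember the job and when it becomes due
  add(payload: ExpirationJob, delayMs: number, now: number): void {
    this.jobs.push({ runAt: now + delayMs, payload });
  }

  // dequeue: hand every due job to the handler and drop it from the queue
  processDue(now: number, handler: (job: ExpirationJob) => void): void {
    const due = this.jobs.filter((j) => j.runAt <= now);
    this.jobs = this.jobs.filter((j) => j.runAt > now);
    due.forEach((j) => handler(j.payload));
  }
}

const FIFTEEN_MINUTES = 15 * 60 * 1000;
const queue = new ToyDelayQueue();
const emitted: string[] = [];

// stand-in for publishing an expiration:complete event
const publishExpiration = (job: ExpirationJob): void => {
  emitted.push(`expiration:complete ${job.orderId}`);
};

queue.add({ orderId: "o1" }, FIFTEEN_MINUTES, 0);
queue.processDue(FIFTEEN_MINUTES - 1, publishExpiration); // too early, nothing happens
queue.processDue(FIFTEEN_MINUTES, publishExpiration); // due, event emitted
queue.processDue(FIFTEEN_MINUTES + 1, publishExpiration); // already processed, no duplicate

console.log(emitted); // [ 'expiration:complete o1' ]
```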

fin!

after i dont know how many months i have happily finished this and am excited to see what i can pick up next! the code for the microservices project is here