How to use a Node.js queue system


Alberto Gimeno, Ecosystem Engineer at GitHub. Sometimes I write about JavaScript, Node.js, and frontend development.

Scaling your Node.js app using distributed queues

June 10, 2021, 9 min read


Editor’s Note: This article was updated in June 2021 to include information on Bull (as opposed to kue), a Node.js library that implements a queuing system on top of Redis.

In a previous article, I talked about how to run background tasks/jobs in Node.js (with the worker_threads module in particular). But what happens if you reach the limits of the machine your Node.js instance is running on? Then you need to either move to a bigger machine (known as scaling vertically) or scale horizontally. Scaling vertically always has a limit, so at some point, you’ll need to scale horizontally.

Scaling using Node.js distributed worker queues

So, how do you accomplish this? If your app is, for example, a web server that needs to send responses almost immediately, then you need something like a load balancer. In contrast, if your app needs to do work that does not have to be done immediately, then you can spread the work across “worker” nodes and distribute it using queues.

Some use cases include generating daily reports, recalculating things for users daily (e.g., recommendations), processing things a user has uploaded (e.g., a large CSV file), importing data when a user migrates to a service, and importing data when the user signs in.

A distributed queue is like a store of job descriptions, each containing enough information to do the job, or enough information to figure out everything required to do the job. For example:

const jobDataSendWelcomeEmail = {
  userId: 1234,
  userName: 'John Smith',
  email: '[email protected]'
}

Usually, the main app (or any part of a more complex system) puts jobs into the queue. Other apps running on different machines are connected to the queue and receive those jobs. These consumers can process the job with the information received, or at least use it to figure out and obtain everything else they need. This simple architecture has important benefits:

  • Your app is now divided into two logical pieces that can be distributed across different machines
  • You can scale from one to many workers without touching any code and without disrupting the execution of the main app. The queue takes care of sending the jobs to the workers through the network and, in most implementations, of delivering each job to only one worker

Note: Each vendor has its own jargon for queues (topics, channels), jobs (tasks, messages), and workers (consumers).

Why you should use Node.js distributed worker queues

You might be thinking that you can implement this architecture yourself with your existing database and without adding complexity to the system. You can create a “jobs” table with two columns: an “id” primary key column and a “data” column with all of the job information. The main app just writes to the table, and every X seconds the workers read from it to fetch the next job to execute. To prevent other workers from reading the same job, you perform the read in a transaction that also deletes the job from the table.

Voilà! Problem solved, right? Well, first of all, you are querying and waiting every X seconds. That’s not ideal, but could be okay in basic use cases. More importantly, what happens if the worker crashes while processing the job? The job was already deleted when it was pulled from the table, so we cannot recover it. This (along with other things) is nicely solved by the libraries and services built for this purpose, so you don’t have to reinvent the wheel.

Benefits of using a queue service

One great thing about queue systems is how they handle error scenarios. When a worker receives a job, the job is not deleted from the queue. Instead, it is “locked”, or made invisible to the rest of the workers, until one of two things happens: either the worker deletes it after the work is done, or a timeout that you can configure expires. So, if a worker crashes, the timeout expires and the job goes back to the queue to be consumed by another worker. When everything is fine, the worker just deletes the job once the data is processed.

That is great if the problem was in the worker (the machine was shut down, ran out of resources, etc.), but what if the problem is in the code that processes the jobs, and every time the queue sends it to a worker, the worker crashes?

Then we are in an infinite loop of failures, right? Nope, distributed queues usually have a configuration option to set a maximum number of retries. What happens when the maximum is reached depends on the queue and on how you configure it. A typical setup moves those jobs to a “failure queue” for manual inspection, or for consumption by workers that just report the errors.
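To make the lock, timeout, and retry behavior concrete, here is a minimal in-memory sketch. It is not how any particular queue service is implemented; the SketchQueue class, its method names, and its option names are all hypothetical and exist only for illustration.

```javascript
// In-memory sketch of a queue with a visibility timeout and a retry limit.
// SketchQueue and all of its names are hypothetical, for illustration only.
class SketchQueue {
  constructor ({ visibilityTimeoutMs = 30000, maxRetries = 3 } = {}) {
    this.visibilityTimeoutMs = visibilityTimeoutMs
    this.maxRetries = maxRetries
    this.jobs = []   // jobs that are waiting or currently locked
    this.failed = [] // "failure queue" for manual inspection
    this.nextId = 1
  }

  // Producer side: store the job description
  add (data) {
    const job = { id: this.nextId++, data, attempts: 0, lockedUntil: 0 }
    this.jobs.push(job)
    return job.id
  }

  // Worker side: return the next visible job and lock it, or null if none
  receive (now = Date.now()) {
    for (const job of [...this.jobs]) {
      if (job.lockedUntil > now) continue // locked by another worker
      if (job.attempts >= this.maxRetries) {
        // Retry limit reached: move the job to the failure queue
        this.jobs.splice(this.jobs.indexOf(job), 1)
        this.failed.push(job)
        continue
      }
      job.attempts++
      job.lockedUntil = now + this.visibilityTimeoutMs
      return job
    }
    return null
  }

  // Worker side: called once the job has been processed successfully
  delete (id) {
    this.jobs = this.jobs.filter(job => job.id !== id)
  }
}
```

A worker that crashes never calls delete, so the job becomes visible again once the lock expires. After maxRetries deliveries, the next receive call moves the job to the failed list instead of handing it out again.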

Not only are distributed queue implementations great at handling these errors, but they also use different mechanisms to send jobs to workers as soon as possible. Some implementations use sockets, others use HTTP long polling, and others might use other mechanisms. This is an implementation detail, but I want to highlight that it is not trivial to implement, so you are better off using existing, battle-tested implementations rather than writing your own.

What to put in the job data

Many times I find myself wondering what to put in the job data. The answer depends on your use case, but it always boils down to two principles:

Don’t put too much. The amount of data you can put in the job data is limited. Check the queuing system you are using for more information. Usually, it’s big enough that we won’t reach the limit, but sometimes we are tempted to put too much. For example, if you need to process a big CSV file, you cannot put it in the queue. You’ll need to upload it first to a storage service and then create a job with a URL to the file and additional information you need such as the user that uploaded it, etc.

Don’t put too little. If you have immutable data (e.g., a createdAt date) or data that rarely changes (e.g., usernames), you can put it in your job data. The job should be processed in a matter of seconds or minutes, so it is usually OK to include data that might change, like a username, when it is not critical for it to be up to date to the second. This way you can save queries to the database, or avoid them completely. However, if there’s information that affects how the data is processed, you should query it inside the job processor.
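As a rough sketch of both principles, the job below carries a URL to the uploaded file rather than the file itself, plus a few rarely changing fields. The buildCsvImportJob helper, its field names, and the MAX_JOB_BYTES value are assumptions made up for this example; the real limit depends on the queuing system you use.

```javascript
// Hypothetical size limit for illustration; check your queue's docs for the real one
const MAX_JOB_BYTES = 64 * 1024

// Builds the data for a "process this CSV" job.
// The file is referenced by URL, never embedded in the job.
function buildCsvImportJob ({ fileUrl, userId, userName, uploadedAt }) {
  const jobData = { fileUrl, userId, userName, uploadedAt }
  const size = Buffer.byteLength(JSON.stringify(jobData))
  if (size > MAX_JOB_BYTES) {
    throw new Error(`Job data is ${size} bytes, over the ${MAX_JOB_BYTES} byte limit`)
  }
  return jobData
}
```

Anything that changes how the file must be processed (for example, the user's current plan) is deliberately left out, so the job processor has to query it when the job actually runs.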




Make your queue jobs small and fast to process

If you need to process big sets of data, divide them into smaller pieces. If you have to process a big CSV file, first, divide it into chunks of a certain number of rows and create a job per chunk. There are a few benefits of doing it this way:

  • The data will be processed faster because it can be processed in parallel
  • You make better use of your resources. It’s better to have N workers doing smaller jobs than having one worker doing heavy processing while the rest are idle or underused
  • It’s also faster and more efficient to retry a small job that has failed as opposed to a big job that has failed

If you need an aggregated result from all of those small chunks you can put all of the intermediate results in a database, and when they are all done you can trigger a new job in another queue that aggregates the result. This is a map/reduce in essence. “Map” is the step that divides a large job into smaller jobs and then “reduce” is the step that aggregates the result of those smaller jobs.
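The chunking and aggregation steps can be sketched in a few lines of plain JavaScript. The function names and the "amount" column are hypothetical. In a real system each chunk job would go through the queue and the partial results would be stored in a database, as described above.

```javascript
// "Map" step: split the rows into one job per chunk
function createChunkJobs (rows, chunkSize) {
  const jobs = []
  for (let start = 0; start < rows.length; start += chunkSize) {
    jobs.push({ chunkIndex: jobs.length, rows: rows.slice(start, start + chunkSize) })
  }
  return jobs
}

// Hypothetical job processor: sums the "amount" column of one chunk
function processChunk (job) {
  return job.rows.reduce((sum, row) => sum + row.amount, 0)
}

// "Reduce" step: combine the intermediate results of all chunks
function aggregate (partialResults) {
  return partialResults.reduce((total, partial) => total + partial, 0)
}
```

Because each chunk is independent, N workers can call processChunk on different jobs in parallel, and a failed chunk can be retried without redoing the others.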

If you cannot divide your data beforehand, you should still do the processing in small jobs. For example, if you need to use an external API that uses cursors for paginating results, calculating all of the cursors beforehand is impractical. Instead, process one page of results per job. Once the job is processed, you get the cursor to the next page and create a new job with that cursor, so the next job will process the next page, and so on.
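A minimal sketch of this one-page-per-job pattern follows. The PAGES object and fetchPage stand in for an external paginated API, and a plain array stands in for the queue. All of these names are hypothetical.

```javascript
// Stand-in for an external API that paginates with cursors (hypothetical data)
const PAGES = {
  start: { items: [1, 2], nextCursor: 'p2' },
  p2: { items: [3, 4], nextCursor: 'p3' },
  p3: { items: [5], nextCursor: null }
}

async function fetchPage (cursor) {
  return PAGES[cursor]
}

// Processes exactly one page, then creates the job for the next page
async function processPageJob (job, queue, results) {
  const page = await fetchPage(job.cursor)
  results.push(...page.items)
  if (page.nextCursor) {
    queue.push({ cursor: page.nextCursor })
  }
}

// Stand-in worker loop: a real worker would receive jobs from the queue service
async function runWorker (queue, results) {
  while (queue.length > 0) {
    await processPageJob(queue.shift(), queue, results)
  }
}
```

Starting with a single job such as { cursor: 'start' } is enough; each job schedules its successor, so no job ever needs to know the full list of cursors.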

Delayed jobs in worker queues

Another interesting feature of distributed queues is that you can usually delay jobs. There’s normally a limit on this so you cannot delay a job for two years, but there are some use cases where this is useful. Some examples include:

  • You want to send a welcome email to a user who signed up, not immediately but at a later time. Just create a delayed job that sends the email
  • When processing a job, you hit a rate limit from an API. You will probably be told when the rate limit ends, so you can put the job back in the queue, delayed until that time
  • In general, any time you want to trigger something at a specific time in the future, such as scheduling a backup, a notification, a reminder, etc.
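The rate limit case could look roughly like this with Bull, the library used in the examples later in this article, whose queue.add(data, options) accepts a delay option in milliseconds. The helper names delayUntil and requeueAfterRateLimit are hypothetical, and the queue argument is assumed to be any object exposing that add signature.

```javascript
// Milliseconds to wait until the rate limit resets (never negative)
function delayUntil (resetAtMs, nowMs = Date.now()) {
  return Math.max(0, resetAtMs - nowMs)
}

// Puts the job back on the queue, delayed until the rate limit resets.
// `queue` is assumed to expose an add(data, { delay }) method, as Bull queues do.
function requeueAfterRateLimit (queue, jobData, resetAtMs, nowMs = Date.now()) {
  return queue.add(jobData, { delay: delayUntil(resetAtMs, nowMs) })
}
```

Other queue services express the same idea with different parameters, so treat the delay option as Bull-specific.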

Setting job priority

Most queue implementations do not guarantee the order of execution of the jobs, so don’t rely on that. However, they usually implement some way of prioritizing some jobs over others. This depends highly on the implementation, so take a look at the docs of the system you are using to see how you can achieve it if you need to.
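As one example of how an implementation exposes priorities, Bull accepts a priority option when adding a job, where 1 is the highest priority and larger numbers are lower. The sketch below wraps that option; the PRIORITY levels and the addWithPriority helper are hypothetical names, and the queue argument is assumed to expose Bull's add(data, options) signature.

```javascript
// Hypothetical priority levels; in Bull, 1 is the highest priority
const PRIORITY = { high: 1, normal: 5, low: 10 }

// Adds a job with a named priority level.
// `queue` is assumed to expose an add(data, { priority }) method, as Bull queues do.
function addWithPriority (queue, jobData, level = 'normal') {
  if (!Object.prototype.hasOwnProperty.call(PRIORITY, level)) {
    throw new Error(`Unknown priority level: ${level}`)
  }
  return queue.add(jobData, { priority: PRIORITY[level] })
}
```

Other systems may offer separate queues per priority instead of a per-job option, so this is only one possible shape.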

Use cases of distributed worker queueing systems

Let’s look at some examples. Even though all queuing systems have similar features there’s not a common API for them, so we are going to see a few different examples.

The Bull library

Bull is a Node.js library that implements a queuing system on top of Redis. Redis is an in-memory database that can be persisted, and it is often already in use in your application for things like session storage. For this reason, choosing this library can be a no-brainer. Besides, even if you are not using Redis yet, there are a few cloud providers that let you spin up a managed Redis server easily (e.g., Heroku or AWS). Finally, another benefit of using Bull is that your stack is 100% open source, so you avoid vendor lock-in.

If you need to handle a lot of work and you still want an open-source solution, then I would choose RabbitMQ. I haven’t chosen it for the examples in this article because Redis is usually easier to set up and more common. However, RabbitMQ has been designed specifically for these use cases, so by design, it’s technically superior.

There are two main components of job scheduling with Bull: a producer and a consumer. A producer creates jobs and adds them to the Redis queue, while a consumer picks jobs from the queue and processes them.

Let’s see how to create and consume jobs using Bull.

Create the queue and put a job on it:

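const Queue = require('bull')
// Connects to Redis on 127.0.0.1:6379 unless told otherwise
const queue = new Queue('queue_name')
async function main () {
  // await needs an async function in a CommonJS module
  const job = await queue.add({ foo: 'bar' })
  console.log('added job', job.id)
}
main().catch(console.error)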

Consume jobs from the queue:

const Queue = require('bull')
// Use the same queue name the producer used
const queue = new Queue('queue_name')

queue.process(async (job) => {
  // Sometimes you might need to report the job's progress;
  // you can use the job.progress() function to track it
  let progress = 0

  for (let i = 0; i < 10; i++) {
    // doSomething is a placeholder for your own processing function
    await doSomething(job.data)
    progress += 10
    await job.progress(progress)
  }
  // The job is marked as completed when the returned promise resolves
})

Microsoft Azure using its Service Bus

Microsoft Azure offers two queue services: Storage queues and Service Bus. The Azure documentation has a comparison of the two. I’ve chosen to use Service Bus because it guarantees that a job is delivered to at most one worker.

Let’s see how to create and consume jobs using Service Bus.

Create the queue and put a job on it

With Microsoft Azure, we can create the queue programmatically with the createTopicIfNotExists method. Once it is created, we can start sending messages:

const azure = require('azure')
const serviceBusService = azure.createServiceBusService()
// The payload goes in the `body` property, serialized as a string
const message = { body: JSON.stringify({ hello: 'world' }) }
serviceBusService.createTopicIfNotExists('queue_name', err => {
  // Stop here if the topic could not be created
  if (err) return console.log(err)
  serviceBusService.sendTopicMessage('queue_name', message, err => {
    if (err) console.log(err)
  })
})

Consume jobs from the worker queue

Some implementations, like this one, require you to create a subscription. Check out the Azure docs for more information on this topic:

const azure = require('azure')
const serviceBusService = azure.createServiceBusService()
serviceBusService.createSubscription('queue_name', 'subscription_name', err => {
  if (err) return console.log(err)
  // If the `isPeekLock` option is not set to true, the message will be deleted when peeked
  serviceBusService.receiveSubscriptionMessage('queue_name', 'subscription_name', { isPeekLock: true }, (err, message) => {
    if (err) return console.log(err)
    // Do something with `message` and then delete it
    serviceBusService.deleteMessage(message, err => {
      if (err) return console.log(err)
    })
  })
})

Amazon, using its SQS service

The Amazon distributed queue service is called Simple Queue Service (SQS). It can be used directly but it is also possible to configure it with other AWS services for doing interesting workflows. For example, you can configure an S3 bucket to automatically send jobs to an SQS queue when a new file (object) is stored. This, for example, can be useful to process files easily (videos, images, CSVs).


Let’s see how we can programmatically add and consume jobs on a queue.

Create the queue and put a job on it:

const AWS = require('aws-sdk')
AWS.config.update({region: 'REGION'})
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
const queueParams = {
  QueueName: 'queue_name',
  Attributes: {
    'DelaySeconds': '60',
    'MessageRetentionPeriod': '86400'
  }
}
sqs.createQueue(queueParams, (err, data) => {
  if (err) return console.log(err)
  console.log('queue url', data.QueueUrl)
  const jobParams = {
    MessageBody: JSON.stringify({ hello: 'world' }),
    QueueUrl: data.QueueUrl
  }
  sqs.sendMessage(jobParams, (err, data) => {
    if (err) return console.log(err)
  })
})

Consume jobs from the queue:

const AWS = require('aws-sdk')
AWS.config.update({region: 'REGION'})
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})
const queueURL = 'SQS_QUEUE_URL' // Obtained when the queue was created
const params = {
 AttributeNames: [
  'SentTimestamp'
 ],
 MaxNumberOfMessages: 1, // receive only one message at a time
 MessageAttributeNames: [
    "All"
 ],
 QueueUrl: queueURL,
 VisibilityTimeout: 20,
 WaitTimeSeconds: 0
}
sqs.receiveMessage(params, (err, data) => {
  if (err) return console.log(err)
  if (data.Messages) {
    // Do something with the message and then delete it
    const message = data.Messages[0]
    
    const deleteParams = {
      QueueUrl: queueURL,
      ReceiptHandle: message.ReceiptHandle
    }
    sqs.deleteMessage(deleteParams, function(err, data) {
      if (err) return console.log(err)
    })
  }
})

Check the Node.js docs on SQS for more information.

Google Cloud, using its pub/sub service

Google Cloud, like Azure, also requires creating subscriptions (see the Pub/Sub docs for more information). In fact, you need to create the subscription before sending messages to the topic/queue, or the messages will not be available.

The documentation suggests creating both the topic and the subscription from the command line:

gcloud pubsub topics create queue_name

and

gcloud pubsub subscriptions create subscription_name --topic queue_name

You can also create them programmatically, but for now let’s see how to insert and consume jobs, assuming that we have already created the queue (topic) and the subscription.

Create the queue and put a job on it:

const { PubSub } = require('@google-cloud/pubsub')
const pubsub = new PubSub()
const jobData = Buffer.from(JSON.stringify({ hello: 'world' }))
async function main () {
  const messageId = await pubsub.topic('queue_name').publish(jobData)
  console.log('published message', messageId)
}
main().catch(console.error)

Consume jobs from the queue

Google Cloud Pub/Sub guarantees that a message/job is delivered at least once for every subscription, but the message could be delivered more than once (as always, check the docs for more information):

const { PubSub } = require('@google-cloud/pubsub')
const pubsub = new PubSub()
const subscription = pubsub.subscription('subscription_name')
subscription.on('message', message => {
  const { data } = message
  // Do something and then
  message.ack() // This deletes the message from the queue
})

Conclusion

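Distributed queues are a great way of scaling your application for a few reasons: they split your app into pieces that can run on different machines, they let you add workers without touching the code, and they handle failures through locks, timeouts, and retries.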
