Intro to Node Stream with Highland JS

I encountered Node Stream at my day job, and due to my unfamiliarity with the concept, I had a hard time understanding what it is and how it works. This article explains my approach to learning the basics of Node Stream with Highland JS.

What is Stream Processing#

Stream processing is a concept where you start to process the data as soon as they become available and process them in chunks. Batch processing, instead, is where you accumulate the data for a certain period and process all of them at once.

Both of these techniques are used for processing a large amount of data. An example of a data processing application is building a leaderboard that shows the rank of players in an online game. In batch processing, you can sum up the player’s score every hour and update the scoreboard in the background. In stream processing, you can sum up the player’s score as soon as any player finishes their game and update the scoreboard immediately.

There are pros and cons for each type of data processing, but today, we’ll only focus on stream processing. One significant advantage of stream processing is its memory efficiency because it allows you to work with a dataset too big to fit in local memory. You can read this great article from freeCodeCamp.

Node Stream#

Node stream is an abstract interface for stream processing in NodeJS. I want to avoid going into the detail of this topic because I want to share specifically about Highland JS.

One pitfall I faced with using Node Stream for data processing is it’s impossible to resume from where you left off when the application crash or SIGTERM unless you implement some checkpointing mechanism. Otherwise, you must start from the beginning whenever your application/script stops.

Highland JS#

Highland JS is a utility library that helps you write stream processing in NodeJS quickly. Highland JS has many features, including handling backpressure, which is essential for writing an efficient application.

Node Stream is like vanilla JS. Of course, you can build good UI using vanilla JS, but you’ll need to write many things independently. On the other hand, if you’re using React or Next JS, many things are handled for you, like page routing, rendering, bundling, etc. The same thing goes for Node Stream and Highland JS. Of course, you can write a streaming app using a native Node stream, but you’ll have to handwrite a lot of code to handle backpressure and do transformations on the stream. Highland JS lets you do that easily, with fewer lines of code.

Let’s build a stream-processing pipeline#

Let’s build a script to sum up, all the transactions made from each bank we have. I’m a Malaysian, so I’m using Malaysian examples of banks: Maybank, CIMB, and RHB.

We’ll leave all the fancy things like Typescript, linting, and so on behind and focus on only one thing: Highland JS.

Prerequisite#

Let’s start by creating a new folder and initiating an NPM package.

$ mkdir learn-highland
$ cd learn-highland
$ npm init -y

Start with what everyone understands - an array#

Any wise Javascript developer should consider writing the script using arrays of transactions. From the code below, we’re just concatenating arrays of transactions from each bank and summing them up into the totalSpending global variable. Easy…

// src/index.js

const maybank = [100, 200, 100]
const cimb = [50, 100, 300]
const rhb = [110, 230, 190]

const allTransactions = [...maybank, ...cimb, ...rhb]

let totalSpending = 0

for (const transaction of allTransactions) {
    // Run an expensive operation here, e.g., save this data in DB
    totalSpending += transaction
}

console.log(totalSpending)

You can run the script with this command:

$ node src/index.js

You’ll see the following result.

Script output

Make it streaming#

The code above works fine if you have hundreds or maybe a few thousands of transactions in a period. But can you imagine running that script with millions of transactions? This is especially true when the expensive operation takes a lot of CPU and memory, which could kill the resources.

To make it efficient, we can use stream processing here. This is how you can do it using Highland JS.

First, install the Highland npm package.

$ npm install highland

And then, import the package, and start writing the data stream pipeline.

var _ = require('highland')

const maybank = [100, 200, 100]
const cimb = [50, 100, 300]
const rhb = [110, 230, 190]

const allTransactions = [...maybank, ...cimb, ...rhb]

let totalSpending = 0

_(allTransactions)  // This is the data source
    .map((transaction) => { // This is our first transformation
        // Run an expensive operation here, e.g., save this data in DB
        totalSpending += transaction
    })
    .done(() => { // This is how we start consume the stream
        console.log(totalSpending)
    })

It’s essential to understand how Node stream & Highland JS works. The pipeline starts with a data source, transformation & consumption.

source --(consume a stream)--> transformation 1 --(creates a new stream)--> transformation N --(creates a new stream)--> consumption

Note that the output of this script is the same as the previous run. For this example, there is little to no difference between this and the previous implementation. But you can see the difference when doing this on a bigger scale and more compute-intensive operations.

Make it better - using a generator#

Generators - These are functions that provide values for the stream. They are lazy and can be infinite, they can also be asynchronous (for example, making a HTTP request)…. - https://caolan.github.io/highland/#_(source)

The important word here is lazy (evaluation). That means your application will perform its tasks when needed and waste CPU operation unnecessarily.

This is how you write the source in generators instead of arrays. Note that the transformation and stream consumption code is still the same.

var _ = require('highland')

function* maybank() {
    yield 100
    yield 200
    yield 100
}

function* cimb() {
    yield 50
    yield 100
    yield 300
}

function* rhb() {
    yield 110
    yield 230
    yield 190
}

function* allTransactions() {
    yield* maybank()
    yield* cimb()
    yield* rhb()
}

let totalSpending = 0

_(allTransactions())
    .map((transaction) => {
        // Run an expensive operation here, e.g., save this data in DB
        totalSpending += transaction
    })
    .done(() => {
        console.log(totalSpending)
    })

Again, the output of this script is the same as the previous run.

Important note: Even though the generator is more memory friendly than spits out the whole array, you’ll need to be careful not to abuse it. Otherwise, you might get sub-par performance from it.

Start leveraging more Highland JS functions#

There are a lot of functions that Highland JS offers. You can write a better Node stream script/application with some transformation functions.


// Stream source is redacted

_(allTransactions())
    .take(5) // Take only the first five values of the stream
    .filter((transaction) => {
        return transaction > 50 // Filter only transactions that are more than $50
    })
    .ratelimit(2, 1000) // Control the speed of the pipeline to your liking
    .tap(console.log) // Print out the current element in the stream
    .map((transaction) => {
        // Run an expensive operation here, e.g., save this data in DB
        totalSpending += transaction
    })
    .done(() => {
        console.log(totalSpending)
    })

You’ll see the output below:

Script output

Notice the delay in the script output. That’s because of the rate limit we impose in the pipeline. A rate limit is essential if you’re calling external APIs, so you don’t stress their system.

Several other practical things you can do with Highland JS:

  • .batch() transformation can help you batch many elements in a single batch. This is extremely useful for saving I/O and network calls.
  • .fork() to fan out the data and run multiple operations on the same data.

Personal note Be careful when using .group() and uniq() because they’ll consume a lot of memory to keep the data in memory to deduplicate the values.

Conclusion#

Node stream and generator differ from what developers use daily (at least for me). But it’s great to know that this technology exists, and you may want to use them someday.

© Fadhil Yaacob 2024