It’s about time to embrace Streams

 

Luciano Mammino (@loige)

05/03/2019


Hello, I am Luciano! 👋

Cloud Architect

Let's connect!

Blog: loige.co

Twitter: @loige

GitHub: @lmammino 

Agenda

01. Buffers VS Streams
02. Stream types & APIs
03. Pipe()

04. Streams utilities
05. Writing custom streams

06. Streams in the Browser

01. Buffers VS Streams

Buffer: a data structure used to store and transfer arbitrary binary data

* Note that this loads all the content of the file in memory

Stream: Abstract interface for working with streaming data

* It does not load all the data straight away

Let's solve this problem

1. Read the content of a file
2. Copy it to another file*

* cp in Node.js

The buffer way

// buffer-copy.js

const { readFileSync, writeFileSync } = require('fs')

const [,, src, dest] = process.argv
const content = readFileSync(src)
writeFileSync(dest, content)

The Stream way

// stream-copy.js

const { 
  createReadStream,
  createWriteStream
} = require('fs')

const [,, src, dest] = process.argv
const srcStream = createReadStream(src)
const destStream = createWriteStream(dest)
srcStream.on('data', (data) => destStream.write(data))

* Careful: this implementation is not optimal

Memory comparison (~600MB file)

node --inspect-brk buffer-copy.js assets/poster.psd ~/Downloads/poster.psd

Memory comparison (~600MB file)

node --inspect-brk stream-copy.js assets/poster.psd ~/Downloads/poster.psd

Let's try with a big file (~10GB)

node --inspect-brk stream-copy.js assets/the-matrix-hd.mkv ~/Downloads/the-matrix-hd.mkv

if bytes were blocks...

Big buffer approach

Streaming approach

👍 streams vs buffers 👎

  • Streams keep a low memory footprint even with large amounts of data
  • Streams allow you to process data as soon as it arrives
  • Stream processing generally does not block the event loop

02. Stream types & APIs

All Streams are event emitters

A stream instance is an object that emits events when its internal state changes, for instance:

s.on('readable', () => {}) // ready to be consumed
s.on('data', (chunk) => {}) // new data is available
s.on('error', (err) => {}) // some error happened
s.on('end', () => {}) // no more data available

The events available depend on the type of stream

Writable streams

A writable stream is an abstraction that allows you to write data to a destination

 

Examples:

  • fs writeStream
  • process.stdout, process.stderr
  • HTTP request (client-side)
  • HTTP response (server-side)
  • AWS S3 PutObject (body parameter)

Writable streams - Methods

  • writable.write(chunk, [encoding], [callback])
  • writable.end([chunk], [encoding], [callback])

Writable streams - Events

  • writable.on('drain')
  • writable.on('close')
  • writable.on('finish')
  • writable.on('error', (err) => {})

// writable-http-request.js
const http = require('http')

const req = http.request(
  {
    hostname: 'enx6b07hdu6cs.x.pipedream.net',
    method: 'POST'
  },
  resp => {
    console.log(`Server responded with "${resp.statusCode}"`)
  }
)

req.on('finish', () => console.log('request sent'))
req.on('close', () => console.log('Connection closed'))
req.on('error', err => console.error(`Request failed: ${err}`))

req.write('writing some content...\n')
req.end('last write & close the stream')

backpressure

When writing large amounts of data you should make sure you handle the stop signal (write() returning false) and the drain event

 

loige.link/backpressure

// stream-copy-safe.js

const { createReadStream, createWriteStream } = require('fs')

const [, , src, dest] = process.argv
const srcStream = createReadStream(src)
const destStream = createWriteStream(dest)

srcStream.on('data', data => {
  const canContinue = destStream.write(data)
  if (!canContinue) {
    // we are overflowing the destination, we should pause
    srcStream.pause()
    // we will resume when the destination stream is drained
    destStream.once('drain', () => srcStream.resume())
  }
})

Readable streams

A readable stream represents a source from which data is consumed.

Examples:

  • fs readStream
  • process.stdin
  • HTTP response (client-side)
  • HTTP request (server-side)
  • AWS S3 GetObject (data field)

It supports two modes for data consumption: flowing and paused (or non-flowing) mode.

Readable streams - Methods

  • readable.read([size])
  • readable.pause()
  • readable.resume()

Readable streams - Events

  • readable.on('readable')
  • readable.on('data', (chunk) => {})
  • readable.on('end')
  • readable.on('error', (err) => {})

Readable streams - Flowing Mode

Data is read from the source automatically and chunks are emitted as soon as they are available.

[Diagram: in flowing mode the readable stream reads chunks (1, 2, 3...) from the source and emits a 'data' event for every chunk to the attached data listener; when no more data is available, 'end' is emitted.]

// count-emojis-flowing.js

const { createReadStream } = require('fs')
const { EMOJI_MAP } = require('emoji') // from npm

const emojis = Object.keys(EMOJI_MAP)

const file = createReadStream(process.argv[2])
let counter = 0

file.on('data', chunk => {
  for (let char of chunk.toString('utf8')) {
    if (emojis.includes(char)) {
      counter++
    }
  }
})
file.on('end', () => console.log(`Found ${counter} emojis`))
file.on('error', err => console.error(`Error reading file: ${err}`))

Readable streams - Paused Mode

A consumer has to call the read() method explicitly to read chunks of data from the stream. The stream sends a readable event to signal that new data is available.

[Diagram: in paused mode the readable stream fills an internal buffer with chunks (1, 2, 3...) from the source and emits 'readable'; nothing happens until the consumer calls read() to pull data out of the buffer. Calling read() when the internal buffer is empty returns null (stop reading signal). When the source is exhausted, 'end' is emitted.]

// count-emojis-paused.js
const { createReadStream } = require('fs')
const { EMOJI_MAP } = require('emoji') // from npm

const emojis = Object.keys(EMOJI_MAP)

const file = createReadStream(process.argv[2])
let counter = 0

file.on('readable', () => {
  let chunk
  while ((chunk = file.read()) !== null) {
    for (let char of chunk.toString('utf8')) {
      if (emojis.includes(char)) {
        counter++
      }
    }
  }
})
file.on('end', () => console.log(`Found ${counter} emojis`))
file.on('error', err => console.error(`Error reading file: ${err}`))

Readable streams - Mode switch conditions

  • All readable streams are created in paused mode
  • Paused streams can be switched to flowing mode with:
    • stream.on('data', () => {})
    • stream.resume()
    • stream.pipe()
  • Flowing streams can switch back to paused with:
    • stream.pause()
    • stream.unpipe() (removing all piped destinations)
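
A minimal sketch of these switches (the file argument and the logging are illustrative, not from the original slides):

// mode-switch.js - illustrative sketch of switching between modes
const { createReadStream } = require('fs')

const stream = createReadStream(process.argv[2]) // created in paused mode

// attaching a 'data' listener switches the stream to flowing mode
stream.on('data', chunk => console.log(`received ${chunk.length} bytes`))

// pause() switches it back to paused mode
stream.pause()

// resume() puts it back into flowing mode after one second
setTimeout(() => stream.resume(), 1000)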

Readable streams - Flowing VS Paused

  • Push VS Pull mental models

  • Flowing is simpler to use

  • Paused gives you more control over how data is consumed from the source

  • Whichever you pick, stay consistent!

Bonus mode

Readable streams are also Async Iterators
(Node.js 10+)

 

 

Warning: still experimental

// count-emojis-async-iterator.js
const { createReadStream } = require('fs')
const { EMOJI_MAP } = require('emoji') // from npm

async function main () {
  const emojis = Object.keys(EMOJI_MAP)
  const file = createReadStream(process.argv[2])
  let counter = 0

  for await (let chunk of file) {
    for (let char of chunk.toString('utf8')) {
      if (emojis.includes(char)) {
        counter++
      }
    }
  }

  console.log(`Found ${counter} emojis`)
}

main()

Still experimental in Node.js 11

If you like this API and don't want to rely on an experimental core feature: 

Object mode

Readable streams can emit objects if this mode is enabled

// readable-timer.js

const { Readable } = require('stream')

const timerStream = new Readable({
  objectMode: true,
  read () {
    this.push(new Date())
  }
})

timerStream.on('data', (currentDate) => {
  // prints the current second
  console.log(currentDate.getSeconds())
})

Note: the chunk emitted here is an object (a Date), not a buffer or a string

Other types of stream

  • Duplex Stream
    Streams that are both Readable and Writable.
    (net.Socket) - see the minimal sketch after this list

  • Transform Stream
    Duplex streams that can modify or transform the data as it is written and read.
    (zlib.createGzip(), crypto.createCipheriv())
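
A minimal, illustrative Duplex sketch (an assumption for this deck, not one of the examples above): the readable and writable sides operate independently.

// duplex-sketch.js - illustrative: the two sides of a Duplex are independent
const { Duplex } = require('stream')

const duplex = new Duplex({
  read () {
    // readable side: emit one chunk, then signal the end
    this.push('data produced by the readable side\n')
    this.push(null)
  },
  write (chunk, encoding, done) {
    // writable side: just log whatever gets written
    console.log(`writable side received: ${chunk.toString()}`)
    done()
  }
})

duplex.on('data', chunk => console.log(`readable side emitted: ${chunk.toString()}`))
duplex.write('data sent to the writable side')
duplex.end()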

Anatomy of a transform stream

1. Write data (into the writable side)
2. The stream transforms the data
3. Read the transformed data (from the readable side)

Gzip example

1. Write uncompressed data (into the writable side)
2. The stream compresses the data (zlib.createGzip())
3. Read the compressed data (from the readable side)

// stream-copy-gzip.js
const { 
  createReadStream,
  createWriteStream
} = require('fs')
const { createGzip } = require('zlib')

const [, , src, dest] = process.argv
const srcStream = createReadStream(src)
const gzipStream = createGzip()
const destStream = createWriteStream(dest)

srcStream.on('data', data => {
  const canContinue = gzipStream.write(data)
  if (!canContinue) {
    srcStream.pause()
    gzipStream.once('drain', () => {
      srcStream.resume()
    })
  }
})

srcStream.on('end', () => {
  // check if there's buffered data left
  const remainingData = gzipStream.read()
  if (remainingData !== null) {
    destStream.write(remainingData)
  }
  gzipStream.end()
})

gzipStream.on('data', data => {
  const canContinue = destStream.write(data)
  if (!canContinue) {
    gzipStream.pause()
    destStream.once('drain', () => {
      gzipStream.resume()
    })
  }
})

gzipStream.on('end', () => {
  destStream.end()
})

// TODO: handle errors! >:)

03. pipe()

readable.pipe(writableDest)

  • Connects a readable stream to a writable stream
  • A transform stream can be used as a destination as well
  • It returns the destination stream, allowing for a chain of pipes

readable
  .pipe(transform1)
  .pipe(transform2)
  .pipe(transform3)
  .pipe(writable)

// stream-copy-gzip-pipe.js

const { 
  createReadStream,
  createWriteStream
} = require('fs')
const { createGzip } = require('zlib')

const [, , src, dest] = process.argv
const srcStream = createReadStream(src)
const gzipStream = createGzip()
const destStream = createWriteStream(dest)

srcStream
  .pipe(gzipStream)
  .pipe(destStream)

Set up complex pipelines with pipe

readable
  .pipe(decompress)
  .pipe(decrypt)
  .pipe(convert)
  .pipe(encrypt)
  .pipe(compress)
  .pipe(writeToDisk)

This is the most common way to use streams

Handling errors (correctly)

readable
  .on('error', handleErr)
  .pipe(decompress)
  .on('error', handleErr)
  .pipe(decrypt)
  .on('error', handleErr)
  .pipe(convert)
  .on('error', handleErr)
  .pipe(encrypt)
  .on('error', handleErr)
  .pipe(compress)
  .on('error', handleErr)
  .pipe(writeToDisk)
  .on('error', handleErr)

 

handleErr should end and destroy the streams

(it doesn't happen automatically)
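
One possible handleErr (just a sketch, assuming the streams from the chain above are in scope) destroys every stream in the chain:

// illustrative sketch of a cleanup-aware error handler
function handleErr (err) {
  console.error(err)
  // destroy every stream in the pipeline so no resource is leaked
  const streams = [readable, decompress, decrypt, convert, encrypt, compress, writeToDisk]
  streams.forEach(stream => stream.destroy())
}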

 

04. Stream utilities

stream.pipeline(...streams, callback)

// stream-copy-gzip-pipeline.js

const { pipeline } = require('stream')
const { createReadStream, createWriteStream } = require('fs')
const { createGzip } = require('zlib')

const [, , src, dest] = process.argv

pipeline(
  createReadStream(src),
  createGzip(),
  createWriteStream(dest),
  function onEnd (err) {
    if (err) {
      console.error(`Error: ${err}`)
      process.exit(1)
    }

    console.log('Done!')
  }
)

Can pass multiple streams (they will be piped)

The last argument is a callback. If invoked with an error, it means the pipeline failed at some point.

All the streams are ended and destroyed correctly.
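
Since the last argument is an error-first callback, pipeline can also be promisified with util.promisify (a sketch reusing the gzip-copy example):

// stream-copy-gzip-promise.js - sketch: promisified pipeline
const { promisify } = require('util')
const { pipeline } = require('stream')
const { createReadStream, createWriteStream } = require('fs')
const { createGzip } = require('zlib')

const pipelineAsync = promisify(pipeline)
const [, , src, dest] = process.argv

async function main () {
  await pipelineAsync(
    createReadStream(src),
    createGzip(),
    createWriteStream(dest)
  )
  console.log('Done!')
}

main().catch(err => {
  console.error(`Error: ${err}`)
  process.exit(1)
})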

stream.pipeline is available in Node.js 10+

On older versions of Node.js you can use pump - npm.im/pump

// stream-copy-gzip-pump.js
const pump = require('pump') // from npm
const { createReadStream, createWriteStream } = require('fs')
const { createGzip } = require('zlib')

const [, , src, dest] = process.argv

pump( // just swap pipeline with pump!
  createReadStream(src),
  createGzip(),
  createWriteStream(dest),
  function onEnd (err) {
    if (err) {
      console.error(`Error: ${err}`)
      process.exit(1)
    }

    console.log('Done!')
  }
)

pumpify(...streams) - npm.im/pumpify

Create reusable pieces of pipeline

Let's create EncGz, an application that helps us read and write encrypted, gzipped files

// encgz-stream.js - utility library

const {
  createCipheriv,
  createDecipheriv,
  randomBytes,
  createHash
} = require('crypto')
const { createGzip, createGunzip } = require('zlib')
const pumpify = require('pumpify') // from npm

// calculates md5 of the secret (trimmed)
function getCipherKey (secret) {}

function createEncgz (secret) {
  const initVect = randomBytes(16)
  const cipherKey = getCipherKey(secret)
  const encryptStream = createCipheriv('aes256', cipherKey, initVect)
  const gzipStream = createGzip()

  const stream = pumpify(encryptStream, gzipStream)
  stream.initVect = initVect

  return stream
}
// encgz-stream.js (...continue from previous slide)

function createDecgz (secret, initVect) {
  const cipherKey = getCipherKey(secret)
  const decryptStream = createDecipheriv('aes256', cipherKey, initVect)
  const gunzipStream = createGunzip()

  const stream = pumpify(gunzipStream, decryptStream)
  return stream
}

module.exports = {
  createEncgz,
  createDecgz
}
// encgz.js - CLI to encrypt and gzip (from stdin to stdout)

const { pipeline } = require('stream')
const { createEncgz } = require('./encgz-stream')

const [, , secret] = process.argv

const encgz = createEncgz(secret)
console.error(`init vector: ${encgz.initVect.toString('hex')}`)

pipeline(
  process.stdin,
  encgz,
  process.stdout,
  function onEnd (err) {
    if (err) {
      console.error(`Error: ${err}`)
      process.exit(1)
    }
  }
)
// decgz.js - CLI to gunzip and decrypt (from stdin to stdout)

const { pipeline } = require('stream')
const { createDecgz } = require('./encgz-stream')

const [, , secret, initVect] = process.argv

const decgz = createDecgz(secret, Buffer.from(initVect, 'hex'))

pipeline(
  process.stdin,
  decgz,
  process.stdout,
  function onEnd (err) {
    if (err) {
      console.error(`Error: ${err}`)
      process.exit(1)
    }
  }
)

stream.finished(stream, callback)

Get notified when a stream is no longer readable or writable - Node.js 10+

// finished can be promisified!
const finished = util.promisify(stream.finished)

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream is done reading.')
}

run().catch(console.error)
rs.resume() // start & drain the stream

readable-stream - npm.im/readable-stream

npm package that contains the latest version of the Node.js stream library.

It also makes Node.js streams compatible with the browser (can be used with Webpack and Browserify).

* Yeah, the name is misleading: the package offers all the functionality of the official 'stream' module, not just readable streams.

05. Writing custom streams

// emoji-stream.js (custom readable stream)
const { EMOJI_MAP } = require('emoji') // from npm
const { Readable } = require('readable-stream') // from npm
const emojis = Object.keys(EMOJI_MAP)
function getEmojiDescription (index) {
  return EMOJI_MAP[emojis[index]][1]
}
function getMessage (index) {
  return emojis[index] + ' ' + getEmojiDescription(index)
}

class EmojiStream extends Readable {
  constructor (options) {
    super(options)
    this._index = 0
  }

  _read () {
    if (this._index >= emojis.length) {
      return this.push(null)
    }
    return this.push(getMessage(this._index++))
  }
}

module.exports = EmojiStream
// uppercasify.js (custom transform stream)

const { Transform } = require('readable-stream')

class Uppercasify extends Transform {
  _transform (chunk, encoding, done) {
    this.push(chunk.toString().toUpperCase())
    done()
  }

  _flush (done) {
    // in case there's buffered data
    // that still has to be pushed
    done()
  }
}

module.exports = Uppercasify
// dom-append.js (custom writable stream)

const { Writable } = require('readable-stream')

class DOMAppend extends Writable {

  constructor (target, tag = 'p', options) {
    super(options)
    this._target = target
    this._tag = tag
  }

  _write (chunk, encoding, done) {
    const elem = document.createElement(this._tag)
    const content = document.createTextNode(chunk.toString())
    elem.appendChild(content)
    this._target.appendChild(elem)
    done()
  }
}

module.exports = DOMAppend

06. Streams in the browser

// browser/app.js

const EmojiStream = require('../emoji-stream')
const Uppercasify = require('../uppercasify')
const DOMAppend = require('../dom-append')

const list = document.getElementById('list')

const emoji = new EmojiStream()
const uppercasify = new Uppercasify()
const append = new DOMAppend(list, 'li')

emoji
  .pipe(uppercasify)
  .pipe(append)

Let's use webpack to build this app for the browser

npm i --save-dev webpack webpack-cli

node_modules/.bin/webpack src/browser/app.js
# creates dist/main.js

mv dist/main.js src/browser/app.bundle.js

Finally, let's create an index.html

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8" />
    <meta
      name="viewport"
      content="width=device-width, initial-scale=1, shrink-to-fit=no"
    />
    <title>Streams in the browser!</title>
  </head>
  <body>
    <ul id="list"></ul>
    <script src="app.bundle.js"></script>
  </body>
</html>


07. Closing

TLDR;

  • Streams have low memory footprint
  • Process data as soon as it's available
  • Composition through pipelines
  • Define your logic as transform streams
  • Readable and writable streams are good abstractions for input & output
  • You can swap them as needed without having to change the logic

If you want to learn (even) moar 🐻 about streams...

Credits

Cover Photo by WeRoad on Unsplash

emojiart.org for the amazing St. Patrick emoji art

The internet for the memes! :D

Special thanks

THANKS!