Monday, April 2, 2018

Node FeedParser & Transform streams

FeedParser is itself a Transform stream that operates in object mode. Nevertheless, in the majority of examples it appears at the end of a pipeline, e.g.:

let fp = new FeedParser()
fp.on('readable', () => {
    // get the data
})
my_readable_stream.pipe(fp)

Say we want to get the first 2 headlines from an RSS feed:

$ curl -s https://emacsel.com/mp3.xml | node headlines1.js | head -2
Episode 7 - Jorgen Schäfer
Episode 6 - Charles Lowell

Let's use FeedParser as god hath intended it: as a transform stream that reads the RSS from stdin & writes the articles to another transform stream, which grabs only the headlines &, in turn, pipes them to stdout:

$ cat headlines1.js
let Transform = require('stream').Transform
let FeedParser = require('feedparser')

/**
 * Transform stream that turns article objects into headline text.
 *
 * The writable side runs in object mode (it is fed whole article objects);
 * the readable side stays in the default byte mode, so the output can be
 * piped straight into process.stdout.
 */
class Filter extends Transform {
    constructor() {
      // `writableObjectMode` is the documented option for accepting objects
      // on the input side only. Unlike flipping `_writableState.objectMode`
      // after construction, it also gives the writable side an object-count
      // highWaterMark instead of a byte-sized one.
      super({ writableObjectMode: true }) // we can eat objects
    }

    /**
     * Emit the title of one article followed by a newline.
     *
     * @param {{title: string}} input - article object; only `title` is read
     * @param {string} encoding - unused
     * @param {Function} done - called once the article has been handled
     */
    _transform(input, encoding, done) {
      this.push(input.title + '\n')
      done()
    }
}

// Pipeline: RSS from stdin -> FeedParser (emits article objects) -> Filter (emits headline text) -> stdout
process.stdin.pipe(new FeedParser()).pipe(new Filter()).pipe(process.stdout)

(Type npm i feedparser before running the example.)

This works, although it throws an EPIPE error, because the head command exits after 2 lines & closes the read end of the pipe, while the script is still trying to write into its stdout.

You may add something like

process.stdout.on('error', (e) => e.code !== 'EPIPE' && console.error(e))

to silence it, but it's better to use pump module (npm i pump) & catch all errors from all the streams. There's a chance that the module will be added to the node core, so get used to it already.

$ diff -u1 headlines1.js headlines3.js | tail -n+4
 let FeedParser = require('feedparser')
+let pump = require('pump')

@@ -14,2 +15,3 @@

-process.stdin.pipe(new FeedParser()).pipe(new Filter()).pipe(process.stdout)
+pump(process.stdin, new FeedParser(), new Filter(), process.stdout,
+     err => err && err.code !== 'EPIPE' && console.error(err))

Now, what if we want to control the exact number of articles our Filter stream receives? I.e., what if an RSS feed is many MBs long & we want only n articles from it? First, we add a command-line parameter to our script:

$ cat headlines4.js
let Transform = require('stream').Transform
let FeedParser = require('feedparser')
let pump = require('pump')

// Transform stream that prints the titles of the first `articles_max`
// articles it receives and reports every later article on stderr.
class Filter extends Transform {
    // articles_max: how many article titles to pass through before ignoring the rest
    constructor(articles_max) {
      super()
      // Switch only the writable side to object mode so article objects can be written to us.
      // NOTE(review): `_writableState` is stream-internal state; the `writableObjectMode`
      // constructor option looks like the documented equivalent — confirm before changing.
      this._writableState.objectMode = true // we can eat objects
      this.articles_max = articles_max
      this.articles_count = 0 // number of articles seen so far
    }
    // input: an article object (only `title` is read); encoding is unused;
    // done: callback invoked once the article has been handled.
    _transform(input, encoding, done) {
      // The comparison uses the count *before* the increment, so exactly
      // `articles_max` titles are pushed.
      if (this.articles_count++ < this.articles_max) {
          this.push(input.title + '\n')
      } else {
          // The article is still consumed, just not forwarded; the number
          // logged is the 1-based position of the ignored article.
          console.error('ignore', this.articles_count)
      }
      done()
    }
}

// Number of headlines to print, taken from the command line.
// process.argv.slice(2) is an array, and Number() coerces it through its
// string form, so this yields the intended number only when exactly one
// argument is given. A result of NaN or 0 (e.g. no argument) falls back to 1.
let articles_max = Number(process.argv.slice(2)) || 1
// pump connects the streams in order and calls the callback with an error
// from any of them; EPIPE (whoever reads our stdout went away) is not logged.
pump(process.stdin, new FeedParser(), new Filter(articles_max), process.stdout,
     err => err && err.code !== 'EPIPE' && console.error(err))

Although this works too, it still downloads & parses the articles we don't want:

$ curl -s https://emacsel.com/mp3.xml | node headlines4.js 2
Episode 7 - Jorgen Schäfer
Episode 6 - Charles Lowell
ignore 3
ignore 4
ignore 5
ignore 6
ignore 7

Unfortunately, to be able to 'unpipe' a readable stream (from the Filter's standpoint it's the FeedParser instance), we have to have a ref to it, & I don't know a way to get such a ref from within a Transform stream, except by explicitly passing a pointer:

$ diff -u headlines4.js headlines5.js | tail -n+4
 let pump = require('pump')

 class Filter extends Transform {
-    constructor(articles_max) {
+    constructor(articles_max, feedparser) {
      super()
      this._writableState.objectMode = true // we can eat objects
      this.articles_max = articles_max
      this.articles_count = 0
+
+     if (feedparser) {
+         this.once('unpipe', () => {
+             this.end()      // ensure 'finish' event gets emitted
+         })
+     }
+     this.feedparser = feedparser
     }
     _transform(input, encoding, done) {
      if (this.articles_count++ < this.articles_max) {
          this.push(input.title + '\n')
      } else {
-         console.error('ignore', this.articles_count)
+         console.error('stop on', this.articles_count)
+         if (this.feedparser) this.feedparser.unpipe(this)
      }
      done()
     }
 }

 let articles_max = Number(process.argv.slice(2)) || 1
-pump(process.stdin, new FeedParser(), new Filter(articles_max), process.stdout,
+let fp = new FeedParser()
+pump(process.stdin, fp, new Filter(articles_max, fp), process.stdout,
      err => err && err.code !== 'EPIPE' && console.error(err))

Test:

$ curl -s https://emacsel.com/mp3.xml | node headlines5.js 2
Episode 7 - Jorgen Schäfer
Episode 6 - Charles Lowell
stop on 3

Grab the gist w/ a final version here.