stream.pipeline does not wait for the last stream to flush before calling the final callback

  • Version: v12.8.0
  • Platform: macOS 10.15.4 (Catalina)
  • Subsystem: stream

What steps will reproduce the bug?

// reproduce.js

const util = require('util');
const stream = require('stream');

/**
 * Invokes `fn` with `args` and always answers with a promise.
 * A value returned by `fn` becomes the fulfilment value; an exception thrown
 * synchronously by `fn` becomes a rejection instead of propagating to the caller.
 */
async function call(fn, ...args) {
  return fn(...args);
}

/**
 * Builds an object-mode Transform whose output for each chunk is the
 * (possibly asynchronous) result of `fn(chunk)`.
 * If `fn` throws or its promise rejects, the error is handed to the
 * transform callback.
 */
const map = (fn) =>
  new stream.Transform({
    objectMode: true,
    transform(chunk, enc, cb) {
      const emitResult = (modified) => cb(null, modified);
      const emitFailure = (error) => cb(error);
      return call(fn, chunk).then(emitResult, emitFailure);
    },
  });

/**
 * Builds an object-mode Transform that runs `fn(chunk)` for its side effect
 * and then forwards the original chunk unchanged; the value produced by `fn`
 * is discarded.
 * If `fn` throws or its promise rejects, the error is handed to the
 * transform callback.
 */
const tap = (fn) =>
  new stream.Transform({
    objectMode: true,
    transform(chunk, enc, cb) {
      return call(fn, chunk).then(
        () => cb(null, chunk),
        (error) => cb(error),
      );
    },
  });

/**
 * Creates an object-mode Transform that forwards every chunk unchanged while
 * also pushing it into a side pipeline built from the streams in `t`.
 *
 * The transform's flush callback is withheld until the side pipeline's
 * callback has fired, and is then invoked with whatever error that callback
 * received. Whichever of the two events happens last triggers the call.
 */
const fork = (...t) => {
  // Outcome of the side pipeline, filled in when its callback runs.
  const branch = { finished: false, error: undefined };
  // Flush callback of the outer transform, captured when flushing starts.
  let pendingFlush;

  // Head of the side pipeline; chunks are pushed straight onto its readable side.
  const source = new stream.PassThrough({ objectMode: true });

  const tx = new stream.Transform({
    objectMode: true,
    transform(chunk, enc, cb) {
      source.push(chunk, enc);
      cb(null, chunk);
    },
    flush(cb) {
      // Signal end-of-data to the side pipeline, then wait for it if needed.
      source.push(null);
      pendingFlush = cb;
      if (branch.finished) {
        pendingFlush(branch.error);
      }
    },
  });

  stream.pipeline(source, ...t, (error) => {
    branch.finished = true;
    branch.error = error;
    if (pendingFlush) {
      pendingFlush(branch.error);
    }
  });

  return tx;
};

// Promise-returning wrapper around stream.pipeline so completion can be awaited.
const pipeline = util.promisify(stream.pipeline);
// Object-mode stream used as the head of the main pipeline.
const readableStream = new stream.PassThrough({ objectMode: true });

/**
 * Drives the reproduction: pipes `readableStream` through three forks whose
 * side pipelines hold each object for 2s, 4s and 6s respectively, then logs
 * 'done!' once the awaited pipeline has resolved.
 */
async function run() {
  // Returns a mapper that resolves with the same object after `ms` milliseconds.
  const holdFor = (ms) => (obj) =>
    new Promise((done) => setTimeout(() => done(obj), ms));

  // One fork: log on entry, hold the object for `ms`, log on exit.
  const slowFork = (startMessage, ms, endMessage) =>
    fork(
      tap(() => console.log(startMessage)),
      map(holdFor(ms)),
      tap(() => console.log(endMessage)),
    );

  await pipeline(
    readableStream,
    slowFork('fork 1: do something with obj for 2s', 2000, 'fork 1 done!'),
    slowFork('fork 2: do something with obj for 4s', 4000, 'fork 2 done!'),
    slowFork('fork 3: do something with obj for 6s', 6000, 'fork 3 done!'),
    // new stream.PassThrough({ objectMode: true }),
    // ^___ adding an extra stream in the pipeline seems to fix the problem
  );
  console.log('done!');
}

// Start the pipeline; a rejection from run() is printed via console.error.
run().catch((error) => console.error(error));

// Feed a single object into the source, then signal end-of-stream with null.
for (const chunk of [{ name: 'test' }, null]) {
  readableStream.push(chunk);
}

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

Console output should look like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!

What do you see instead?

Console output actually looks like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!

Additional information

As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.

await pipeline(
    ...
    new stream.PassThrough({ objectMode: true }),
);

console.log('done!');