stream.pipeline does not wait for the last stream to flush before calling the final callback
- Version: v12.8.0
- Platform: macOS 10.15.4 (Catalina)
- Subsystem: stream
What steps will reproduce the bug?
// reproduce.js
//
// Minimal reproduction: the final callback of stream.pipeline() (here awaited
// through util.promisify) fires before the last stream in the chain has
// completed its _flush. On the reported Node version, 'done!' is printed
// before 'fork 3 done!' instead of after it.
const util = require('util');
const stream = require('stream');

// Invoke `fn(...args)` and always get a Promise back: a sync return value
// resolves it and a sync throw rejects it, because the wrapper is async.
const call = async (fn, ...args) => fn(...args);

// Object-mode Transform that replaces each chunk with the awaited result of
// `fn(chunk)`. A rejection (or throw) from `fn` is passed to the transform
// callback as a stream error.
const map = (fn) => {
  const tx = new stream.Transform({ objectMode: true });
  tx._transform = (chunk, enc, cb) =>
    call(fn, chunk).then(
      (modified) => cb(null, modified),
      (error) => cb(error),
    );
  return tx;
};

// Object-mode Transform that awaits `fn(chunk)` only for its side effect and
// then forwards the original chunk unchanged. Errors from `fn` are passed on
// as stream errors.
const tap = (fn) => {
  const tx = new stream.Transform({ objectMode: true });
  tx._transform = (chunk, enc, cb) =>
    call(fn, chunk).then(
      () => cb(null, chunk),
      (error) => cb(error),
    );
  return tx;
};

// Object-mode Transform that forwards every chunk downstream immediately and
// also feeds it into a side pipeline `pt -> ...t`.
// Its _flush callback is held back until that side pipeline has completed, so
// the fork should not be considered done before its branch is. An error from
// the side pipeline is reported through that same _flush callback.
const fork = (...t) => {
  let done;
  let doneError;
  let flush;
  const tx = new stream.Transform({ objectMode: true });
  const pt = new stream.PassThrough({ objectMode: true });

  stream.pipeline(pt, ...t, (error) => {
    done = true;
    doneError = error;
    // If tx is already flushing, the branch was the last thing it was waiting
    // for: complete the flush now.
    if (flush) flush(doneError);
  });

  tx._flush = (cb) => {
    // End the branch's input, then finish only once the branch has finished
    // (immediately, if it already has).
    pt.push(null);
    flush = cb;
    if (done) flush(doneError);
  };

  tx._transform = (chunk, enc, cb) => {
    // Chunks are pushed straight onto pt's readable side. The main flow does
    // not wait for the branch to process them.
    pt.push(chunk, enc);
    cb(null, chunk);
  };

  return tx;
};

const readableStream = new stream.PassThrough({ objectMode: true });
const pipeline = util.promisify(stream.pipeline);

// Expected output: all three "fork N done!" lines, then 'done!'.
// Reported output on v12.8.0: 'done!' appears before 'fork 3 done!'.
// NOTE(review): presumably this is because pipeline considers its last stream
// complete on 'finish', and in this Node version a Transform emits 'finish'
// without waiting for its _flush callback. Confirm against the Node 12
// stream internals.
async function run() {
  await pipeline(
    readableStream,
    fork(
      tap(() => console.log('fork 1: do something with obj for 2s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 2000))),
      tap(() => console.log('fork 1 done!')),
    ),
    fork(
      tap(() => console.log('fork 2: do something with obj for 4s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 4000))),
      tap(() => console.log('fork 2 done!')),
    ),
    fork(
      tap(() => console.log('fork 3: do something with obj for 6s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 6000))),
      tap(() => console.log('fork 3 done!')),
    ),
    // new stream.PassThrough({ objectMode: true }),
    // ^___ adding an extra stream in the pipeline seems to fix the problem
  );
  console.log('done!');
}

run().catch(console.error);

// Feed a single object into the head of the pipeline, then end it.
readableStream.push({ name: 'test' });
readableStream.push(null);
How often does it reproduce? Is there a required condition?
always
What is the expected behavior?
Console output should look like:
fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!
What do you see instead?
Console output actually looks like:
fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!
Additional information
As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.
```js
await pipeline(
  ...
  new stream.PassThrough({ objectMode: true }),
);
console.log('done!');
```