Another pipe() deadlock since v14.17

Version

v20.3.1

Platform

Darwin silmaril.home 22.5.0 Darwin Kernel Version 22.5.0: Thu Jun 8 22:22:19 PDT 2023; root:xnu-8796.121.3~7/RELEASE_ARM64_T8103 arm64

Subsystem

stream

What steps will reproduce the bug?

const { Readable, Writable } = require('stream');

(async () => {

    // Prepare src that is internally ended, with buffered data pending

    const src = new Readable({ read() {} });                     // Constructor option is `read`, not `_read`
    src.push(Buffer.alloc(100));
    src.push(null);
    src.pause();

    await new Promise((resolve) => setImmediate(resolve));       // Give it time to settle

    const dst = new Writable({
        highWaterMark: 1000,
        write(buf, enc, cb) {

            console.log('write', buf.length);

            // Delay 1 tick to allow writableNeedDrain=true

            process.nextTick(cb);
        }
    });

    dst.write(Buffer.alloc(1000));                               // Fill write buffer

    //src.resume();                                              // Uncommenting this avoids the deadlock
    src.pipe(dst);

    await new Promise((resolve) => setImmediate(resolve));       // Give it time to settle

    console.log('src buffer', src.readableLength);
})();

How often does it reproduce? Is there a required condition?

100% of the time with the test code above, since Node v14.17.0.

This seems to be a corner case that depends on:

  1. Source stream is already ended.
  2. Destination needs a drain.

What is the expected behavior? Why is that the expected behavior?

Stream starts flowing and drains through the writable:

write 1000
write 100
src buffer 0

What do you see instead?

Stream deadlocks:

write 1000
src buffer 100

Additional information

#36563 fixes another pipe deadlock introduced in #35348. That fix landed in v16.0 and was backported to v14 in v14.17. However, the fix is not complete, and pipe() can still deadlock, as demonstrated above.

I expect the issue is caused by the conditional calling of pause() here:

    if (dest.writableNeedDrain === true) {
      if (state.flowing) {
        pause();
      }

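To verify this, I called src.resume() right before the call to src.pipe() (the commented-out line in the test code). This makes state.flowing === true and ensures pause() is called, which also registers the 'drain' handler that later restarts the source.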

As far as I can tell, this issue is present in all active and maintenance release lines, and was introduced into v14 during the LTS cycle. It will likely also affect the current readable-stream.

Until this is fixed, or for anyone still using v14 (and possibly v16, depending on how critical a fix is considered), a workaround is to always call stream.resume() before calling stream.pipe().
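A minimal sketch of that workaround, wrapped in a helper. `pipeWithResume` and `demo` are hypothetical names used for illustration and are not part of the Node.js API. The setup mirrors the test code above, except that writes are recorded in an array instead of logged.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper (not a Node.js API): applies the workaround by
// making state.flowing === true before pipe() inspects it.
function pipeWithResume(src, dst, options) {

    src.resume();
    return src.pipe(dst, options);
}

// Same scenario as the reproduction: ended source, destination needing a drain.
async function demo() {

    const src = new Readable({ read() {} });
    src.push(Buffer.alloc(100));
    src.push(null);
    src.pause();

    await new Promise((resolve) => setImmediate(resolve));       // Give it time to settle

    const written = [];
    const dst = new Writable({
        highWaterMark: 1000,
        write(buf, enc, cb) {

            written.push(buf.length);
            process.nextTick(cb);                                // Keep writableNeedDrain=true for 1 tick
        }
    });

    dst.write(Buffer.alloc(1000));                               // Fill write buffer

    pipeWithResume(src, dst);

    await new Promise((resolve) => setImmediate(resolve));       // Give it time to settle

    return { written, remaining: src.readableLength };
}

const result = demo();
result.then(({ written, remaining }) => console.log('written', written, 'src buffer', remaining));
```

If the workaround behaves as described, both chunks reach the writable and the source buffer ends up empty, matching the expected behavior listed earlier. The resume() call does not make data flow before pipe() attaches its 'data' listener, because the actual resume is deferred to the next tick.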