worker: make MessagePort inherit from EventTarget · nodejs/node@0aa3809

@@ -24,20 +24,23 @@ const {

2424

stopMessagePort

2525

} = internalBinding('messaging');

2626

const {

27-

threadId,

2827

getEnvMessagePort

2928

} = internalBinding('worker');

30293130

const { Readable, Writable } = require('stream');

32-

const EventEmitter = require('events');

31+

const {

32+

Event,

33+

NodeEventTarget,

34+

defineEventHandler,

35+

initNodeEventTarget,

36+

kCreateEvent,

37+

kNewListener,

38+

kRemoveListener,

39+

} = require('internal/event_target');

3340

const { inspect } = require('internal/util/inspect');

34-

let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {

35-

debug = fn;

36-

});

37413842

const kIncrementsPortRef = Symbol('kIncrementsPortRef');

3943

const kName = Symbol('kName');

40-

const kOnMessageListener = Symbol('kOnMessageListener');

4144

const kPort = Symbol('kPort');

4245

const kWaitingStreams = Symbol('kWaitingStreams');

4346

const kWritableCallbacks = Symbol('kWritableCallbacks');

@@ -54,55 +57,47 @@ const messageTypes = {

5457

};

55585659

// We have to mess with the MessagePort prototype a bit, so that a) we can make

57-

// it inherit from EventEmitter, even though it is a C++ class, and b) we do

60+

// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do

5861

// not provide methods that are not present in the Browser and not documented

5962

// on our side (e.g. hasRef).

6063

// Save a copy of the original set of methods as a shallow clone.

6164

const MessagePortPrototype = ObjectCreate(

6265

ObjectGetPrototypeOf(MessagePort.prototype),

6366

ObjectGetOwnPropertyDescriptors(MessagePort.prototype));

6467

// Set up the new inheritance chain.

65-

ObjectSetPrototypeOf(MessagePort, EventEmitter);

66-

ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);

68+

ObjectSetPrototypeOf(MessagePort, NodeEventTarget);

69+

ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);

6770

// Copy methods that are inherited from HandleWrap, because

6871

// changing the prototype of MessagePort.prototype implicitly removed them.

6972

MessagePort.prototype.ref = MessagePortPrototype.ref;

7073

MessagePort.prototype.unref = MessagePortPrototype.unref;

717472-

// A communication channel consisting of a handle (that wraps around an

73-

// uv_async_t) which can receive information from other threads and emits

74-

// .onmessage events, and a function used for sending data to a MessagePort

75-

// in some other thread.

76-

MessagePort.prototype[kOnMessageListener] = function onmessage(event) {

77-

if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)

78-

debug(`[${threadId}] received message`, event);

79-

// Emit the deserialized object to userland.

80-

this.emit('message', event.data);

81-

};

82-83-

// This is for compatibility with the Web's MessagePort API. It makes sense to

84-

// provide it as an `EventEmitter` in Node.js, but if somebody overrides

85-

// `onmessage`, we'll switch over to the Web API model.

86-

ObjectDefineProperty(MessagePort.prototype, 'onmessage', {

87-

enumerable: true,

88-

configurable: true,

89-

get() {

90-

return this[kOnMessageListener];

91-

},

92-

set(value) {

93-

this[kOnMessageListener] = value;

94-

if (typeof value === 'function') {

95-

this.ref();

96-

MessagePortPrototype.start.call(this);

97-

} else {

98-

this.unref();

99-

stopMessagePort(this);

100-

}

75+

class MessageEvent extends Event {

76+

constructor(data, target, type) {

77+

super(type);

78+

this.data = data;

10179

}

102-

});

80+

}

81+82+

ObjectDefineProperty(

83+

MessagePort.prototype,

84+

kCreateEvent,

85+

{

86+

value: function(data, type) {

87+

return new MessageEvent(data, this, type);

88+

},

89+

configurable: false,

90+

writable: false,

91+

enumerable: false,

92+

});

1039310494

// This is called from inside the `MessagePort` constructor.

10595

function oninit() {

96+

initNodeEventTarget(this);

97+

// TODO(addaleax): This should be on MessagePort.prototype, but

98+

// defineEventHandler() does not support that.

99+

defineEventHandler(this, 'message');

100+

defineEventHandler(this, 'messageerror');

106101

setupPortReferencing(this, this, 'message');

107102

}

108103

@@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {

112107

value: oninit

113108

});

114109110+

class MessagePortCloseEvent extends Event {

111+

constructor() {

112+

super('close');

113+

}

114+

}

115+115116

// This is called after the underlying `uv_async_t` has been closed.

116117

function onclose() {

117-

this.emit('close');

118+

this.dispatchEvent(new MessagePortCloseEvent());

118119

}

119120120121

ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {

@@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {

156157

// If there are none or all are removed, unref() the channel so the worker

157158

// can shutdown gracefully.

158159

port.unref();

159-

eventEmitter.on('newListener', (name) => {

160-

if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {

160+

eventEmitter.on('newListener', function(name) {

161+

if (name === eventName) newListener(eventEmitter.listenerCount(name));

162+

});

163+

eventEmitter.on('removeListener', function(name) {

164+

if (name === eventName) removeListener(eventEmitter.listenerCount(name));

165+

});

166+

const origNewListener = eventEmitter[kNewListener];

167+

eventEmitter[kNewListener] = function(size, type, ...args) {

168+

if (type === eventName) newListener(size - 1);

169+

return origNewListener.call(this, size, type, ...args);

170+

};

171+

const origRemoveListener = eventEmitter[kRemoveListener];

172+

eventEmitter[kRemoveListener] = function(size, type, ...args) {

173+

if (type === eventName) removeListener(size);

174+

return origRemoveListener.call(this, size, type, ...args);

175+

};

176+177+

function newListener(size) {

178+

if (size === 0) {

161179

port.ref();

162180

MessagePortPrototype.start.call(port);

163181

}

164-

});

165-

eventEmitter.on('removeListener', (name) => {

166-

if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {

182+

}

183+184+

function removeListener(size) {

185+

if (size === 0) {

167186

stopMessagePort(port);

168187

port.unref();

169188

}

170-

});

189+

}

171190

}

172191173192