worker: make MessagePort inherit from EventTarget · nodejs/node@0aa3809
@@ -24,20 +24,23 @@ const {
2424 stopMessagePort
2525} = internalBinding('messaging');
2626const {
27- threadId,
2827 getEnvMessagePort
2928} = internalBinding('worker');
30293130const { Readable, Writable } = require('stream');
32-const EventEmitter = require('events');
31+const {
32+ Event,
33+ NodeEventTarget,
34+ defineEventHandler,
35+ initNodeEventTarget,
36+ kCreateEvent,
37+ kNewListener,
38+ kRemoveListener,
39+} = require('internal/event_target');
3340const { inspect } = require('internal/util/inspect');
34-let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
35-debug = fn;
36-});
37413842const kIncrementsPortRef = Symbol('kIncrementsPortRef');
3943const kName = Symbol('kName');
40-const kOnMessageListener = Symbol('kOnMessageListener');
4144const kPort = Symbol('kPort');
4245const kWaitingStreams = Symbol('kWaitingStreams');
4346const kWritableCallbacks = Symbol('kWritableCallbacks');
@@ -54,55 +57,47 @@ const messageTypes = {
5457};
55585659// We have to mess with the MessagePort prototype a bit, so that a) we can make
57-// it inherit from EventEmitter, even though it is a C++ class, and b) we do
60+// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
5861// not provide methods that are not present in the Browser and not documented
5962// on our side (e.g. hasRef).
6063// Save a copy of the original set of methods as a shallow clone.
6164const MessagePortPrototype = ObjectCreate(
6265ObjectGetPrototypeOf(MessagePort.prototype),
6366ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
6467// Set up the new inheritance chain.
65-ObjectSetPrototypeOf(MessagePort, EventEmitter);
66-ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
68+ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
69+ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
6770// Copy methods that are inherited from HandleWrap, because
6871// changing the prototype of MessagePort.prototype implicitly removed them.
6972MessagePort.prototype.ref = MessagePortPrototype.ref;
7073MessagePort.prototype.unref = MessagePortPrototype.unref;
717472-// A communication channel consisting of a handle (that wraps around an
73-// uv_async_t) which can receive information from other threads and emits
74-// .onmessage events, and a function used for sending data to a MessagePort
75-// in some other thread.
76-MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
77-if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
78-debug(`[${threadId}] received message`, event);
79-// Emit the deserialized object to userland.
80-this.emit('message', event.data);
81-};
82-83-// This is for compatibility with the Web's MessagePort API. It makes sense to
84-// provide it as an `EventEmitter` in Node.js, but if somebody overrides
85-// `onmessage`, we'll switch over to the Web API model.
86-ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
87-enumerable: true,
88-configurable: true,
89-get() {
90-return this[kOnMessageListener];
91-},
92-set(value) {
93-this[kOnMessageListener] = value;
94-if (typeof value === 'function') {
95-this.ref();
96-MessagePortPrototype.start.call(this);
97-} else {
98-this.unref();
99-stopMessagePort(this);
100-}
75+class MessageEvent extends Event {
76+constructor(data, target, type) {
77+super(type);
78+this.data = data;
10179}
102-});
80+}
81+82+ObjectDefineProperty(
83+MessagePort.prototype,
84+kCreateEvent,
85+{
86+value: function(data, type) {
87+return new MessageEvent(data, this, type);
88+},
89+configurable: false,
90+writable: false,
91+enumerable: false,
92+});
1039310494// This is called from inside the `MessagePort` constructor.
10595function oninit() {
96+initNodeEventTarget(this);
97+// TODO(addaleax): This should be on MessagePort.prototype, but
98+// defineEventHandler() does not support that.
99+defineEventHandler(this, 'message');
100+defineEventHandler(this, 'messageerror');
106101setupPortReferencing(this, this, 'message');
107102}
108103@@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
112107value: oninit
113108});
114109110+class MessagePortCloseEvent extends Event {
111+constructor() {
112+super('close');
113+}
114+}
115+115116// This is called after the underlying `uv_async_t` has been closed.
116117function onclose() {
117-this.emit('close');
118+this.dispatchEvent(new MessagePortCloseEvent());
118119}
119120120121ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
@@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
156157// If there are none or all are removed, unref() the channel so the worker
157158// can shutdown gracefully.
158159port.unref();
159-eventEmitter.on('newListener', (name) => {
160-if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
160+eventEmitter.on('newListener', function(name) {
161+if (name === eventName) newListener(eventEmitter.listenerCount(name));
162+});
163+eventEmitter.on('removeListener', function(name) {
164+if (name === eventName) removeListener(eventEmitter.listenerCount(name));
165+});
166+const origNewListener = eventEmitter[kNewListener];
167+eventEmitter[kNewListener] = function(size, type, ...args) {
168+if (type === eventName) newListener(size - 1);
169+return origNewListener.call(this, size, type, ...args);
170+};
171+const origRemoveListener = eventEmitter[kRemoveListener];
172+eventEmitter[kRemoveListener] = function(size, type, ...args) {
173+if (type === eventName) removeListener(size);
174+return origRemoveListener.call(this, size, type, ...args);
175+};
176+177+function newListener(size) {
178+if (size === 0) {
161179port.ref();
162180MessagePortPrototype.start.call(port);
163181}
164-});
165-eventEmitter.on('removeListener', (name) => {
166-if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
182+}
183+184+function removeListener(size) {
185+if (size === 0) {
167186stopMessagePort(port);
168187port.unref();
169188}
170-});
189+}
171190}
172191173192