Maint fixes by nkaradzhov · Pull Request #3086 · redis/node-redis
@@ -1,5 +1,5 @@
import COMMANDS from '../commands';
import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket';
import RedisSocket, { RedisSocketOptions } from './socket';
import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx';
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
import { EventEmitter } from 'node:events';
Expand Down
Expand Up
@@ -154,7 +154,7 @@ export interface RedisClientOptions<
*
* The default is `auto`.
*/
maintPushNotifications?: 'disabled' | 'enabled' | 'auto';
maintNotifications?: 'disabled' | 'enabled' | 'auto';
/**
* Controls how the client requests the endpoint to reconnect to during a MOVING notification in Redis Enterprise maintenance.
*
Expand All
@@ -167,19 +167,19 @@ export interface RedisClientOptions<
* The default is `auto`. */ maintMovingEndpointType?: MovingEndpointType; maintEndpointType?: MovingEndpointType; /** * Specifies a more relaxed timeout (in milliseconds) for commands during a maintenance window. * This helps minimize command timeouts during maintenance. If not provided, the `commandOptions.timeout` * will be used instead. Timeouts during maintenance period result in a `CommandTimeoutDuringMaintenance` error. * This helps minimize command timeouts during maintenance. Timeouts during maintenance period result * in a `CommandTimeoutDuringMaintenance` error. * * The default is 10000 */ maintRelaxedCommandTimeout?: number; /** * Specifies a more relaxed timeout (in milliseconds) for the socket during a maintenance window. * This helps minimize socket timeouts during maintenance. If not provided, the `socket.timeout` * will be used instead. Timeouts during maintenance period result in a `SocketTimeoutDuringMaintenance` error. * This helps minimize socket timeouts during maintenance. Timeouts during maintenance period result * in a `SocketTimeoutDuringMaintenance` error. * * The default is 10000 */ Expand Down Expand Up @@ -429,7 +429,7 @@ export default class RedisClient< return parsed; }
readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>; readonly #options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>; #socket: RedisSocket; readonly #queue: RedisCommandsQueue; #selectedDB = 0; Expand All @@ -453,7 +453,7 @@ export default class RedisClient< return this._self.#clientSideCache; }
get options(): RedisClientOptions<M, F, S, RESP> | undefined { get options(): RedisClientOptions<M, F, S, RESP> { return this._self.#options; }
Expand Down Expand Up @@ -503,15 +503,15 @@ export default class RedisClient< this.#socket = this.#initiateSocket();
if(options?.maintPushNotifications !== 'disabled') { new EnterpriseMaintenanceManager(this.#queue, this, this.#options!); if(this.#options.maintNotifications !== 'disabled') { new EnterpriseMaintenanceManager(this.#queue, this, this.#options); };
if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { this.#clientSideCache = options.clientSideCache; if (this.#options.clientSideCache) { if (this.#options.clientSideCache instanceof ClientSideCacheProvider) { this.#clientSideCache = this.#options.clientSideCache; } else { const cscConfig = options.clientSideCache; const cscConfig = this.#options.clientSideCache; this.#clientSideCache = new BasicClientSideCache(cscConfig); } this.#queue.addPushHandler((push: Array<any>): boolean => { Expand All @@ -535,16 +535,16 @@ export default class RedisClient< throw new Error('Client Side Caching is only supported with RESP3'); }
if (options?.maintPushNotifications && options?.maintPushNotifications !== 'disabled' && options?.RESP !== 3) { if (options?.maintNotifications && options?.maintNotifications !== 'disabled' && options?.RESP !== 3) { throw new Error('Graceful Maintenance is only supported with RESP3'); }
}
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined { #initiateOptions(options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> = {}): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> {
// Convert username/password to credentialsProvider if no credentialsProvider is already in place if (!options?.credentialsProvider && (options?.username || options?.password)) { if (!options.credentialsProvider && (options.username || options.password)) {
options.credentialsProvider = { type: 'async-credentials-provider', Expand All @@ -555,19 +555,19 @@ export default class RedisClient< }; }
if (options?.database) { if (options.database) { this._self.#selectedDB = options.database; }
if (options?.commandOptions) { if (options.commandOptions) { this._commandOptions = options.commandOptions; }
if(options?.maintPushNotifications !== 'disabled') { EnterpriseMaintenanceManager.setupDefaultMaintOptions(options!); if(options.maintNotifications !== 'disabled') { EnterpriseMaintenanceManager.setupDefaultMaintOptions(options); }
if (options?.url) { if (options.url) { const parsedOptions = RedisClient.parseOptions(options); if (parsedOptions?.database) { this._self.#selectedDB = parsedOptions.database; Expand All @@ -580,8 +580,8 @@ export default class RedisClient<
#initiateQueue(): RedisCommandsQueue { return new RedisCommandsQueue( this.#options?.RESP ?? 2, this.#options?.commandsQueueMaxLength, this.#options.RESP ?? 2, this.#options.commandsQueueMaxLength, (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) ); } Expand All @@ -591,7 +591,7 @@ export default class RedisClient< */ private reAuthenticate = async (credentials: BasicAuth) => { // Re-authentication is not supported on RESP2 with PubSub active if (!(this.isPubSubActive && !this.#options?.RESP)) { if (!(this.isPubSubActive && !this.#options.RESP)) { await this.sendCommand( parseArgs(COMMANDS.AUTH, { username: credentials.username, Expand Down Expand Up @@ -640,9 +640,9 @@ export default class RedisClient< Array<{ cmd: CommandArguments } & { errorHandler?: (err: Error) => void }> > { const commands = []; const cp = this.#options?.credentialsProvider; const cp = this.#options.credentialsProvider;
if (this.#options?.RESP) { if (this.#options.RESP) { const hello: HelloOptions = {};
if (cp && cp.type === 'async-credentials-provider') { Expand Down Expand Up @@ -702,7 +702,7 @@ export default class RedisClient< } }
if (this.#options?.name) { if (this.#options.name) { commands.push({ cmd: parseArgs(COMMANDS.CLIENT_SETNAME, this.#options.name) }); Expand All @@ -713,11 +713,11 @@ export default class RedisClient< commands.push({ cmd: ['SELECT', this.#selectedDB.toString()] }); }
if (this.#options?.readonly) { if (this.#options.readonly) { commands.push({ cmd: parseArgs(COMMANDS.READONLY) }); }
if (!this.#options?.disableClientInfo) { if (!this.#options.disableClientInfo) { commands.push({ cmd: ['CLIENT', 'SETINFO', 'LIB-VER', version], errorHandler: () => { Expand All @@ -732,7 +732,7 @@ export default class RedisClient< 'CLIENT', 'SETINFO', 'LIB-NAME', this.#options?.clientInfoTag this.#options.clientInfoTag ? `node-redis(${this.#options.clientInfoTag})` : 'node-redis' ], Expand All @@ -748,8 +748,7 @@ export default class RedisClient< commands.push({cmd: this.#clientSideCache.trackingOn()}); }
const { tls, host } = this.#options!.socket as RedisTcpSocketOptions; const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(!!tls, host!, this.#options!); const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options); if(maintenanceHandshakeCmd) { commands.push(maintenanceHandshakeCmd); }; Expand All @@ -769,7 +768,7 @@ export default class RedisClient< .on('error', err => { this.emit('error', err); this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { if (this.#socket.isOpen && !this.#options.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { this.#queue.flushAll(err); Expand Down Expand Up @@ -817,15 +816,15 @@ export default class RedisClient< } };
const socket = new RedisSocket(socketInitiator, this.#options?.socket); const socket = new RedisSocket(socketInitiator, this.#options.socket); this.#attachListeners(socket); return socket; }
#pingTimer?: NodeJS.Timeout;
#setPingTimer(): void { if (!this.#options?.pingInterval || !this.#socket.isReady) return; if (!this.#options.pingInterval || !this.#socket.isReady) return; clearTimeout(this.#pingTimer);
this.#pingTimer = setTimeout(() => { Expand Down Expand Up @@ -986,7 +985,7 @@ export default class RedisClient< transformReply: TransformReply | undefined, ) { const csc = this._self.#clientSideCache; const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; const defaultTypeMapping = this._self.#options.commandOptions === commandOptions;
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
Expand Down Expand Up @@ -1035,7 +1034,7 @@ export default class RedisClient< ): Promise<T> { if (!this._self.#socket.isOpen) { return Promise.reject(new ClientClosedError()); } else if (!this._self.#socket.isReady && this._self.#options?.disableOfflineQueue) { } else if (!this._self.#socket.isReady && this._self.#options.disableOfflineQueue) { return Promise.reject(new ClientOfflineError()); }
Expand Down
* The default is `auto`. */ maintMovingEndpointType?: MovingEndpointType; maintEndpointType?: MovingEndpointType; /** * Specifies a more relaxed timeout (in milliseconds) for commands during a maintenance window. * This helps minimize command timeouts during maintenance. If not provided, the `commandOptions.timeout` * will be used instead. Timeouts during maintenance period result in a `CommandTimeoutDuringMaintenance` error. * This helps minimize command timeouts during maintenance. Timeouts during maintenance period result * in a `CommandTimeoutDuringMaintenance` error. * * The default is 10000 */ maintRelaxedCommandTimeout?: number; /** * Specifies a more relaxed timeout (in milliseconds) for the socket during a maintenance window. * This helps minimize socket timeouts during maintenance. If not provided, the `socket.timeout` * will be used instead. Timeouts during maintenance period result in a `SocketTimeoutDuringMaintenance` error. * This helps minimize socket timeouts during maintenance. Timeouts during maintenance period result * in a `SocketTimeoutDuringMaintenance` error. * * The default is 10000 */ Expand Down Expand Up @@ -429,7 +429,7 @@ export default class RedisClient< return parsed; }
readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>; readonly #options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>; #socket: RedisSocket; readonly #queue: RedisCommandsQueue; #selectedDB = 0; Expand All @@ -453,7 +453,7 @@ export default class RedisClient< return this._self.#clientSideCache; }
get options(): RedisClientOptions<M, F, S, RESP> | undefined { get options(): RedisClientOptions<M, F, S, RESP> { return this._self.#options; }
Expand Down Expand Up @@ -503,15 +503,15 @@ export default class RedisClient< this.#socket = this.#initiateSocket();
if(options?.maintPushNotifications !== 'disabled') { new EnterpriseMaintenanceManager(this.#queue, this, this.#options!); if(this.#options.maintNotifications !== 'disabled') { new EnterpriseMaintenanceManager(this.#queue, this, this.#options); };
if (options?.clientSideCache) { if (options.clientSideCache instanceof ClientSideCacheProvider) { this.#clientSideCache = options.clientSideCache; if (this.#options.clientSideCache) { if (this.#options.clientSideCache instanceof ClientSideCacheProvider) { this.#clientSideCache = this.#options.clientSideCache; } else { const cscConfig = options.clientSideCache; const cscConfig = this.#options.clientSideCache; this.#clientSideCache = new BasicClientSideCache(cscConfig); } this.#queue.addPushHandler((push: Array<any>): boolean => { Expand All @@ -535,16 +535,16 @@ export default class RedisClient< throw new Error('Client Side Caching is only supported with RESP3'); }
if (options?.maintPushNotifications && options?.maintPushNotifications !== 'disabled' && options?.RESP !== 3) { if (options?.maintNotifications && options?.maintNotifications !== 'disabled' && options?.RESP !== 3) { throw new Error('Graceful Maintenance is only supported with RESP3'); }
}
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined { #initiateOptions(options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> = {}): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> {
// Convert username/password to credentialsProvider if no credentialsProvider is already in place if (!options?.credentialsProvider && (options?.username || options?.password)) { if (!options.credentialsProvider && (options.username || options.password)) {
options.credentialsProvider = { type: 'async-credentials-provider', Expand All @@ -555,19 +555,19 @@ export default class RedisClient< }; }
if (options?.database) { if (options.database) { this._self.#selectedDB = options.database; }
if (options?.commandOptions) { if (options.commandOptions) { this._commandOptions = options.commandOptions; }
if(options?.maintPushNotifications !== 'disabled') { EnterpriseMaintenanceManager.setupDefaultMaintOptions(options!); if(options.maintNotifications !== 'disabled') { EnterpriseMaintenanceManager.setupDefaultMaintOptions(options); }
if (options?.url) { if (options.url) { const parsedOptions = RedisClient.parseOptions(options); if (parsedOptions?.database) { this._self.#selectedDB = parsedOptions.database; Expand All @@ -580,8 +580,8 @@ export default class RedisClient<
#initiateQueue(): RedisCommandsQueue { return new RedisCommandsQueue( this.#options?.RESP ?? 2, this.#options?.commandsQueueMaxLength, this.#options.RESP ?? 2, this.#options.commandsQueueMaxLength, (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) ); } Expand All @@ -591,7 +591,7 @@ export default class RedisClient< */ private reAuthenticate = async (credentials: BasicAuth) => { // Re-authentication is not supported on RESP2 with PubSub active if (!(this.isPubSubActive && !this.#options?.RESP)) { if (!(this.isPubSubActive && !this.#options.RESP)) { await this.sendCommand( parseArgs(COMMANDS.AUTH, { username: credentials.username, Expand Down Expand Up @@ -640,9 +640,9 @@ export default class RedisClient< Array<{ cmd: CommandArguments } & { errorHandler?: (err: Error) => void }> > { const commands = []; const cp = this.#options?.credentialsProvider; const cp = this.#options.credentialsProvider;
if (this.#options?.RESP) { if (this.#options.RESP) { const hello: HelloOptions = {};
if (cp && cp.type === 'async-credentials-provider') { Expand Down Expand Up @@ -702,7 +702,7 @@ export default class RedisClient< } }
if (this.#options?.name) { if (this.#options.name) { commands.push({ cmd: parseArgs(COMMANDS.CLIENT_SETNAME, this.#options.name) }); Expand All @@ -713,11 +713,11 @@ export default class RedisClient< commands.push({ cmd: ['SELECT', this.#selectedDB.toString()] }); }
if (this.#options?.readonly) { if (this.#options.readonly) { commands.push({ cmd: parseArgs(COMMANDS.READONLY) }); }
if (!this.#options?.disableClientInfo) { if (!this.#options.disableClientInfo) { commands.push({ cmd: ['CLIENT', 'SETINFO', 'LIB-VER', version], errorHandler: () => { Expand All @@ -732,7 +732,7 @@ export default class RedisClient< 'CLIENT', 'SETINFO', 'LIB-NAME', this.#options?.clientInfoTag this.#options.clientInfoTag ? `node-redis(${this.#options.clientInfoTag})` : 'node-redis' ], Expand All @@ -748,8 +748,7 @@ export default class RedisClient< commands.push({cmd: this.#clientSideCache.trackingOn()}); }
const { tls, host } = this.#options!.socket as RedisTcpSocketOptions; const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(!!tls, host!, this.#options!); const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options); if(maintenanceHandshakeCmd) { commands.push(maintenanceHandshakeCmd); }; Expand All @@ -769,7 +768,7 @@ export default class RedisClient< .on('error', err => { this.emit('error', err); this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { if (this.#socket.isOpen && !this.#options.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { this.#queue.flushAll(err); Expand Down Expand Up @@ -817,15 +816,15 @@ export default class RedisClient< } };
const socket = new RedisSocket(socketInitiator, this.#options?.socket); const socket = new RedisSocket(socketInitiator, this.#options.socket); this.#attachListeners(socket); return socket; }
#pingTimer?: NodeJS.Timeout;
#setPingTimer(): void { if (!this.#options?.pingInterval || !this.#socket.isReady) return; if (!this.#options.pingInterval || !this.#socket.isReady) return; clearTimeout(this.#pingTimer);
this.#pingTimer = setTimeout(() => { Expand Down Expand Up @@ -986,7 +985,7 @@ export default class RedisClient< transformReply: TransformReply | undefined, ) { const csc = this._self.#clientSideCache; const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; const defaultTypeMapping = this._self.#options.commandOptions === commandOptions;
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
Expand Down Expand Up @@ -1035,7 +1034,7 @@ export default class RedisClient< ): Promise<T> { if (!this._self.#socket.isOpen) { return Promise.reject(new ClientClosedError()); } else if (!this._self.#socket.isReady && this._self.#options?.disableOfflineQueue) { } else if (!this._self.#socket.isReady && this._self.#options.disableOfflineQueue) { return Promise.reject(new ClientOfflineError()); }
Expand Down