[rb] add synchronization and error handling for socket interactions by titusfortner · Pull Request #16487 · SeleniumHQ/selenium

PR Code Suggestions ✨

Latest suggestions up to 627943c

CategorySuggestion                                                                                                                                    Impact
Possible issue
Set closing flag on listener exit

In attach_socket_listener, use an ensure block to set @closing = true when the
listener loop exits, including on error, to ensure a clean shutdown.

rb/lib/selenium/webdriver/common/websocket_connection.rb [128-152]

 def attach_socket_listener
   Thread.new do
     Thread.current.report_on_exception = false
 
-    loop do
-      break if @closing
-
-      incoming_frame << socket.readpartial(1024)
-
-      while (frame = incoming_frame.next)
+    begin
+      loop do
         break if @closing
 
-        message = process_frame(frame)
-        next unless message['method']
+        incoming_frame << socket.readpartial(1024)
 
-        params = message['params']
-        @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
-          @callback_threads.add(callback_thread(params, &callback))
+        while (frame = incoming_frame.next)
+          break if @closing
+
+          message = process_frame(frame)
+          next unless message['method']
+
+          params = message['params']
+          @callbacks_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+            @callback_threads.add(callback_thread(params, &callback))
+          end
         end
       end
+    rescue *CONNECTION_ERRORS, WebSocket::Error => e
+      WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
+    ensure
+      @closing_mtx.synchronize { @closing = true }
     end
-  rescue *CONNECTION_ERRORS, WebSocket::Error => e
-    WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
   end
 end
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly points out that an error in the socket listener should set the @closing flag to signal other threads that the connection is dead, preventing them from waiting indefinitely.

Medium
Enforce safe response timeout

In send_cmd, replace wait.until with a custom wait loop that checks the @closing
flag to avoid waiting for a response when the connection is being terminated.

rb/lib/selenium/webdriver/common/websocket_connection.rb [99-115]

 def send_cmd(**payload)
   id = next_id
   data = payload.merge(id: id)
   WebDriver.logger.debug "WebSocket -> #{data}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
   data = JSON.generate(data)
   out_frame = WebSocket::Frame::Outgoing::Client.new(version: ws.version, data: data, type: 'text')
 
   begin
     socket.write(out_frame.to_s)
   rescue *CONNECTION_ERRORS => e
-    raise e, "WebSocket is closed (#{e.class}: #{e.message})"
+    raise Error::WebDriverError, "WebSocket is closed (#{e.class}: #{e.message})"
   end
 
-  wait.until do
-    @messages_mtx.synchronize { messages.delete(id) }
+  deadline = Time.now + RESPONSE_WAIT_TIMEOUT
+  loop do
+    break if @closing
+    found = @messages_mtx.synchronize { messages.delete(id) }
+    return found if found
+    if Time.now > deadline
+      raise Error::WebDriverError, "Timed out waiting for WebSocket response to command id=#{id}"
+    end
+    sleep RESPONSE_WAIT_INTERVAL
   end
+
+  raise Error::WebDriverError, "Connection closing while waiting for response to command id=#{id}"
 end
  • Apply / Chat
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that the wait.until block can wait for the full timeout even if the connection is closing, and proposes adding a check for the @closing flag to fail faster, which improves robustness.

Medium
General
Handle malformed frames safely

In process_frame, wrap JSON.parse in a begin/rescue block to handle
JSON::ParserError, preventing malformed data from crashing the socket listener
thread.

rb/lib/selenium/webdriver/common/websocket_connection.rb [158-169]

 def process_frame(frame)
-  message = frame.to_s
+  raw = frame.to_s
 
   # Firefox will periodically fail on unparsable empty frame
-  return {} if message.empty?
+  return {} if raw.nil? || raw.empty?
 
-  msg = JSON.parse(message)
-  @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
+  begin
+    msg = JSON.parse(raw)
+  rescue JSON::ParserError => e
+    WebDriver.logger.debug "Ignoring unparsable WebSocket frame: #{e.class}: #{e.message}", id: :ws
+    return {}
+  end
+
+  if msg.key?('id')
+    @messages_mtx.synchronize { messages[msg['id']] = msg }
+  end
 
   WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
   msg
 end

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that a JSON::ParserError from a malformed frame would crash the listener thread, and proposes adding error handling to make the connection more resilient.

Medium
Learned
best practice
Guard message fields before use
Suggestion Impact:The commit introduced separate mutexes and began synchronizing access to callbacks/messages. It changed the callback iteration to duplicate callbacks under a dedicated messages mutex, addressing the race concern. However, it did not add the exact type guards for method and params as suggested.

code diff:

@@ -132,9 +138,8 @@
               message = process_frame(frame)
               next unless message['method']
 
-              params = message['params']
-              @mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
-                @callback_threads.add(callback_thread(params, &callback))
+              @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+                @callback_threads.add(callback_thread(message['params'], &callback))
               end
             end
           end
@@ -154,7 +159,7 @@
         return {} if message.empty?
 
         msg = JSON.parse(message)
-        @mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
+        @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
 
         WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws

Add defensive checks to ensure message['params'] is a Hash before use and safely
duplicate callbacks to prevent nil errors and races when handlers are removed
concurrently.

rb/lib/selenium/webdriver/common/websocket_connection.rb [137-147]

 while (frame = incoming_frame.next)
   break if @closing
 
   message = process_frame(frame)
-  next unless message['method']
+  method = message['method']
+  next unless method.is_a?(String)
 
   params = message['params']
-  @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+  next unless params.is_a?(Hash)
+
+  handlers = @callbacks_mtx.synchronize { callbacks[method].dup }
+  handlers.each do |callback|
     @callback_threads.add(callback_thread(params, &callback))
   end
 end

[Suggestion processed]

Suggestion importance[1-10]: 6

__

Why:
Relevant best practice - Validate inputs and states early to avoid nil errors; guard message handling and callback spawning against missing fields.

Low
  • Update

Previous suggestions

✅ Suggestions up to commit 4292334
CategorySuggestion                                                                                                                                    Impact
High-level
Refactor to use multiple specialized mutexes
Suggestion Impact:The commit introduced three separate mutexes (@callbacks_mtx, @messages_mtx, @closing_mtx) and replaced usages of the single @mtx with the appropriate granular mutexes across callback management, message handling, and closing logic.

code diff:

       def initialize(url:)
         @callback_threads = ThreadGroup.new
-        @mtx = Mutex.new
+
+        @callbacks_mtx = Mutex.new
+        @messages_mtx = Mutex.new
+        @closing_mtx = Mutex.new
+
         @closing = false
         @session_id = nil
         @url = url
@@ -47,7 +51,7 @@
       end
 
       def close
-        @mtx.synchronize do
+        @closing_mtx.synchronize do
           return if @closing
 
           @closing = true
@@ -64,8 +68,8 @@
         @socket_thread&.join(0.5)
         @callback_threads.list.each do |thread|
           thread.join(0.5)
-        rescue StandardError
-          nil
+        rescue StandardError => e
+          WebDriver.logger.debug "Failed to join thread during close: #{e.class}: #{e.message}", id: :ws
         end
       end
 
@@ -74,18 +78,22 @@
       end
 
       def add_callback(event, &block)
-        @mtx.synchronize do
+        @callbacks_mtx.synchronize do
           callbacks[event] << block
           block.object_id
         end
       end
 
       def remove_callback(event, id)
-        removed = @mtx.synchronize { callbacks[event].reject! { |cb| cb.object_id == id } }
-        return if removed || @closing
-
-        ids = @mtx.synchronize { callbacks[event]&.map(&:object_id) }
-        raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+        @callbacks_mtx.synchronize do
+          return if @closing
+
+          callbacks_for_event = callbacks[event]
+          return if callbacks_for_event.reject! { |cb| cb.object_id == id }
+
+          ids = callbacks_for_event.map(&:object_id)
+          raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+        end
       end
 
       def send_cmd(**payload)
@@ -98,12 +106,10 @@
         begin
           socket.write(out_frame.to_s)
         rescue *CONNECTION_ERRORS => e
-          raise Error::WebDriverError, "WebSocket is closed (#{e.class}: #{e.message})"
-        end
-
-        wait.until do
-          @mtx.synchronize { messages.delete(id) }
-        end
+          raise e, "WebSocket is closed (#{e.class}: #{e.message})"
+        end
+
+        wait.until { @messages_mtx.synchronize { messages.delete(id) } }
       end
 
       private
@@ -132,9 +138,8 @@
               message = process_frame(frame)
               next unless message['method']
 
-              params = message['params']
-              @mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
-                @callback_threads.add(callback_thread(params, &callback))
+              @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+                @callback_threads.add(callback_thread(message['params'], &callback))
               end
             end
           end
@@ -154,7 +159,7 @@
         return {} if message.empty?
 
         msg = JSON.parse(message)
-        @mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
+        @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
 
         WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
         msg
@@ -170,7 +175,8 @@
         rescue Error::WebDriverError, *CONNECTION_ERRORS => e
           WebDriver.logger.debug "Callback aborted: #{e.class}: #{e.message}", id: :ws
         rescue StandardError => e
-          # Unexpected handler failure; log with a short backtrace.
+          return if @closing
+
           bt = Array(e.backtrace).first(5).join("\n")
           WebDriver.logger.error "Callback error: #{e.class}: #{e.message}\n#{bt}", id: :ws
         end

Replace the single, coarse-grained mutex with multiple, more granular mutexes,
one for each shared resource (@callbacks, @messages, @closing). This will reduce
lock contention and improve concurrency.

Examples:

rb/lib/selenium/webdriver/common/websocket_connection.rb [40]
        @mtx = Mutex.new
rb/lib/selenium/webdriver/common/websocket_connection.rb [77-80]
        @mtx.synchronize do
          callbacks[event] << block
          block.object_id
        end

Solution Walkthrough:

Before:

class WebSocketConnection
  def initialize(url:)
    @mtx = Mutex.new
    @closing = false
    @callbacks = {}
    @messages = {}
    # ...
  end

  def add_callback(event, &block)
    @mtx.synchronize do
      # ... access @callbacks
    end
  end

  def send_cmd(**payload)
    # ...
    wait.until do
      @mtx.synchronize { messages.delete(id) }
    end
  end

  def process_frame(frame)
    # ...
    @mtx.synchronize { messages[msg['id']] = msg }
    # ...
  end
end

After:

class WebSocketConnection
  def initialize(url:)
    @callbacks_mtx = Mutex.new
    @messages_mtx = Mutex.new
    @closing_mtx = Mutex.new
    @closing = false
    @callbacks = {}
    @messages = {}
    # ...
  end

  def add_callback(event, &block)
    @callbacks_mtx.synchronize do
      # ... access @callbacks
    end
  end

  def send_cmd(**payload)
    # ...
    wait.until do
      @messages_mtx.synchronize { messages.delete(id) }
    end
  end

  def process_frame(frame)
    # ...
    @messages_mtx.synchronize { messages[msg['id']] = msg }
    # ...
  end
end
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that a single coarse-grained mutex is used to protect multiple independent resources, which can cause unnecessary lock contention and performance issues. Adopting a more granular locking strategy is a significant architectural improvement for the concurrency model introduced in this PR.

Medium
Possible issue
Ensure session is always deleted
Suggestion Impact:The commit changed the quit method so that failures from bidi.close are rescued and super is guaranteed to run via an ensure block, ensuring session cleanup. Although it used an ensure block instead of wrapping only bidi.close in a begin/rescue, it achieved the same goal of always calling super even if bidi.close fails.

code diff:

         def quit
           bidi.close
-          super
         rescue *QUIT_ERRORS
           nil
+        ensure
+          super
         end

Modify the quit method to ensure super is always called, even if bidi.close
fails. Wrap only the bidi.close call in a begin/rescue block to prevent leaving
orphaned browser sessions.

rb/lib/selenium/webdriver/remote/bidi_bridge.rb [48-53]

 def quit
-  bidi.close
+  begin
+    bidi.close
+  rescue *QUIT_ERRORS
+    nil
+  end
   super
-rescue *QUIT_ERRORS
-  nil
 end

[Suggestion processed]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a critical flaw where a failure in bidi.close would prevent the super call from executing, potentially leaving browser sessions running. The proposed fix ensures the session is always cleaned up.

Medium
Fix race condition in callback removal
Suggestion Impact:The commit changed remove_callback to wrap the closing check, removal, and ID collection within a single @callbacks_mtx.synchronize block, addressing the race condition as suggested.

code diff:

       def remove_callback(event, id)
-        removed = @mtx.synchronize { callbacks[event].reject! { |cb| cb.object_id == id } }
-        return if removed || @closing
-
-        ids = @mtx.synchronize { callbacks[event]&.map(&:object_id) }
-        raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+        @callbacks_mtx.synchronize do
+          return if @closing
+
+          callbacks_for_event = callbacks[event]
+          return if callbacks_for_event.reject! { |cb| cb.object_id == id }
+
+          ids = callbacks[event]&.map(&:object_id)
+          raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+        end
       end

Refactor the remove_callback method to use a single synchronize block. This will
prevent a race condition by ensuring the callback removal and error message data
collection are performed atomically.

rb/lib/selenium/webdriver/common/websocket_connection.rb [84-88]

-removed = @mtx.synchronize { callbacks[event].reject! { |cb| cb.object_id == id } }
-return if removed || @closing
+@mtx.synchronize do
+  return if @closing
 
-ids = @mtx.synchronize { callbacks[event]&.map(&:object_id) }
-raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+  callbacks_for_event = callbacks[event]
+  if callbacks_for_event.reject! { |cb| cb.object_id == id }
+    return
+  end
 
+  ids = callbacks_for_event.map(&:object_id)
+  raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+end
+

[Suggestion processed]

Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a race condition in the remove_callback method where two separate synchronize blocks could lead to inconsistent state, and proposes a valid fix by using a single atomic block.

Medium
Learned
best practice
Make handshake loop safe

Guard the handshake read loop against connection closure and IO errors so
resources don't hang if the socket closes mid-handshake.

rb/lib/selenium/webdriver/common/websocket_connection.rb [115-118]

 def process_handshake
   socket.print(ws.to_s)
-  ws << socket.readpartial(1024) until ws.finished?
+  begin
+    ws << socket.readpartial(1024) until ws.finished?
+  rescue *CONNECTION_ERRORS, WebSocket::Error, EOFError, IOError
+    # let caller handle a failed/closed handshake deterministically
+    raise Error::WebDriverError, 'WebSocket handshake failed or connection closed'
+  end
 end
Suggestion importance[1-10]: 6

__

Why:
Relevant best practice - Enforce deterministic and safe resource handling by ensuring sockets/threads are always cleaned up on all code paths.

Low