[rb] add synchronization and error handling for socket interactions by titusfortner · Pull Request #16487 · SeleniumHQ/selenium
PR Code Suggestions ✨
Latest suggestions up to 627943c
| Category | Suggestion | Impact |
| Possible issue |
Set closing flag on listener exit
In attach_socket_listener, use an ensure block to set @closing = true when the listener loop exits, including on error, to ensure a clean shutdown.
rb/lib/selenium/webdriver/common/websocket_connection.rb [128-152]
def attach_socket_listener
Thread.new do
Thread.current.report_on_exception = false
- loop do
- break if @closing
-
- incoming_frame << socket.readpartial(1024)
-
- while (frame = incoming_frame.next)
+ begin
+ loop do
break if @closing
- message = process_frame(frame)
- next unless message['method']
+ incoming_frame << socket.readpartial(1024)
- params = message['params']
- @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
- @callback_threads.add(callback_thread(params, &callback))
+ while (frame = incoming_frame.next)
+ break if @closing
+
+ message = process_frame(frame)
+ next unless message['method']
+
+ params = message['params']
+ @callbacks_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+ @callback_threads.add(callback_thread(params, &callback))
+ end
end
end
+ rescue *CONNECTION_ERRORS, WebSocket::Error => e
+ WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
+ ensure
+ @closing_mtx.synchronize { @closing = true }
end
- rescue *CONNECTION_ERRORS, WebSocket::Error => e
- WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
end
end
Suggestion importance[1-10]: 8
__
Why: The suggestion correctly points out that an error in the socket listener should set the @closing flag to signal other threads that the connection is dead, preventing them from waiting indefinitely.
| Medium
|
Enforce safe response timeout
In send_cmd, replace wait.until with a custom wait loop that checks the @closing flag to avoid waiting for a response when the connection is being terminated.
rb/lib/selenium/webdriver/common/websocket_connection.rb [99-115]
def send_cmd(**payload)
id = next_id
data = payload.merge(id: id)
WebDriver.logger.debug "WebSocket -> #{data}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
data = JSON.generate(data)
out_frame = WebSocket::Frame::Outgoing::Client.new(version: ws.version, data: data, type: 'text')
begin
socket.write(out_frame.to_s)
rescue *CONNECTION_ERRORS => e
- raise e, "WebSocket is closed (#{e.class}: #{e.message})"
+ raise Error::WebDriverError, "WebSocket is closed (#{e.class}: #{e.message})"
end
- wait.until do
- @messages_mtx.synchronize { messages.delete(id) }
+ deadline = Time.now + RESPONSE_WAIT_TIMEOUT
+ loop do
+ break if @closing
+ found = @messages_mtx.synchronize { messages.delete(id) }
+ return found if found
+ if Time.now > deadline
+ raise Error::WebDriverError, "Timed out waiting for WebSocket response to command id=#{id}"
+ end
+ sleep RESPONSE_WAIT_INTERVAL
end
+
+ raise Error::WebDriverError, "Connection closing while waiting for response to command id=#{id}"
end
Suggestion importance[1-10]: 7
__
Why: The suggestion correctly identifies that the wait.until block can wait for the full timeout even if the connection is closing, and proposes adding a check for the @closing flag to fail faster, which improves robustness.
| Medium
|
| General |
Handle malformed frames safely
In process_frame, wrap JSON.parse in a begin/rescue block to handle
JSON::ParserError, preventing malformed data from crashing the socket listener thread.
rb/lib/selenium/webdriver/common/websocket_connection.rb [158-169]
def process_frame(frame)
- message = frame.to_s
+ raw = frame.to_s
# Firefox will periodically fail on unparsable empty frame
- return {} if message.empty?
+ return {} if raw.nil? || raw.empty?
- msg = JSON.parse(message)
- @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
+ begin
+ msg = JSON.parse(raw)
+ rescue JSON::ParserError => e
+ WebDriver.logger.debug "Ignoring unparsable WebSocket frame: #{e.class}: #{e.message}", id: :ws
+ return {}
+ end
+
+ if msg.key?('id')
+ @messages_mtx.synchronize { messages[msg['id']] = msg }
+ end
WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
msg
end
[To ensure code accuracy, apply this suggestion manually]
Suggestion importance[1-10]: 8
__
Why: The suggestion correctly identifies that a JSON::ParserError from a malformed frame would crash the listener thread, and proposes adding error handling to make the connection more resilient.
| Medium
|
Learned best practice |
✅ Guard message fields before use
Suggestion Impact:The commit introduced separate mutexes and began synchronizing access to callbacks/messages. It changed the callback iteration to duplicate callbacks under a dedicated messages mutex, addressing the race concern. However, it did not add the exact type guards for method and params as suggested.
code diff:
@@ -132,9 +138,8 @@
message = process_frame(frame)
next unless message['method']
- params = message['params']
- @mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
- @callback_threads.add(callback_thread(params, &callback))
+ @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+ @callback_threads.add(callback_thread(message['params'], &callback))
end
end
end
@@ -154,7 +159,7 @@
return {} if message.empty?
msg = JSON.parse(message)
- @mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
+ @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
Add defensive checks to ensure message['params'] is a Hash before use and safely duplicate callbacks to prevent nil errors and races when handlers are removed concurrently.
rb/lib/selenium/webdriver/common/websocket_connection.rb [137-147]
while (frame = incoming_frame.next)
break if @closing
message = process_frame(frame)
- next unless message['method']
+ method = message['method']
+ next unless method.is_a?(String)
params = message['params']
- @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+ next unless params.is_a?(Hash)
+
+ handlers = @callbacks_mtx.synchronize { callbacks[method].dup }
+ handlers.each do |callback|
@callback_threads.add(callback_thread(params, &callback))
end
end
[Suggestion processed]
Suggestion importance[1-10]: 6
__
Why:
Relevant best practice - Validate inputs and states early to avoid nil errors; guard message handling and callback spawning against missing fields.
| Low
|
|
| |
Previous suggestions
✅ Suggestions up to commit 4292334
| Category | Suggestion | Impact |
| High-level |
✅ Refactor to use multiple specialized mutexes
Suggestion Impact:The commit introduced three separate mutexes (@callbacks_mtx, @messages_mtx, @closing_mtx) and replaced usages of the single @mtx with the appropriate granular mutexes across callback management, message handling, and closing logic.
code diff:
def initialize(url:)
@callback_threads = ThreadGroup.new
- @mtx = Mutex.new
+
+ @callbacks_mtx = Mutex.new
+ @messages_mtx = Mutex.new
+ @closing_mtx = Mutex.new
+
@closing = false
@session_id = nil
@url = url
@@ -47,7 +51,7 @@
end
def close
- @mtx.synchronize do
+ @closing_mtx.synchronize do
return if @closing
@closing = true
@@ -64,8 +68,8 @@
@socket_thread&.join(0.5)
@callback_threads.list.each do |thread|
thread.join(0.5)
- rescue StandardError
- nil
+ rescue StandardError => e
+ WebDriver.logger.debug "Failed to join thread during close: #{e.class}: #{e.message}", id: :ws
end
end
@@ -74,18 +78,22 @@
end
def add_callback(event, &block)
- @mtx.synchronize do
+ @callbacks_mtx.synchronize do
callbacks[event] << block
block.object_id
end
end
def remove_callback(event, id)
- removed = @mtx.synchronize { callbacks[event].reject! { |cb| cb.object_id == id } }
- return if removed || @closing
-
- ids = @mtx.synchronize { callbacks[event]&.map(&:object_id) }
- raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+ @callbacks_mtx.synchronize do
+ return if @closing
+
+ callbacks_for_event = callbacks[event]
+ return if callbacks_for_event.reject! { |cb| cb.object_id == id }
+
+ ids = callbacks_for_event.map(&:object_id)
+ raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+ end
end
def send_cmd(**payload)
@@ -98,12 +106,10 @@
begin
socket.write(out_frame.to_s)
rescue *CONNECTION_ERRORS => e
- raise Error::WebDriverError, "WebSocket is closed (#{e.class}: #{e.message})"
- end
-
- wait.until do
- @mtx.synchronize { messages.delete(id) }
- end
+ raise e, "WebSocket is closed (#{e.class}: #{e.message})"
+ end
+
+ wait.until { @messages_mtx.synchronize { messages.delete(id) } }
end
private
@@ -132,9 +138,8 @@
message = process_frame(frame)
next unless message['method']
- params = message['params']
- @mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
- @callback_threads.add(callback_thread(params, &callback))
+ @messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
+ @callback_threads.add(callback_thread(message['params'], &callback))
end
end
end
@@ -154,7 +159,7 @@
return {} if message.empty?
msg = JSON.parse(message)
- @mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
+ @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
msg
@@ -170,7 +175,8 @@
rescue Error::WebDriverError, *CONNECTION_ERRORS => e
WebDriver.logger.debug "Callback aborted: #{e.class}: #{e.message}", id: :ws
rescue StandardError => e
- # Unexpected handler failure; log with a short backtrace.
+ return if @closing
+
bt = Array(e.backtrace).first(5).join("\n")
WebDriver.logger.error "Callback error: #{e.class}: #{e.message}\n#{bt}", id: :ws
end
Replace the single, coarse-grained mutex with multiple, more granular mutexes, one for each shared resource (@callbacks, @messages, @closing). This will reduce lock contention and improve concurrency.
Examples:
rb/lib/selenium/webdriver/common/websocket_connection.rb [40]
rb/lib/selenium/webdriver/common/websocket_connection.rb [77-80]
@mtx.synchronize do
callbacks[event] << block
block.object_id
end
Solution Walkthrough:
Before:
class WebSocketConnection
def initialize(url:)
@mtx = Mutex.new
@closing = false
@callbacks = {}
@messages = {}
# ...
end
def add_callback(event, &block)
@mtx.synchronize do
# ... access @callbacks
end
end
def send_cmd(**payload)
# ...
wait.until do
@mtx.synchronize { messages.delete(id) }
end
end
def process_frame(frame)
# ...
@mtx.synchronize { messages[msg['id']] = msg }
# ...
end
end
After:
class WebSocketConnection
def initialize(url:)
@callbacks_mtx = Mutex.new
@messages_mtx = Mutex.new
@closing_mtx = Mutex.new
@closing = false
@callbacks = {}
@messages = {}
# ...
end
def add_callback(event, &block)
@callbacks_mtx.synchronize do
# ... access @callbacks
end
end
def send_cmd(**payload)
# ...
wait.until do
@messages_mtx.synchronize { messages.delete(id) }
end
end
def process_frame(frame)
# ...
@messages_mtx.synchronize { messages[msg['id']] = msg }
# ...
end
end
Suggestion importance[1-10]: 8
__
Why: The suggestion correctly identifies that a single coarse-grained mutex is used to protect multiple independent resources, which can cause unnecessary lock contention and performance issues. Adopting a more granular locking strategy is a significant architectural improvement for the concurrency model introduced in this PR.
| Medium
|
| Possible issue |
✅ Ensure session is always deleted
Suggestion Impact:The commit changed the quit method so that failures from bidi.close are rescued and super is guaranteed to run via an ensure block, ensuring session cleanup. Although it used an ensure block instead of wrapping only bidi.close in a begin/rescue, it achieved the same goal of always calling super even if bidi.close fails.
code diff:
def quit
bidi.close
- super
rescue *QUIT_ERRORS
nil
+ ensure
+ super
end
Modify the quit method to ensure super is always called, even if bidi.close fails. Wrap only the bidi.close call in a begin/rescue block to prevent leaving orphaned browser sessions.
rb/lib/selenium/webdriver/remote/bidi_bridge.rb [48-53]
def quit
- bidi.close
+ begin
+ bidi.close
+ rescue *QUIT_ERRORS
+ nil
+ end
super
-rescue *QUIT_ERRORS
- nil
end
[Suggestion processed]
Suggestion importance[1-10]: 8
__
Why: The suggestion correctly identifies a critical flaw where a failure in bidi.close would prevent the super call from executing, potentially leaving browser sessions running. The proposed fix ensures the session is always cleaned up.
| Medium
|
✅ Fix race condition in callback removal
Suggestion Impact:The commit changed remove_callback to wrap the closing check, removal, and ID collection within a single @callbacks_mtx.synchronize block, addressing the race condition as suggested.
code diff:
def remove_callback(event, id)
- removed = @mtx.synchronize { callbacks[event].reject! { |cb| cb.object_id == id } }
- return if removed || @closing
-
- ids = @mtx.synchronize { callbacks[event]&.map(&:object_id) }
- raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+ @callbacks_mtx.synchronize do
+ return if @closing
+
+ callbacks_for_event = callbacks[event]
+ return if callbacks_for_event.reject! { |cb| cb.object_id == id }
+
+ ids = callbacks[event]&.map(&:object_id)
+ raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+ end
end
Refactor the remove_callback method to use a single synchronize block. This will prevent a race condition by ensuring the callback removal and error message data collection are performed atomically.
rb/lib/selenium/webdriver/common/websocket_connection.rb [84-88]
-removed = @mtx.synchronize { callbacks[event].reject! { |cb| cb.object_id == id } }
-return if removed || @closing
+@mtx.synchronize do
+ return if @closing
-ids = @mtx.synchronize { callbacks[event]&.map(&:object_id) }
-raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+ callbacks_for_event = callbacks[event]
+ if callbacks_for_event.reject! { |cb| cb.object_id == id }
+ return
+ end
+ ids = callbacks_for_event.map(&:object_id)
+ raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
+end
+
[Suggestion processed]
Suggestion importance[1-10]: 7
__
Why: The suggestion correctly identifies a race condition in the remove_callback method where two separate synchronize blocks could lead to inconsistent state, and proposes a valid fix by using a single atomic block.
| Medium
|
Learned best practice |
Make handshake loop safe
Guard the handshake read loop against connection closure and IO errors so resources don't hang if the socket closes mid-handshake.
rb/lib/selenium/webdriver/common/websocket_connection.rb [115-118]
def process_handshake
socket.print(ws.to_s)
- ws << socket.readpartial(1024) until ws.finished?
+ begin
+ ws << socket.readpartial(1024) until ws.finished?
+ rescue *CONNECTION_ERRORS, WebSocket::Error, EOFError, IOError
+ # let caller handle a failed/closed handshake deterministically
+ raise Error::WebDriverError, 'WebSocket handshake failed or connection closed'
+ end
end
Suggestion importance[1-10]: 6
__
Why:
Relevant best practice - Enforce deterministic and safe resource handling by ensuring sockets/threads are always cleaned up on all code paths.
| Low
|
|
| |