WebSocket JSON-RPC client with support for real-time subscriptions and event streaming.
Overview
Nethereum.JsonRpc.WebSocketClient provides WebSocket transport implementations for Ethereum node communication, supporting both standard request/response patterns and real-time event subscriptions. WebSockets enable push-based notifications from the node for new blocks, pending transactions, and contract events without polling.
Key Features:
- WebSocket transport (wss:// and ws://)
- Real-time subscriptions (newHeads, logs, pendingTransactions, syncing)
- Event streaming with automatic message routing
- Request/response and streaming modes
- Custom request headers support
- Connection management and automatic reconnection
- Thread-safe subscription handling
- Production-tested reliability
Use Cases:
- Real-time block monitoring
- Contract event streaming
- Pending transaction monitoring
- Mempool watching (MEV, arbitrage)
- Live dashboard updates
- Blockchain indexers
- Wallet notifications
Installation
dotnet add package Nethereum.JsonRpc.WebSocketClient
Requirements:
- Ethereum node with WebSocket support (Geth, Erigon, Infura, Alchemy)
- .NET Standard 2.0+ or .NET Core 2.1+
Dependencies
Nethereum:
- Nethereum.JsonRpc.Client - Core RPC abstraction (which provides JSON serialization and logging support)
External:
- System.Net.WebSockets.Client (v4.3.2) - WebSocket protocol implementation
Quick Start
Basic WebSocket Client (Request/Response)
using Nethereum.JsonRpc.WebSocketClient; using Nethereum.RPC.Eth; // Connect to WebSocket endpoint var client = new WebSocketClient("ws://localhost:8546"); // Use like any other RPC client var ethBlockNumber = new EthBlockNumber(client); var blockNumber = await ethBlockNumber.SendRequestAsync(); Console.WriteLine($"Current block: {blockNumber.Value}"); // Always dispose when done client.Dispose();
Streaming Client (Subscriptions)
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Eth.DTOs; using Nethereum.RPC.Reactive.Eth.Subscriptions; // Create streaming client var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID"); // Create subscription for new blocks var subscription = new EthNewBlockHeadersSubscription(client); // Handle new block events subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block => { Console.WriteLine($"New block: {block.Number.Value}"); Console.WriteLine($"Hash: {block.BlockHash}"); Console.WriteLine($"Miner: {block.Miner}"); }); // Start streaming await client.StartAsync(); // Subscribe await subscription.SubscribeAsync(); // Keep running Console.WriteLine("Monitoring new blocks. Press Enter to exit."); Console.ReadLine(); // Cleanup await subscription.UnsubscribeAsync(); await client.StopAsync(); client.Dispose();
Usage Examples
Example 1: Basic WebSocket Connection
using Nethereum.JsonRpc.WebSocketClient; using Nethereum.RPC.Eth; // Local Geth/Erigon var client = new WebSocketClient("ws://localhost:8546"); // Infura var infuraClient = new WebSocketClient( "wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID" ); // Alchemy var alchemyClient = new WebSocketClient( "wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY" ); // Use with RPC services var ethChainId = new EthChainId(client); var chainId = await ethChainId.SendRequestAsync(); var ethGasPrice = new EthGasPrice(client); var gasPrice = await ethGasPrice.SendRequestAsync(); Console.WriteLine($"Chain ID: {chainId.Value}"); Console.WriteLine($"Gas Price: {gasPrice.Value} wei"); // Cleanup client.Dispose();
Example 2: Real-Time Block Monitoring
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Reactive.Eth.Subscriptions; using Nethereum.RPC.Eth.DTOs; using System.Reactive.Linq; var client = new StreamingWebSocketClient("ws://localhost:8546"); // Create new block headers subscription var subscription = new EthNewBlockHeadersSubscription(client); // Subscribe to new blocks subscription.GetSubscriptionDataResponsesAsObservable() .Subscribe(block => { Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] New Block"); Console.WriteLine($" Number: {block.Number.Value}"); Console.WriteLine($" Hash: {block.BlockHash}"); Console.WriteLine($" Parent: {block.ParentHash}"); Console.WriteLine($" Timestamp: {DateTimeOffset.FromUnixTimeSeconds((long)block.Timestamp.Value)}"); Console.WriteLine($" Difficulty: {block.Difficulty.Value}"); Console.WriteLine($" Gas Used: {block.GasUsed.Value:N0}"); Console.WriteLine($" Transactions: {block.TransactionCount()}"); Console.WriteLine(); }, error => Console.WriteLine($"Error: {error.Message}")); // Start client and subscribe await client.StartAsync(); await subscription.SubscribeAsync(); Console.WriteLine("Monitoring blocks. Press Enter to stop."); Console.ReadLine(); // Cleanup await subscription.UnsubscribeAsync(); await client.StopAsync(); client.Dispose();
Example 3: Contract Event Streaming
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Reactive.Eth.Subscriptions; using Nethereum.RPC.Eth.DTOs; using Nethereum.Hex.HexTypes; var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID"); // Create logs subscription for USDC Transfer events var transferEventSignature = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; var usdcAddress = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"; var filterLogs = new NewFilterInput { Address = new[] { usdcAddress }, Topics = new[] { transferEventSignature } }; var subscription = new EthLogsSubscription(client); await subscription.SubscribeAsync(filterLogs); // Handle Transfer events subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(log => { var from = "0x" + log.Topics[1].ToString().Substring(26); var to = "0x" + log.Topics[2].ToString().Substring(26); var amount = new HexBigInteger(log.Data).Value; Console.WriteLine($"USDC Transfer:"); Console.WriteLine($" From: {from}"); Console.WriteLine($" To: {to}"); Console.WriteLine($" Amount: {amount / 1000000m:N2} USDC"); // USDC has 6 decimals Console.WriteLine($" Tx: {log.TransactionHash}"); Console.WriteLine(); }); await client.StartAsync(); Console.WriteLine("Monitoring USDC transfers. Press Enter to stop."); Console.ReadLine(); await subscription.UnsubscribeAsync(); await client.StopAsync(); client.Dispose();
Example 4: Pending Transaction Monitoring (Mempool)
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Reactive.Eth.Subscriptions; using Nethereum.Web3; var client = new StreamingWebSocketClient("ws://localhost:8546"); // Create pending transactions subscription var subscription = new EthNewPendingTransactionSubscription(client); // Handle new pending transactions subscription.GetSubscriptionDataResponsesAsObservable() .Buffer(TimeSpan.FromSeconds(1)) // Batch for 1 second .Subscribe(async txHashes => { if (txHashes.Count > 0) { Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {txHashes.Count} new pending transactions"); // Fetch details for first transaction var web3 = new Web3(client); var txDetails = await web3.Eth.Transactions.GetTransactionByHash.SendRequestAsync(txHashes[0]); if (txDetails != null) { Console.WriteLine($" First tx hash: {txDetails.TransactionHash}"); Console.WriteLine($" From: {txDetails.From}"); Console.WriteLine($" To: {txDetails.To}"); Console.WriteLine($" Value: {Web3.Convert.FromWei(txDetails.Value)} ETH"); Console.WriteLine($" Gas Price: {Web3.Convert.FromWei(txDetails.GasPrice, Web3.Convert.UnitConversion.Gwei)} Gwei"); } } }); await client.StartAsync(); await subscription.SubscribeAsync(); Console.WriteLine("Monitoring mempool. Press Enter to stop."); Console.ReadLine(); await subscription.UnsubscribeAsync(); await client.StopAsync(); client.Dispose();
Example 5: Multiple Subscriptions
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Reactive.Eth.Subscriptions; var client = new StreamingWebSocketClient("ws://localhost:8546"); // Create multiple subscriptions var blockSubscription = new EthNewBlockHeadersSubscription(client); var pendingTxSubscription = new EthNewPendingTransactionSubscription(client); var syncSubscription = new EthSyncingSubscription(client); // Handle new blocks blockSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block => { Console.WriteLine($"[BLOCK] #{block.Number.Value}"); }); // Handle pending transactions (with throttling) pendingTxSubscription.GetSubscriptionDataResponsesAsObservable() .Buffer(TimeSpan.FromSeconds(5)) .Subscribe(txHashes => { Console.WriteLine($"[MEMPOOL] {txHashes.Count} pending transactions in last 5s"); }); // Handle sync status syncSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(syncStatus => { if (syncStatus.IsSyncing) { Console.WriteLine($"[SYNC] Current: {syncStatus.CurrentBlock}, Highest: {syncStatus.HighestBlock}"); } else { Console.WriteLine($"[SYNC] Node is synced"); } }); // Start client and all subscriptions await client.StartAsync(); await blockSubscription.SubscribeAsync(); await pendingTxSubscription.SubscribeAsync(); await syncSubscription.SubscribeAsync(); Console.WriteLine("Monitoring multiple streams. Press Enter to stop."); Console.ReadLine(); // Cleanup all subscriptions await blockSubscription.UnsubscribeAsync(); await pendingTxSubscription.UnsubscribeAsync(); await syncSubscription.UnsubscribeAsync(); await client.StopAsync(); client.Dispose();
Example 6: Custom Request Headers (Authentication)
using Nethereum.JsonRpc.WebSocketClient; using Nethereum.RPC.Eth; var client = new WebSocketClient("wss://api.example.com/ws"); // Add custom headers (e.g., API key) client.RequestHeaders.Add("X-API-Key", "your-api-key-here"); client.RequestHeaders.Add("Authorization", "Bearer your-token"); var ethBlockNumber = new EthBlockNumber(client); var blockNumber = await ethBlockNumber.SendRequestAsync(); Console.WriteLine($"Block (authenticated): {blockNumber.Value}"); client.Dispose();
Example 7: Production Reconnection Pattern (from Nethereum.WebSocketsStreamingTest)
CRITICAL for production: Automatic reconnection when WebSocket connection drops:
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Reactive.Eth.Subscriptions; using System.Reactive.Linq; public class ProductionBlockMonitor { private readonly string url; private StreamingWebSocketClient client; public ProductionBlockMonitor(string url) { this.url = url; } public async Task SubscribeAndRunAsync() { if (client == null) { client = new StreamingWebSocketClient(url); // Production pattern: auto-reconnect on error client.Error += Client_Error; } var blockHeaderSubscription = new EthNewBlockHeadersObservableSubscription(client); // Get subscription ID when subscribed blockHeaderSubscription.GetSubscribeResponseAsObservable().Subscribe(subscriptionId => Console.WriteLine($"Block Header subscription Id: {subscriptionId}")); // Process new blocks blockHeaderSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe( block => Console.WriteLine($"New Block: {block.BlockHash}"), exception => Console.WriteLine($"BlockHeaderSubscription error info: {exception.Message}") ); // Handle unsubscribe confirmation blockHeaderSubscription.GetUnsubscribeResponseAsObservable().Subscribe(response => Console.WriteLine($"Block Header unsubscribe result: {response}")); await client.StartAsync(); await blockHeaderSubscription.SubscribeAsync(); Console.WriteLine("Monitoring blocks with auto-reconnect. Press Enter to stop."); Console.ReadLine(); await blockHeaderSubscription.UnsubscribeAsync(); } // Production reconnection handler private async void Client_Error(object sender, Exception ex) { Console.WriteLine($"Client Error, restarting... ({ex.Message})"); // Stop the failed connection await ((StreamingWebSocketClient)sender).StopAsync(); // Restart everything await SubscribeAndRunAsync(); } } // Usage var monitor = new ProductionBlockMonitor("ws://localhost:8546"); await monitor.SubscribeAndRunAsync();
Why this pattern works:
- The
Client_Errorevent catches all WebSocket failures - Automatically stops the failed connection
- Recursively restarts the entire subscription flow
- Ensures continuous monitoring even through network disruptions
Example 8: High-Frequency Event Processing with Reactive Extensions
using Nethereum.JsonRpc.WebSocketStreamingClient; using Nethereum.RPC.Reactive.Eth.Subscriptions; using System.Reactive.Linq; var client = new StreamingWebSocketClient("ws://localhost:8546"); var subscription = new EthNewBlockHeadersSubscription(client); // Advanced reactive processing subscription.GetSubscriptionDataResponsesAsObservable() .Window(TimeSpan.FromMinutes(1)) // 1-minute windows .SelectMany(window => window .Aggregate(new { Count = 0, TotalGasUsed = BigInteger.Zero, TotalTransactions = 0 }, (acc, block) => new { Count = acc.Count + 1, TotalGasUsed = acc.TotalGasUsed + block.GasUsed.Value, TotalTransactions = acc.TotalTransactions + (int)block.TransactionCount() })) .Subscribe(stats => { Console.WriteLine($"=== 1-Minute Stats ==="); Console.WriteLine($"Blocks: {stats.Count}"); Console.WriteLine($"Avg Gas/Block: {stats.TotalGasUsed / stats.Count:N0}"); Console.WriteLine($"Total Transactions: {stats.TotalTransactions}"); Console.WriteLine(); }); await client.StartAsync(); await subscription.SubscribeAsync(); Console.WriteLine("Collecting statistics. Press Enter to stop."); Console.ReadLine(); await subscription.UnsubscribeAsync(); await client.StopAsync(); client.Dispose();
Example 9: Using with Nethereum.Web3
using Nethereum.Web3; using Nethereum.JsonRpc.WebSocketClient; // Create WebSocket client var wsClient = new WebSocketClient("ws://localhost:8546"); // Use with Web3 var web3 = new Web3(wsClient); // Standard Web3 operations over WebSocket var balance = await web3.Eth.GetBalance.SendRequestAsync( "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb" ); var blockNumber = await web3.Eth.Blocks.GetBlockNumber.SendRequestAsync(); Console.WriteLine($"Balance: {Web3.Convert.FromWei(balance)} ETH"); Console.WriteLine($"Block: {blockNumber.Value}"); // Cleanup wsClient.Dispose();
API Reference
WebSocketClient (Basic)
public class WebSocketClient : ClientBase, IDisposable, IClientRequestHeaderSupport { public WebSocketClient(string path, JsonSerializerSettings jsonSerializerSettings = null, ILogger log = null) public Dictionary<string, string> RequestHeaders { get; set; } public TimeSpan ConnectionTimeout { get; set; } public Task StopAsync() public Task StopAsync(WebSocketCloseStatus webSocketCloseStatus, string status, CancellationToken timeOutToken) }
StreamingWebSocketClient (Subscriptions)
public class StreamingWebSocketClient : IStreamingClient, IDisposable, IClientRequestHeaderSupport { public StreamingWebSocketClient(string path, JsonSerializerSettings jsonSerializerSettings = null, ILogger log = null) public Dictionary<string, string> RequestHeaders { get; set; } public static TimeSpan ConnectionTimeout { get; set; } public WebSocketState WebSocketState { get; } public bool IsStarted { get; } public event WebSocketStreamingErrorEventHandler Error; public Task StartAsync() public Task StopAsync() public bool AddSubscription(string subscriptionId, IRpcStreamingResponseHandler handler) public bool RemoveSubscription(string subscriptionId) }
Available Subscriptions
| Subscription | Description |
|---|---|
| EthNewBlockHeadersSubscription | New block headers |
| EthNewPendingTransactionSubscription | Pending transactions (mempool) |
| EthLogsSubscription | Contract event logs |
| EthSyncingSubscription | Node sync status |
Important Notes
WebSocket Endpoints
Common WebSocket URLs:
| Node/Provider | WebSocket URL |
|---|---|
| Geth (local) | ws://localhost:8546 |
| Erigon (local) | ws://localhost:8545 |
| Infura | wss://mainnet.infura.io/ws/v3/PROJECT_ID |
| Alchemy | wss://eth-mainnet.g.alchemy.com/v2/API_KEY |
| QuickNode | wss://your-endpoint.quiknode.pro/TOKEN/ |
Starting Geth/Erigon with WebSocket
Geth:
geth --ws --ws.addr 0.0.0.0 --ws.port 8546 --ws.api eth,net,web3
Erigon:
erigon --ws --ws.port 8545
Performance Considerations
| Subscription | Event Rate | Notes |
|---|---|---|
| newHeads | ~12s (mainnet) | One per block |
| pendingTransactions | 100-1000/s | Very high volume |
| logs (filtered) | Variable | Depends on filter |
| syncing | Rare | Only during sync |
Tips:
- Use
Buffer()or throttling for high-volume subscriptions - Filter logs as narrowly as possible (specific addresses/topics)
- Consider multiple clients for heavy workloads
Thread Safety
- StreamingWebSocketClient is thread-safe for subscriptions
- Multiple subscriptions can run concurrently
- Each subscription has isolated message handling
Connection Management
- WebSocket connections can drop - implement error handling
- Use the
Errorevent to detect connection issues - Implement reconnection logic for production apps
- Always call
Dispose()to properly close connections
Subscription Limits
Some providers limit concurrent subscriptions:
- Infura: Up to 5 subscriptions per connection
- Alchemy: Up to 10 subscriptions per connection
- Local nodes: Usually unlimited
Related Packages
Alternative Transports
- Nethereum.JsonRpc.RpcClient - HTTP/HTTPS transport
- Nethereum.JsonRpc.IpcClient - IPC transport
- Nethereum.JsonRpc.SystemTextJsonRpcClient - HTTP with System.Text.Json
Core Dependencies
- Nethereum.JsonRpc.Client - Abstraction layer
Higher-Level APIs
- Nethereum.Web3 - Complete Web3 API
- Nethereum.RPC.Reactive - Reactive Extensions for subscriptions