meshV2: 大量イベント発火時のキュー肥大化を防ぐ重複排除機能
問題の概要
meshV2拡張機能で、「ずっと」ループ内でメッセージを連続発火すると、イベント送信キュー(eventQueue)が無限に肥大化し、システムが不安定になる問題があります。
再現手順
- meshV2拡張機能でグループを作成(ホスト)
- 以下のようなスクリプトを実行:
- 30fps(33msごと)でイベントが発火される
- 処理頻度は250ms → 1回で約7-8イベント処理
- キューに無限にイベントが追加され続ける
現状の問題点
1. eventQueue(送信キュー)に上限がない
コード: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js:820
fireEvent (eventName, payload = '') { // ... this.eventQueue.push({ eventName: eventName, payload: payload, firedAt: new Date().toISOString() }); }
問題:
- 上限チェックがないため、無制限に追加される
- 同じイベントが重複してキューに追加される
- 大量発火(例:ループ内で複数イベント)すると、キューが肥大化
- メモリ消費が増大し、システムが不安定になる
2. processBatchEvents()の処理
コード: mesh-service.js:597
async processBatchEvents () { if (this.eventQueue.length === 0) return; // キューから全イベントを取り出す const events = this.eventQueue.splice(0); log.info(`Mesh V2: Processing ${events.length} queued events for sending`); // ... }
問題:
- キュー全体を一度に取り出して処理
- 10万イベント溜まると、すべて送信完了まで時間がかかる
- その間も新しいイベントが追加され続ける
3. processNextBroadcast()(受信側)の処理効率
コード: mesh-service.js:535
// フレームごとに1つのブロードキャストのみ実行 const {event, offsetMs} = eventsToProcess[0]; this.broadcastEvent(event);
問題:
- 1フレームに1イベントしか処理しない
- 30fps = 1秒で30イベント処理
- 大量受信(例:1秒で1000イベント)すると、処理が追いつかない
採用する解決方法
基本方針
送信側: 重複排除 + サイズ制限(ハイブリッド)
- イベント送信キューには、1種類のメッセージ(eventName + payload)につき1つまで
- 異なるイベントが最大100個まで格納可能
- 上限を超えたら古いイベントを破棄(FIFO)
受信側: ループで複数イベントを一括実行
- 1フレームに1イベントではなく、受信した複数イベントをループで一括実行
- 理由: イベントの種類が異なれば、イベントハンドラも異なるため、RESTARTが起きない
メリット
- ✅ キューが無限に肥大化しない
- ✅ メモリ使用量が制限される(送信側: 最大100イベント)
- ✅ システムがクラッシュしない
- ✅ 受信側の処理速度が向上(複数イベント一括処理)
- ✅ シンプルで確実
デメリット
- ⚠️ 同じイベントを連続発火しても、1回しか送信されない
- ⚠️ 異なるイベントが100個を超えると、古いイベントが破棄される
実装案
実装1: 送信側(重複排除 + サイズ制限)
ファイル: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js
// constructorに定数と統計変数を追加 constructor (blocks, meshId, domain) { // ... 既存のコード ... this.eventQueue = []; this.eventBatchInterval = 250; this.eventBatchTimer = null; // Event queue limits this.MAX_EVENT_QUEUE_SIZE = 100; // 最大100イベント this.eventQueueStats = { duplicatesSkipped: 0, dropped: 0, lastReportTime: Date.now() }; // ... 残りのコード ... } // fireEvent()を修正 fireEvent (eventName, payload = '') { if (!this.groupId || !this.client) { log.warn(`Mesh V2: Cannot fire event ${eventName} - groupId: ${this.groupId}, client: ${!!this.client}`); return; } // ステップ1: 重複チェック const isDuplicate = this.eventQueue.some(item => item.eventName === eventName && item.payload === payload ); if (isDuplicate) { this.eventQueueStats.duplicatesSkipped++; this.reportEventStatsIfNeeded(); log.debug(`Mesh V2: Event already in queue, skipping: ${eventName}`); return; } // ステップ2: サイズ制限チェック(保険) if (this.eventQueue.length >= this.MAX_EVENT_QUEUE_SIZE) { const dropped = this.eventQueue.shift(); // 古いイベントを破棄(FIFO) this.eventQueueStats.dropped++; if (this.eventQueueStats.dropped % 10 === 1) { // 10イベントごとに警告 log.warn(`Mesh V2: Event queue full (${this.MAX_EVENT_QUEUE_SIZE}). ` + `Dropped ${this.eventQueueStats.dropped} events. ` + `Latest: ${dropped.eventName}`); } } log.debug(`Mesh V2: Queuing event for sending: ${eventName} ` + `(queue size: ${this.eventQueue.length})`); this.eventQueue.push({ eventName: eventName, payload: payload, firedAt: new Date().toISOString() }); } // 統計情報レポート(10秒ごと) reportEventStatsIfNeeded () { const now = Date.now(); const elapsed = now - this.eventQueueStats.lastReportTime; if (elapsed >= 10000 && (this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) { log.info(`Mesh V2: Event Queue Stats (last ${(elapsed / 1000).toFixed(1)}s): ` + `duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` + `dropped=${this.eventQueueStats.dropped}, ` + `current queue size=${this.eventQueue.length}`); this.eventQueueStats.duplicatesSkipped = 0; this.eventQueueStats.dropped = 0; this.eventQueueStats.lastReportTime = now; } } // cleanup()に統計情報出力を追加 cleanup () { // 統計情報を出力 if (this.eventQueueStats && (this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) { log.info(`Mesh V2: Final Event Queue Stats: ` + `duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` + `dropped=${this.eventQueueStats.dropped}`); } // キューをクリア this.pendingBroadcasts = []; this.batchStartTime = null; this.lastBroadcastOffset = 0; // ... 既存のクリーンアップコード ... }
計算量:
- 重複チェック: O(n) where n = eventQueue.length
- 最悪ケース: キューに100イベント → 100回の比較
- 実用上問題なし(250msに1回のみ実行)
実装2: 受信側(ループで複数イベント一括実行)
ファイル: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js
/** * Process pending broadcast events that should fire based on elapsed real time. * Called once per frame via BEFORE_STEP event. * * Strategy: * - Process all events whose timing has arrived (offsetMs <= elapsedMs) * - Execute them in order (maintains event sequence) * - Different event types don't cause RESTART (different handlers) */ processNextBroadcast () { if (!this.groupId) { // 切断されている場合はなにもしない return; } if (this.pendingBroadcasts.length === 0) { // キューが空になったらリセット this.batchStartTime = null; this.lastBroadcastOffset = 0; return; } const now = Date.now(); const elapsedMs = this.batchStartTime ? now - this.batchStartTime : 0; // 処理すべきイベントを収集(タイミングが来ているもの) const eventsToProcess = []; while (this.pendingBroadcasts.length > 0) { const {event, offsetMs} = this.pendingBroadcasts[0]; // まだタイミングが来ていない場合は待機 if (offsetMs > elapsedMs) { log.debug(`Mesh V2: Waiting for event ${event.name} ` + `(needs ${offsetMs}ms, elapsed ${elapsedMs}ms)`); break; } // タイミングが来たイベントをキューから取り出し const item = this.pendingBroadcasts.shift(); eventsToProcess.push(item); } // 収集したイベントをすべて処理 if (eventsToProcess.length > 0) { log.info(`Mesh V2: Broadcasting ${eventsToProcess.length} events ` + `(${this.pendingBroadcasts.length} remaining in queue)`); eventsToProcess.forEach(({event, offsetMs}) => { log.info(`Mesh V2: Broadcasting event: ${event.name} ` + `(offset: ${offsetMs}ms, elapsed: ${elapsedMs}ms)`); this.broadcastEvent(event); this.lastBroadcastOffset = offsetMs; }); } }
変更点:
- ❌ 削除: 1フレームに1イベントのみ処理する制約
- ✅ 追加:
forEachループですべてのイベントを一括処理 - ✅ 理由: イベント種類が異なればハンドラも異なるため、RESTARTが起きない
パフォーマンス:
- 従来: 1フレームに1イベント = 30イベント/秒(30fps)
- 改善後: 1フレームに複数イベント = 処理速度が大幅向上
- 例: 100イベント受信 → 従来3.3秒、改善後0.1秒以内
テスト方法
テスト1: 同じイベントを連続発火(重複排除)
期待結果:
- eventQueueには常に1つのイベントのみ
- 250msごとに1回送信
- duplicatesSkipped が増加
- システムが安定動作
テスト2: 異なるイベントを発火
ずっと
[メッセージ1] を送る
[メッセージ2] を送る
[メッセージ3] を送る
end
期待結果:
- eventQueueには最大3つのイベント
- 250msごとに3つ送信
- duplicatesSkipped は増加しない
テスト3: 大量の異なるイベント(サイズ制限)
// 150種類の異なるイベントを発火 for (let i = 0; i < 150; i++) { this.meshService.fireEvent(`message${i}`); }
期待結果:
- eventQueueが100で上限に達する
- 古い50イベントが破棄される
- 警告ログが出力される(10イベントごと)
- システムがクラッシュしない
テスト4: 受信側の一括処理
準備:
- ホスト: グループ作成
- メンバー: グループ参加
ホスト側:
[メッセージ1] を送る
[メッセージ2] を送る
[メッセージ3] を送る
メンバー側:
[メッセージ1] を受け取ったとき
(変数1) を (1) にする
[メッセージ2] を受け取ったとき
(変数2) を (1) にする
[メッセージ3] を受け取ったとき
(変数3) を (1) にする
期待結果:
- メンバー側で3つのイベントが同一フレーム内で処理される
- 変数1, 変数2, 変数3がすべて1になる
- ログに「Broadcasting 3 events」と表示される
パラメータ設定値
| パラメータ | 値 | 理由 |
|---|---|---|
MAX_EVENT_QUEUE_SIZE |
100 | 250ms × 400回 = 100秒分のバッファ(実用上十分) |
eventBatchInterval |
250ms | 既存の設定を維持 |
MAX_BATCH_SIZE |
1000 | GraphQLペイロードサイズ制限(既存) |
優先度
High - 大量イベント発火時にシステムが不安定になる実用上の問題
影響範囲
- meshV2拡張機能を使用するすべてのプロジェクト
- 特に「ずっと」ループ内でメッセージを発火するケース
- センサーデータやゲームイベントをリアルタイム共有するケース
関連Issue
- meshV2: 高頻度のグローバル変数更新でスタックオーバーフロー(Maximum call stack size exceeded) #499 - meshV2: 高頻度のグローバル変数更新でスタックオーバーフロー(修正済み)
- meshV2拡張機能のAWS AppSyncコスト追跡ログ実装 #498 - meshV2拡張機能のAWS AppSyncコスト追跡ログ実装
🤖 Generated with Claude Code
Co-Authored-By: Claude Sonnet 4.5 noreply@anthropic.com