meshV2: 大量イベント発火時のキュー肥大化を防ぐ重複排除機能

問題の概要

meshV2拡張機能で、「ずっと」ループ内でメッセージを連続発火すると、イベント送信キュー(eventQueue)が無限に肥大化し、システムが不安定になる問題があります。

再現手順

  1. meshV2拡張機能でグループを作成(ホスト)
  2. 以下のようなスクリプトを実行:
  1. 30fps(33msごと)でイベントが発火される
  2. 処理頻度は250ms → 1回で約7-8イベント処理
  3. キューに無限にイベントが追加され続ける

現状の問題点

1. eventQueue(送信キュー)に上限がない

コード: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js:820

fireEvent (eventName, payload = '') {
    // ...
    this.eventQueue.push({
        eventName: eventName,
        payload: payload,
        firedAt: new Date().toISOString()
    });
}

問題:

  • 上限チェックがないため、無制限に追加される
  • 同じイベントが重複してキューに追加される
  • 大量発火(例:ループ内で複数イベント)すると、キューが肥大化
  • メモリ消費が増大し、システムが不安定になる

2. processBatchEvents()の処理

コード: mesh-service.js:597

async processBatchEvents () {
    if (this.eventQueue.length === 0) return;

    // キューから全イベントを取り出す
    const events = this.eventQueue.splice(0);
    log.info(`Mesh V2: Processing ${events.length} queued events for sending`);
    // ...
}

問題:

  • キュー全体を一度に取り出して処理
  • 10万イベント溜まると、すべて送信完了まで時間がかかる
  • その間も新しいイベントが追加され続ける

3. processNextBroadcast()(受信側)の処理効率

コード: mesh-service.js:535

// フレームごとに1つのブロードキャストのみ実行
const {event, offsetMs} = eventsToProcess[0];
this.broadcastEvent(event);

問題:

  • 1フレームに1イベントしか処理しない
  • 30fps = 1秒で30イベント処理
  • 大量受信(例:1秒で1000イベント)すると、処理が追いつかない

採用する解決方法

基本方針

送信側: 重複排除 + サイズ制限(ハイブリッド)

  • イベント送信キューには、1種類のメッセージ(eventName + payload)につき1つまで
  • 異なるイベントが最大100個まで格納可能
  • 上限を超えたら古いイベントを破棄(FIFO)

受信側: ループで複数イベントを一括実行

  • 1フレームに1イベントではなく、受信した複数イベントをループで一括実行
  • 理由: イベントの種類が異なれば、イベントハンドラも異なるため、RESTARTが起きない

メリット

  • ✅ キューが無限に肥大化しない
  • ✅ メモリ使用量が制限される(送信側: 最大100イベント)
  • ✅ システムがクラッシュしない
  • ✅ 受信側の処理速度が向上(複数イベント一括処理)
  • ✅ シンプルで確実

デメリット

  • ⚠️ 同じイベントを連続発火しても、1回しか送信されない
  • ⚠️ 異なるイベントが100個を超えると、古いイベントが破棄される

実装案

実装1: 送信側(重複排除 + サイズ制限)

ファイル: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js

// constructorに定数と統計変数を追加
constructor (blocks, meshId, domain) {
    // ... 既存のコード ...

    this.eventQueue = [];
    this.eventBatchInterval = 250;
    this.eventBatchTimer = null;

    // Event queue limits
    this.MAX_EVENT_QUEUE_SIZE = 100;  // 最大100イベント
    this.eventQueueStats = {
        duplicatesSkipped: 0,
        dropped: 0,
        lastReportTime: Date.now()
    };

    // ... 残りのコード ...
}

// fireEvent()を修正
fireEvent (eventName, payload = '') {
    if (!this.groupId || !this.client) {
        log.warn(`Mesh V2: Cannot fire event ${eventName} - groupId: ${this.groupId}, client: ${!!this.client}`);
        return;
    }

    // ステップ1: 重複チェック
    const isDuplicate = this.eventQueue.some(item =>
        item.eventName === eventName && item.payload === payload
    );

    if (isDuplicate) {
        this.eventQueueStats.duplicatesSkipped++;
        this.reportEventStatsIfNeeded();

        log.debug(`Mesh V2: Event already in queue, skipping: ${eventName}`);
        return;
    }

    // ステップ2: サイズ制限チェック(保険)
    if (this.eventQueue.length >= this.MAX_EVENT_QUEUE_SIZE) {
        const dropped = this.eventQueue.shift();  // 古いイベントを破棄(FIFO)
        this.eventQueueStats.dropped++;

        if (this.eventQueueStats.dropped % 10 === 1) {  // 10イベントごとに警告
            log.warn(`Mesh V2: Event queue full (${this.MAX_EVENT_QUEUE_SIZE}). ` +
                `Dropped ${this.eventQueueStats.dropped} events. ` +
                `Latest: ${dropped.eventName}`);
        }
    }

    log.debug(`Mesh V2: Queuing event for sending: ${eventName} ` +
        `(queue size: ${this.eventQueue.length})`);

    this.eventQueue.push({
        eventName: eventName,
        payload: payload,
        firedAt: new Date().toISOString()
    });
}

// 統計情報レポート(10秒ごと)
reportEventStatsIfNeeded () {
    const now = Date.now();
    const elapsed = now - this.eventQueueStats.lastReportTime;

    if (elapsed >= 10000 &&
        (this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) {
        log.info(`Mesh V2: Event Queue Stats (last ${(elapsed / 1000).toFixed(1)}s): ` +
            `duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` +
            `dropped=${this.eventQueueStats.dropped}, ` +
            `current queue size=${this.eventQueue.length}`);

        this.eventQueueStats.duplicatesSkipped = 0;
        this.eventQueueStats.dropped = 0;
        this.eventQueueStats.lastReportTime = now;
    }
}

// cleanup()に統計情報出力を追加
cleanup () {
    // 統計情報を出力
    if (this.eventQueueStats &&
        (this.eventQueueStats.duplicatesSkipped > 0 || this.eventQueueStats.dropped > 0)) {
        log.info(`Mesh V2: Final Event Queue Stats: ` +
            `duplicates skipped=${this.eventQueueStats.duplicatesSkipped}, ` +
            `dropped=${this.eventQueueStats.dropped}`);
    }

    // キューをクリア
    this.pendingBroadcasts = [];
    this.batchStartTime = null;
    this.lastBroadcastOffset = 0;

    // ... 既存のクリーンアップコード ...
}

計算量:

  • 重複チェック: O(n) where n = eventQueue.length
  • 最悪ケース: キューに100イベント → 100回の比較
  • 実用上問題なし(250msに1回のみ実行)

実装2: 受信側(ループで複数イベント一括実行)

ファイル: gui/scratch-vm/src/extensions/scratch3_mesh_v2/mesh-service.js

/**
 * Process pending broadcast events that should fire based on elapsed real time.
 * Called once per frame via BEFORE_STEP event.
 *
 * Strategy:
 * - Process all events whose timing has arrived (offsetMs <= elapsedMs)
 * - Execute them in order (maintains event sequence)
 * - Different event types don't cause RESTART (different handlers)
 */
processNextBroadcast () {
    if (!this.groupId) {
        // 切断されている場合はなにもしない
        return;
    }

    if (this.pendingBroadcasts.length === 0) {
        // キューが空になったらリセット
        this.batchStartTime = null;
        this.lastBroadcastOffset = 0;
        return;
    }

    const now = Date.now();
    const elapsedMs = this.batchStartTime ? now - this.batchStartTime : 0;

    // 処理すべきイベントを収集(タイミングが来ているもの)
    const eventsToProcess = [];

    while (this.pendingBroadcasts.length > 0) {
        const {event, offsetMs} = this.pendingBroadcasts[0];

        // まだタイミングが来ていない場合は待機
        if (offsetMs > elapsedMs) {
            log.debug(`Mesh V2: Waiting for event ${event.name} ` +
                `(needs ${offsetMs}ms, elapsed ${elapsedMs}ms)`);
            break;
        }

        // タイミングが来たイベントをキューから取り出し
        const item = this.pendingBroadcasts.shift();
        eventsToProcess.push(item);
    }

    // 収集したイベントをすべて処理
    if (eventsToProcess.length > 0) {
        log.info(`Mesh V2: Broadcasting ${eventsToProcess.length} events ` +
            `(${this.pendingBroadcasts.length} remaining in queue)`);

        eventsToProcess.forEach(({event, offsetMs}) => {
            log.info(`Mesh V2: Broadcasting event: ${event.name} ` +
                `(offset: ${offsetMs}ms, elapsed: ${elapsedMs}ms)`);

            this.broadcastEvent(event);
            this.lastBroadcastOffset = offsetMs;
        });
    }
}

変更点:

  • ❌ 削除: 1フレームに1イベントのみ処理する制約
  • ✅ 追加: forEachループですべてのイベントを一括処理
  • ✅ 理由: イベント種類が異なればハンドラも異なるため、RESTARTが起きない

パフォーマンス:

  • 従来: 1フレームに1イベント = 30イベント/秒(30fps)
  • 改善後: 1フレームに複数イベント = 処理速度が大幅向上
  • 例: 100イベント受信 → 従来3.3秒、改善後0.1秒以内

テスト方法

テスト1: 同じイベントを連続発火(重複排除)

期待結果:

  • eventQueueには常に1つのイベントのみ
  • 250msごとに1回送信
  • duplicatesSkipped が増加
  • システムが安定動作

テスト2: 異なるイベントを発火

ずっと
  [メッセージ1] を送る
  [メッセージ2] を送る
  [メッセージ3] を送る
end

期待結果:

  • eventQueueには最大3つのイベント
  • 250msごとに3つ送信
  • duplicatesSkipped は増加しない

テスト3: 大量の異なるイベント(サイズ制限)

// 150種類の異なるイベントを発火
for (let i = 0; i < 150; i++) {
    this.meshService.fireEvent(`message${i}`);
}

期待結果:

  • eventQueueが100で上限に達する
  • 古い50イベントが破棄される
  • 警告ログが出力される(10イベントごと)
  • システムがクラッシュしない

テスト4: 受信側の一括処理

準備:

  • ホスト: グループ作成
  • メンバー: グループ参加

ホスト側:

[メッセージ1] を送る
[メッセージ2] を送る
[メッセージ3] を送る

メンバー側:

[メッセージ1] を受け取ったとき
  (変数1) を (1) にする

[メッセージ2] を受け取ったとき
  (変数2) を (1) にする

[メッセージ3] を受け取ったとき
  (変数3) を (1) にする

期待結果:

  • メンバー側で3つのイベントが同一フレーム内で処理される
  • 変数1, 変数2, 変数3がすべて1になる
  • ログに「Broadcasting 3 events」と表示される

パラメータ設定値

パラメータ 理由
MAX_EVENT_QUEUE_SIZE 100 250ms × 400回 = 100秒分のバッファ(実用上十分)
eventBatchInterval 250ms 既存の設定を維持
MAX_BATCH_SIZE 1000 GraphQLペイロードサイズ制限(既存)

優先度

High - 大量イベント発火時にシステムが不安定になる実用上の問題

影響範囲

  • meshV2拡張機能を使用するすべてのプロジェクト
  • 特に「ずっと」ループ内でメッセージを発火するケース
  • センサーデータやゲームイベントをリアルタイム共有するケース

関連Issue


🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 noreply@anthropic.com