feat(plugin): plugin ordering · webhookx-io/webhookx@e8a5540

@@ -17,6 +17,7 @@ import (

1717

"github.com/webhookx-io/webhookx/db"

1818

"github.com/webhookx-io/webhookx/db/entities"

1919

"github.com/webhookx-io/webhookx/db/migrator"

20+

"github.com/webhookx-io/webhookx/db/query"

2021

"github.com/webhookx-io/webhookx/dispatcher"

2122

"github.com/webhookx-io/webhookx/eventbus"

2223

"github.com/webhookx-io/webhookx/mcache"

@@ -29,6 +30,7 @@ import (

2930

"github.com/webhookx-io/webhookx/pkg/reports"

3031

"github.com/webhookx-io/webhookx/pkg/schedule"

3132

"github.com/webhookx-io/webhookx/pkg/stats"

33+

"github.com/webhookx-io/webhookx/pkg/store"

3234

"github.com/webhookx-io/webhookx/pkg/taskqueue"

3335

"github.com/webhookx-io/webhookx/pkg/tracing"

3436

"github.com/webhookx-io/webhookx/plugins"

@@ -37,6 +39,7 @@ import (

3739

"github.com/webhookx-io/webhookx/service"

3840

"github.com/webhookx-io/webhookx/status"

3941

"github.com/webhookx-io/webhookx/status/health"

42+

"github.com/webhookx-io/webhookx/utils"

4043

"github.com/webhookx-io/webhookx/worker"

4144

"github.com/webhookx-io/webhookx/worker/deliverer"

4245

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

@@ -322,6 +325,43 @@ func (app *Application) initialize() error {

322325

return nil

323326

}

324327328+

func (app *Application) buildPluginIterator(version string) (*plugins.Iterator, error) {

329+

app.log.Debugw("building plugin iterator", "version", version)

330+

list, err := app.db.Plugins.List(context.TODO(), &query.PluginQuery{})

331+

if err != nil {

332+

return nil, fmt.Errorf("failed to query plugins from database: %v", err)

333+

}

334+

iterator, err := plugins.NewIterator(version, list)

335+

if err != nil {

336+

return nil, fmt.Errorf("failed to new plugin iterator: %v", err)

337+

}

338+

return iterator, nil

339+

}

340+341+

func (app *Application) scheduleRebuildPluginIterator() {

342+

app.bus.Subscribe("plugin.crud", func(_ interface{}) {

343+

store.Set("plugin:version", utils.UUID())

344+

})

345+346+

app.scheduler.AddTask(&schedule.Task{

347+

Name: "app.plugin_rebuild",

348+

Interval: time.Second,

349+

Do: func() {

350+

version := store.GetDefault("plugin:version", "init").(string)

351+

if plugins.LoadIterator().Version == version {

352+

return

353+

}

354+355+

iterator, err := app.buildPluginIterator(version)

356+

if err != nil {

357+

app.log.Error(err)

358+

return

359+

}

360+

plugins.SetIterator(iterator)

361+

},

362+

})

363+

}

364+325365

func registerEventHandler(bus *eventbus.EventBus) {

326366

bus.ClusteringSubscribe(eventbus.EventCRUD, func(data []byte) {

327367

eventData := &eventbus.CrudData{}

@@ -420,6 +460,15 @@ func (app *Application) Start() error {

420460

}

421461

}))

422462463+

if app.worker != nil || app.gateway != nil {

464+

iterator, err := app.buildPluginIterator("init")

465+

if err != nil {

466+

return fmt.Errorf("failed to build plugin iterator: %s", err)

467+

}

468+

plugins.SetIterator(iterator)

469+

app.scheduleRebuildPluginIterator()

470+

}

471+423472

if err := app.bus.Start(); err != nil {

424473

return err

425474

}