feat(plugin): plugin ordering · webhookx-io/webhookx@e8a5540
@@ -17,6 +17,7 @@ import (
1717"github.com/webhookx-io/webhookx/db"
1818"github.com/webhookx-io/webhookx/db/entities"
1919"github.com/webhookx-io/webhookx/db/migrator"
20+"github.com/webhookx-io/webhookx/db/query"
2021"github.com/webhookx-io/webhookx/dispatcher"
2122"github.com/webhookx-io/webhookx/eventbus"
2223"github.com/webhookx-io/webhookx/mcache"
@@ -29,6 +30,7 @@ import (
2930"github.com/webhookx-io/webhookx/pkg/reports"
3031"github.com/webhookx-io/webhookx/pkg/schedule"
3132"github.com/webhookx-io/webhookx/pkg/stats"
33+"github.com/webhookx-io/webhookx/pkg/store"
3234"github.com/webhookx-io/webhookx/pkg/taskqueue"
3335"github.com/webhookx-io/webhookx/pkg/tracing"
3436"github.com/webhookx-io/webhookx/plugins"
@@ -37,6 +39,7 @@ import (
3739"github.com/webhookx-io/webhookx/service"
3840"github.com/webhookx-io/webhookx/status"
3941"github.com/webhookx-io/webhookx/status/health"
42+"github.com/webhookx-io/webhookx/utils"
4043"github.com/webhookx-io/webhookx/worker"
4144"github.com/webhookx-io/webhookx/worker/deliverer"
4245"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
@@ -322,6 +325,43 @@ func (app *Application) initialize() error {
322325return nil
323326}
324327328+func (app *Application) buildPluginIterator(version string) (*plugins.Iterator, error) {
329+app.log.Debugw("building plugin iterator", "version", version)
330+list, err := app.db.Plugins.List(context.TODO(), &query.PluginQuery{})
331+if err != nil {
332+return nil, fmt.Errorf("failed to query plugins from database: %v", err)
333+ }
334+iterator, err := plugins.NewIterator(version, list)
335+if err != nil {
336+return nil, fmt.Errorf("failed to new plugin iterator: %v", err)
337+ }
338+return iterator, nil
339+}
340+341+func (app *Application) scheduleRebuildPluginIterator() {
342+app.bus.Subscribe("plugin.crud", func(_ interface{}) {
343+store.Set("plugin:version", utils.UUID())
344+ })
345+346+app.scheduler.AddTask(&schedule.Task{
347+Name: "app.plugin_rebuild",
348+Interval: time.Second,
349+Do: func() {
350+version := store.GetDefault("plugin:version", "init").(string)
351+if plugins.LoadIterator().Version == version {
352+return
353+ }
354+355+iterator, err := app.buildPluginIterator(version)
356+if err != nil {
357+app.log.Error(err)
358+return
359+ }
360+plugins.SetIterator(iterator)
361+ },
362+ })
363+}
364+325365func registerEventHandler(bus *eventbus.EventBus) {
326366bus.ClusteringSubscribe(eventbus.EventCRUD, func(data []byte) {
327367eventData := &eventbus.CrudData{}
@@ -420,6 +460,15 @@ func (app *Application) Start() error {
420460 }
421461 }))
422462463+if app.worker != nil || app.gateway != nil {
464+iterator, err := app.buildPluginIterator("init")
465+if err != nil {
466+return fmt.Errorf("failed to build plugin iterator: %s", err)
467+ }
468+plugins.SetIterator(iterator)
469+app.scheduleRebuildPluginIterator()
470+ }
471+423472if err := app.bus.Start(); err != nil {
424473return err
425474 }