Browse Source

Add generic shutdown handling in manager and plugin

pull/1/head
Tyler Sommer 8 months ago
parent
commit
a0f5591825
Signed by: tyler-sommer GPG Key ID: C09C010500DBD008
7 changed files with 89 additions and 41 deletions
  1. +29
    -35
      cli/manager.go
  2. +0
    -1
      config/plugin.go
  3. +4
    -1
      event/plugin.go
  4. +11
    -1
      irc/irc.go
  5. +34
    -1
      plugin/plugin.go
  6. +10
    -1
      vm/plugin.go
  7. +1
    -1
      vm/scheduler.go

+ 29
- 35
cli/manager.go View File

@ -6,7 +6,6 @@ import (
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"github.com/sirupsen/logrus"
@ -24,6 +23,9 @@ type Config struct {
RootDir string `toml:"root_path"`
PluginDir string `toml:"plugin_path"`
ExtraPlugins []string `toml:"extra_plugins"`
// Specify additional plugins that are a part of the main executable.
LinkedPlugins []plugin.Initializer
}
type Manager struct {
@ -31,7 +33,7 @@ type Manager struct {
Config
sig chan os.Signal
stop chan os.Signal
}
func NewManager(rootDir string, extraPlugins ...string) (*Manager, error) {
@ -57,11 +59,20 @@ func NewManager(rootDir string, extraPlugins ...string) (*Manager, error) {
}
return &Manager{
plugins: m,
sig: make(chan os.Signal, 10),
stop: make(chan os.Signal, 10),
Config: conf,
}, nil
}
func (manager *Manager) Stop() {
select {
case <-manager.stop:
// already stopped
default:
close(manager.stop)
}
}
func (manager *Manager) Plugins() *plugin.Manager {
return manager.plugins
}
@ -84,6 +95,9 @@ func (manager *Manager) Start() error {
}
m.Register(plugin.InitializeFromFile(pl))
}
for _, pl := range manager.LinkedPlugins {
m.Register(pl)
}
if err := configure(m); err != nil {
return errors.Wrap(err, "unable to init extra plugins")
}
@ -110,9 +124,15 @@ func (manager *Manager) Start() error {
func (manager *Manager) Loop() error {
st := make(chan os.Signal, 10)
signal.Notify(st, syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGUSR2)
signal.Notify(manager.sig, os.Interrupt, syscall.SIGTERM)
signal.Notify(st, os.Interrupt, syscall.SIGTERM)
for {
select {
case <-manager.stop:
logrus.Infoln("shutting down")
if err := manager.Shutdown(); err != nil {
logrus.Warnln("error shutting down:", err)
}
return nil
case s := <-st:
switch s {
case syscall.SIGHUP:
@ -130,46 +150,20 @@ func (manager *Manager) Loop() error {
logrus.Warnln("unable to restart js vm:", err)
continue
}
case os.Interrupt:
fallthrough
case syscall.SIGTERM:
manager.Stop()
default:
logrus.Infoln("received signal", s, "but not doing anything with it")
}
case <-manager.sig:
logrus.Infoln("shutting down")
if err := manager.Shutdown(); err != nil {
logrus.Warnln("error shutting down:", err)
}
return nil
}
}
}
func (manager *Manager) Shutdown() error {
m := manager.plugins
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
myVM, err := vm.FromPlugins(m)
if err != nil {
logrus.Warnln("error shutting down vm:", err)
}
if err := myVM.Shutdown(); err != nil {
logrus.Warnln("error shutting down vm:", err)
}
wg.Done()
}()
wg.Add(1)
go func() {
d, err := event.FromPlugins(m)
if err != nil {
logrus.Warnln("error shutting down events:", err)
}
d.Stop()
wg.Done()
}()
wg.Wait()
m.Shutdown()
return nil
}


+ 0
- 1
config/plugin.go View File

@ -28,7 +28,6 @@ func ConfigurePlugin(m *plugin.Manager, opts ...SetupOption) error {
func Initialize(m *plugin.Manager) (plugin.Plugin, error) {
p := &configPlugin{}
m.OnPluginInit(p)
return p, nil
}


+ 4
- 1
event/plugin.go View File

@ -20,7 +20,6 @@ func FromPlugins(m *plugin.Manager) (*Dispatcher, error) {
func Initialize(m *plugin.Manager) (plugin.Plugin, error) {
p := &eventPlugin{NewDispatcher()}
m.OnPluginInit(p)
return p, nil
}
@ -35,3 +34,7 @@ func (p *eventPlugin) Name() string {
func (p *eventPlugin) HandlePluginInit(o plugin.Plugin) {
p.dispatcher.Emit("plugin.INIT", map[string]interface{}{"name": o.Name(), "plugin": o})
}
func (p *eventPlugin) HandleShutdown() {
p.dispatcher.Stop()
}

+ 11
- 1
irc/irc.go View File

@ -100,7 +100,17 @@ func (conn *Connection) controlLoop() {
}
func NewManager(c *Config, ev *event.Dispatcher) *Manager {
return &Manager{config: c, events: ev}
m := &Manager{config: c, events: ev}
if c.AutoConnect {
go func() {
logrus.Infoln("Automatically connecting...")
<-time.After(250 * time.Millisecond)
if err := m.Connect(); err != nil {
logrus.Errorln("failed to autoconnect:", err)
}
}()
}
return m
}
func (m *Manager) Do(fn func(*Connection) error) error {


+ 34
- 1
plugin/plugin.go View File

@ -16,6 +16,10 @@ type InitHandler interface {
HandlePluginInit(Plugin)
}
type ShutdownHandler interface {
HandleShutdown()
}
type Initializer interface {
Initialize(*Manager) (Plugin, error)
}
@ -54,7 +58,8 @@ type Manager struct {
loaded map[string]Plugin
onInit []InitHandler
onInit []InitHandler
onShutdown []ShutdownHandler
mu sync.RWMutex
}
@ -76,6 +81,28 @@ func (m *Manager) OnPluginInit(h InitHandler) {
m.onInit = append(m.onInit, h)
}
func (m *Manager) OnShutdown(h ShutdownHandler) {
m.mu.Lock()
defer m.mu.Unlock()
m.onShutdown = append(m.onShutdown, h)
}
func (m *Manager) Shutdown() {
m.mu.RLock()
hs := make([]ShutdownHandler, len(m.onShutdown))
copy(hs, m.onShutdown)
m.mu.RUnlock()
wg := &sync.WaitGroup{}
for _, h := range m.onShutdown {
wg.Add(1)
go func(sh ShutdownHandler) {
sh.HandleShutdown()
wg.Done()
}(h)
}
wg.Wait()
}
func (m *Manager) Lookup(name string) (Plugin, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@ -126,6 +153,12 @@ func (m *Manager) Configure() []error {
if !ok {
// not already loaded, add it
m.loaded[pn] = plg
if ih, ok := plg.(InitHandler); ok {
m.onInit = append(m.onInit, ih)
}
if sh, ok := plg.(ShutdownHandler); ok {
m.onShutdown = append(m.onShutdown, sh)
}
}
// unlock outside of any conditional
m.mu.Unlock()


+ 10
- 1
vm/plugin.go View File

@ -36,7 +36,6 @@ func FromPlugins(m *plugin.Manager) (*VM, error) {
func Initialize(m *plugin.Manager) (plugin.Plugin, error) {
p := &vmPlugin{}
m.OnPluginInit(p)
return p, nil
}
@ -86,3 +85,13 @@ func (p *vmPlugin) HandlePluginInit(o plugin.Plugin) {
}
}
}
func (p *vmPlugin) HandleShutdown() {
if p.vm == nil {
logrus.Warnln("vm: shutting down uninitialized plugin")
return
}
if err := p.vm.Shutdown(); err != nil {
logrus.Warnln("error shutting down vm:", err)
}
}

+ 1
- 1
vm/scheduler.go View File

@ -171,7 +171,7 @@ func (s *scheduler) stop() error {
select {
case <-time.After(500 * time.Millisecond):
// soft timeout, try emptying the jobs queue and interrupting execution
logrus.Warnln("vm soft time out expired, flushing remaining jobs without done them")
logrus.Warnln("vm soft time out expired, flushing remaining jobs without processing them")
s.drain()
s.runtime.inner.Interrupt("vm is shutting down")
// requeue the stop job since we just flushed it down the drain


Loading…
Cancel
Save