Skip to content

Commit ae83420

Browse files
author
Jason Playne
committedNov 7, 2014
now with a working subscription model for checks
1 parent 1909804 commit ae83420

10 files changed

+144
-64
lines changed
 

‎run.sh

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
#!/bin/sh
22
export GOPATH=`pwd`
3+
4+
# set DEBUG to a non empty value to display debug information
5+
export DEBUG=
36
go run src/sensu-client.go --config-file=src/config/config.json

‎src/plugins/checks/external.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package checks
22

33
import (
4+
"fmt"
45
"os/exec"
56
"plugins"
67
)
@@ -14,17 +15,20 @@ type ExternalCheck struct {
1415
func (ec *ExternalCheck) Init(config plugins.PluginConfig) (string, error) {
1516
// make sure that the command exists?
1617
ec.name = config.Name
17-
em.command = config.Command
18+
ec.command = config.Command
1819
return ec.name, nil
1920
}
2021

2122
func (ec *ExternalCheck) Gather(r *plugins.Result) error {
22-
cmd := exec.Command(em.command)
23+
fmt.Printf("About to start command\n")
24+
cmd := exec.Command(ec.command)
2325
err := cmd.Run()
2426
ec.checkStatus = plugins.UNKNOWN
2527

2628
out, errOut := cmd.CombinedOutput()
27-
r.SetOutput(string(out))
29+
fmt.Println("Output BELOW")
30+
fmt.Println(out)
31+
r.Add(string(out))
2832
if nil == errOut {
2933
ec.checkStatus = plugins.OK
3034
} else {

‎src/plugins/metrics/external.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package metrics
22

33
import (
4+
"fmt"
45
"os/exec"
56
"plugins"
67
)
@@ -18,12 +19,16 @@ func (em *ExternalMetric) Init(config plugins.PluginConfig) (string, error) {
1819
}
1920

2021
func (em *ExternalMetric) Gather(r *plugins.Result) error {
22+
fmt.Printf("About to start command\n")
2123
cmd := exec.Command(em.command)
2224
err := cmd.Run()
2325

2426
out, errOut := cmd.CombinedOutput()
27+
fmt.Println("Output BELOW")
28+
fmt.Println(out)
29+
2530
if nil == errOut {
26-
r.SetOutput(string(bytes))
31+
r.Add(em.name + " " + string(out))
2732
}
2833

2934
return err

‎src/plugins/plugins.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ type SensuPluginInterface interface {
1313
}
1414

1515
type PluginConfig struct {
16-
Type string
17-
Name string
18-
Command string
19-
Args []string
20-
Handlers []string
21-
Standalone bool
22-
Interval time.Duration
16+
Type string `json:"type"`
17+
Name string `json:"name"`
18+
Command string `json:"command"`
19+
Args []string `json:"args"`
20+
Handlers []string `json:"handlers"`
21+
Standalone bool `json:"standalone"`
22+
Interval time.Duration `json:"interval"`
2323
}
2424

2525
type Status int // check status - not used for metrics

‎src/sensu-client.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ func main() {
2525
os.Exit(1)
2626
}
2727

28+
log.SetPrefix("Sensu: ")
29+
2830
processes := []sensu.Processor{
29-
new(sensu.Keepalive),
31+
//new(sensu.Keepalive),
3032
new(sensu.Subscriber),
31-
sensu.NewPluginProcessor(),
33+
//sensu.NewPluginProcessor(),
3234
}
3335
c := sensu.NewClient(settings, processes)
3436

‎src/sensu/check.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import (
55
)
66

77
type Check struct {
8-
Name string
9-
Command string
8+
Name string `json:"name"`
9+
Command string `json:"command"`
1010
Executed int
1111
Status int
12+
Issued int `json:"issued"`
1213
Output string
1314
Duration float64
1415
Timeout int

‎src/sensu/keepalive.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import (
55
"github.com/bitly/go-simplejson"
66
"github.com/streadway/amqp"
77
"log"
8+
"os"
89
"time"
910
)
1011

1112
type Keepalive struct {
1213
q MessageQueuer
1314
config *Config
1415
close chan bool
16+
logger *log.Logger
1517
}
1618

1719
const keepaliveInterval = 20 * time.Second
@@ -23,6 +25,7 @@ func (k *Keepalive) Init(q MessageQueuer, config *Config) error {
2325
); err != nil {
2426
return fmt.Errorf("Exchange Declare: %s", err)
2527
}
28+
k.logger = log.New(os.Stdout, "Keepalive: ", log.LstdFlags)
2629

2730
k.q = q
2831
k.config = config
@@ -61,10 +64,10 @@ func (k *Keepalive) publish(payload amqp.Publishing) {
6164
"",
6265
payload,
6366
); err != nil {
64-
log.Printf("keepalive.publish: %v", err)
67+
k.logger.Printf("keepalive.publish: %v", err)
6568
return
6669
}
67-
log.Print("Keepalive published")
70+
k.logger.Print("Keepalive published")
6871
}
6972

7073
func createKeepalivePayload(clientConfig *simplejson.Json, timestamp time.Time) amqp.Publishing {

‎src/sensu/plugin_processor.go

+44-36
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sensu
33
import (
44
"fmt"
55
"log"
6+
"os"
67
"plugins"
78
"plugins/checks"
89
"plugins/metrics"
@@ -18,6 +19,7 @@ type PluginProcessor struct {
1819
jobsConfig map[string]plugins.PluginConfig
1920
close chan bool
2021
results chan *Result
22+
logger *log.Logger
2123
}
2224

2325
// used to create a new processor instance.
@@ -26,6 +28,7 @@ func NewPluginProcessor() *PluginProcessor {
2628
proc.jobs = make(map[string]plugins.SensuPluginInterface)
2729
proc.jobsConfig = make(map[string]plugins.PluginConfig)
2830
proc.results = make(chan *Result, 500) // queue of 500 buffered results
31+
proc.logger = log.New(os.Stdout, "Plugin: ", log.LstdFlags)
2932

3033
return proc
3134
}
@@ -78,10 +81,10 @@ func newCheckConfig(json interface{}) plugins.PluginConfig {
7881
func (p *PluginProcessor) AddJob(job plugins.SensuPluginInterface, checkConfig plugins.PluginConfig) {
7982
name, err := job.Init(checkConfig)
8083
if nil != err {
81-
log.Printf("Failed to initialise check: (%s) %s\n", name, err)
84+
p.logger.Printf("Failed to initialise check: (%s) %s\n", name, err)
8285
return
8386
}
84-
log.Printf("Scheduling job: %s (%s) every %d seconds", name, checkConfig.Command, checkConfig.Interval)
87+
p.logger.Printf("Scheduling job: %s (%s) every %d seconds", name, checkConfig.Command, checkConfig.Interval)
8588

8689
p.jobs[name] = job
8790
p.jobsConfig[name] = checkConfig
@@ -107,39 +110,12 @@ func (p *PluginProcessor) Init(q MessageQueuer, config *Config) error {
107110
for check_type, checkConfigInterface := range checks_config {
108111
checkConfig, ok := checkConfigInterface.(map[string]interface{})
109112
if !ok {
110-
log.Printf("Failed to parse config: %", check_type)
113+
p.logger.Printf("Failed to parse config: %", check_type)
111114
continue
112115
}
113116

114117
config := newCheckConfig(checkConfig)
115-
116-
// see if we can handle this check using one of our build in ones
117-
switch check_type {
118-
case "cpu_metrics":
119-
check = new(metrics.CpuStats)
120-
case "display_metrics":
121-
check = new(metrics.DisplayStats)
122-
case "interface_metrics":
123-
check = new(metrics.NetworkInterfaceStats)
124-
case "load_metrics":
125-
check = new(metrics.LoadStats)
126-
case "memory_metrics":
127-
check = new(metrics.MemoryStats)
128-
case "uptime_metrics":
129-
check = new(metrics.UptimeStats)
130-
case "wireless-ap_metrics":
131-
check = new(metrics.WirelessStats)
132-
case "check_procs":
133-
check = new(checks.ProcessCheck)
134-
default:
135-
if "metric" == config.Type {
136-
// we have a metric!
137-
check = new(metrics.ExternalMetric)
138-
} else {
139-
// we have a check!
140-
check = new(checks.ExternalCheck)
141-
}
142-
}
118+
check = getCheckHandler(check_type, config.Type)
143119

144120
config.Name = check_type
145121
p.AddJob(check, config)
@@ -153,17 +129,18 @@ func (p *PluginProcessor) Init(q MessageQueuer, config *Config) error {
153129
func (p *PluginProcessor) Start() {
154130
go p.publish()
155131

132+
clientConfig := p.config.Data().Get("client")
133+
156134
// start our result publisher thread
157135
for job_name, job := range p.jobs {
158136
go func(theJobName string, theJob plugins.SensuPluginInterface) {
159137
config := p.jobsConfig[job_name]
160138

161-
log.Printf("Starting job: %s", theJobName)
162139
reset := make(chan bool)
163140

164141
timer := time.AfterFunc(0, func() {
165-
log.Printf("Gathering: %s", theJobName)
166-
result := NewResult(p.config.Data().Get("client"), theJobName)
142+
p.logger.Printf("Gathering: %s", theJobName)
143+
result := NewResult(clientConfig, theJobName)
167144
result.SetCommand(config.Command)
168145

169146
presult := new(plugins.Result)
@@ -174,7 +151,7 @@ func (p *PluginProcessor) Start() {
174151

175152
if nil != err {
176153
// returned an error - we should stop this job from running
177-
log.Printf("Failed to gather stat: %s. %v", theJobName, err)
154+
p.logger.Printf("Failed to gather stat: %s. %v", theJobName, err)
178155
reset <- false
179156
return
180157
}
@@ -216,11 +193,42 @@ func (p *PluginProcessor) publish() {
216193
case result := <-p.results:
217194
if result.HasOutput() {
218195
if err := p.q.Publish(RESULTS_QUEUE, "", result.GetPayload()); err != nil {
219-
log.Printf("Error Publishing Stats: %v. %v", err, result)
196+
p.logger.Printf("Error Publishing Stats: %v. %v", err, result)
220197
}
221198
}
222199
case <-p.close:
223200
return
224201
}
225202
}
226203
}
204+
205+
func getCheckHandler(check_type, config_type string) plugins.SensuPluginInterface {
206+
var check plugins.SensuPluginInterface
207+
switch check_type {
208+
case "cpu_metrics", "cpu_metrics.rb":
209+
check = new(metrics.CpuStats)
210+
case "display_metrics":
211+
check = new(metrics.DisplayStats)
212+
case "interface_metrics":
213+
check = new(metrics.NetworkInterfaceStats)
214+
case "load_metrics":
215+
check = new(metrics.LoadStats)
216+
case "memory_metrics":
217+
check = new(metrics.MemoryStats)
218+
case "uptime_metrics":
219+
check = new(metrics.UptimeStats)
220+
case "wireless-ap_metrics":
221+
check = new(metrics.WirelessStats)
222+
case "check_procs":
223+
check = new(checks.ProcessCheck)
224+
default:
225+
if "metric" == config_type {
226+
// we have a metric!
227+
check = new(metrics.ExternalMetric)
228+
} else {
229+
// we have a check!
230+
check = new(checks.ExternalCheck)
231+
}
232+
}
233+
return check
234+
}

‎src/sensu/plugin_results.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/json"
88
"fmt"
99
"log"
10+
"os"
1011
"strings"
1112
"time"
1213
)
@@ -139,7 +140,9 @@ func (result *Result) toJson() []byte {
139140
if nil != err {
140141
log.Panic(err)
141142
}
142-
log.Printf(string(json)) // handy json debug printing
143+
if "" != os.Getenv("DEBUG") {
144+
log.Printf(string(json)) // handy json debug printing
145+
}
143146
return json
144147
}
145148

‎src/sensu/subscriber.go

+61-10
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,39 @@
11
package sensu
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"github.com/streadway/amqp"
67
"log"
8+
"os"
9+
"plugins"
10+
"time"
711
)
812

913
type Subscriber struct {
1014
deliveries <-chan amqp.Delivery
1115
done chan error
16+
logger *log.Logger
17+
config *Config
18+
q MessageQueuer
1219
}
1320

1421
func (s *Subscriber) Init(q MessageQueuer, c *Config) error {
1522

23+
s.config = c
24+
s.q = q
1625
config_name, _ := c.Data().Get("client").Get("name").String()
1726
config_ver, _ := c.Data().Get("client").Get("version").String()
1827

19-
queue_name := config_name + "-" + config_ver
20-
log.Printf("Declaring Queue: %s", queue_name)
28+
s.logger = log.New(os.Stdout, "Subscriptions: ", log.LstdFlags)
29+
30+
queue_name := fmt.Sprintf("%s-%s-%d", config_name, config_ver, time.Now().Unix())
31+
s.logger.Printf("Declaring Queue: %s", queue_name)
2132
queue, err := q.QueueDeclare(queue_name)
2233
if err != nil {
2334
return fmt.Errorf("Queue Declare: %s", err)
2435
}
25-
log.Printf("declared Queue")
36+
s.logger.Printf("declared Queue")
2637

2738
var subscriptions []string
2839
subscriptions, err = c.Data().GetPath("client", "subscriptions").StringArray()
@@ -31,20 +42,20 @@ func (s *Subscriber) Init(q MessageQueuer, c *Config) error {
3142
}
3243

3344
for _, sub := range subscriptions {
34-
log.Printf("declaring Exchange (%q)", sub)
45+
s.logger.Printf("declaring Exchange (%q)", sub)
3546
err = q.ExchangeDeclare(sub, "fanout")
3647
if err != nil {
3748
return fmt.Errorf("Exchange Declare: %s", err)
3849
}
3950

40-
log.Printf("binding %s to Exchange %q", queue.Name, sub)
51+
s.logger.Printf("Binding %s to Exchange %q", queue.Name, sub)
4152
err = q.QueueBind(queue.Name, "", sub)
4253
if err != nil {
4354
return fmt.Errorf("Queue Bind: %s", err)
4455
}
4556
}
4657

47-
log.Printf("starting Consume")
58+
s.logger.Printf("Starting Consume on queue: " + queue.Name)
4859
s.deliveries, err = q.Consume(queue.Name, "")
4960
if err != nil {
5061
return fmt.Errorf("Queue Consume: %s", err)
@@ -55,7 +66,7 @@ func (s *Subscriber) Init(q MessageQueuer, c *Config) error {
5566
}
5667

5768
func (s *Subscriber) Start() {
58-
go handle(s.deliveries, s.done)
69+
go s.handle(s.deliveries, s.done)
5970

6071
// for {
6172
// select {
@@ -69,15 +80,55 @@ func (s *Subscriber) Stop() {
6980
s.done <- nil
7081
}
7182

72-
func handle(deliveries <-chan amqp.Delivery, done chan error) {
83+
func (s *Subscriber) handle(deliveries <-chan amqp.Delivery, done chan error) {
84+
clientConfig := s.config.Data().Get("client")
7385
for d := range deliveries {
74-
log.Printf(
86+
s.logger.Printf(
7587
"got %dB delivery: [%v] %q",
7688
len(d.Body),
7789
d.DeliveryTag,
7890
d.Body,
7991
)
92+
93+
checkConfig := new(plugins.PluginConfig)
94+
err := json.Unmarshal(d.Body, checkConfig)
95+
if nil != err {
96+
s.logger.Printf("Unable to decode message, skipping...")
97+
d.Reject(false)
98+
continue
99+
}
100+
101+
//s.logger.Printf("Our check consists of: %+v", checkConfig)
102+
103+
theJob := getCheckHandler(checkConfig.Name, checkConfig.Type)
104+
105+
result := NewResult(clientConfig, checkConfig.Name)
106+
result.SetCommand(checkConfig.Command)
107+
108+
presult := new(plugins.Result)
109+
110+
theJob.Init(*checkConfig)
111+
112+
err = theJob.Gather(presult)
113+
result.SetOutput(presult.Output())
114+
result.SetCheckStatus(theJob.GetStatus())
115+
116+
if nil != err {
117+
// returned an error - we should stop this job from running
118+
s.logger.Printf("Failed to gather stat: %s. %v", checkConfig.Name, err)
119+
return
120+
}
121+
122+
// and now send it back
123+
if result.HasOutput() {
124+
if err = s.q.Publish(RESULTS_QUEUE, "", result.GetPayload()); err != nil {
125+
s.logger.Printf("Error Publishing Stats: %v. %v", err, result)
126+
}
127+
}
128+
129+
d.Ack(false)
130+
80131
}
81-
log.Printf("handle: deliveries channel closed")
132+
s.logger.Printf("handle: deliveries channel closed")
82133
done <- nil
83134
}

0 commit comments

Comments
 (0)
Please sign in to comment.