You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

324 lines
7.0 KiB

#!/usr/bin/env perl
use warnings;
use strict;
#
# Requires
# libjson-perl
#
# Magic Markers
#
#%# family=auto
#%# capabilities=autoconf suggest
package JsonUDP;
use warnings;
use strict;
use IO::Socket::INET;
use JSON;
# Construct a client for a JSON-over-UDP management port on localhost.
#
# Arguments:
#   $class - package to bless the object into
#   $port  - UDP port to talk to; any false value selects 5644
#
# Returns the new object.  The socket is not checked here, so {sock} is
# undef if it could not be created.
sub new {
    my ($class, $port) = @_;
    $port ||= 5644;

    my $self = bless({}, $class);

    # Connected UDP socket: only datagrams from this peer are delivered.
    my $sock = IO::Socket::INET->new(
        Proto    => 'udp',
        PeerAddr => '127.0.0.1',
        PeerPort => $port,
    );
    my $json = JSON->new->utf8->relaxed->pretty->canonical;

    $self->{sock}  = $sock;
    $self->{json}  = $json;
    $self->{tag}   = 0;     # next request tag, incremented by read()
    $self->{debug} = 0;     # set true to dump every received packet

    return $self;
}
# Send one request line to the daemon.
#
# Arguments:
#   $msgline - the complete request text
#
# Returns whatever the socket's send() method returns.
sub _tx {
    my ($self, $msgline) = @_;
    my $sock = $self->{sock};
    return $sock->send($msgline);
}
# Collect the reply to one request.
#
# Reads datagrams from the socket until an 'end' packet carrying our tag
# arrives.  Packets with a different (or missing) tag, and packets of an
# unknown type, are ignored.
#
# Arguments:
#   $tag - the tag that was sent with the request
#
# Returns an array ref of row hashes (with the _tag and _type keys
# removed), or undef if the daemon sent an 'error' packet for this tag.
# Dies if the socket read fails or a packet is not valid JSON.
sub _rx {
    my ($self, $tag) = @_;
    my $db = [];
    my $error;

    while (1) {
        my $jsontxt;

        # A datagram longer than the buffer is truncated, which would make
        # the JSON undecodable, so ask for the largest possible datagram.
        # recv() returns undef on failure; without this check the failure
        # only showed up as a confusing "malformed JSON" error.
        if (!defined($self->{sock}->recv($jsontxt, 65535))) {
            die("JsonUDP: recv failed: $!\n");
        }

        if ($self->{debug}) {
            # STDOUT carries the plugin's output, so debug goes to STDERR
            print STDERR $jsontxt;
        }

        my $msg = $self->{json}->decode($jsontxt);

        # Anything that is not a JSON object cannot be a reply packet
        next if (ref($msg) ne 'HASH');

        # ignore packets not for us
        my $msg_tag = $msg->{_tag};
        next if (!defined($msg_tag) || $msg_tag ne $tag);

        my $type = $msg->{_type};
        next if (!defined($type));

        # Save most recent error for return
        if ($type eq 'error') {
            $error = $msg;
            next;
        }

        if ($type eq 'end') {
            if ($error) {
                # TODO: an error channel
                return undef;
            }
            return $db;
        }

        if ($type eq 'row') {
            delete $msg->{_tag};
            delete $msg->{_type};
            push @$db, $msg;
            next;
        }

        # Ignore any unknown _type
    }
}
# Issue a read request and return its reply.
#
# Arguments:
#   $cmdline - what to read, sent verbatim after the tag
#
# Each call consumes one tag from the object's counter.  Returns the
# result of _rx() for that tag.
sub read {
    my ($self, $cmdline) = @_;
    my $tag = $self->{tag}++;

    # TODO:
    # Add a read cache

    my $request = sprintf("r %i %s", $tag, $cmdline);
    $self->_tx($request);

    return $self->_rx($tag);
}
1;
package main;
use warnings;
use strict;
# Munin field definitions, keyed by graph name and then by field name.
# Every attribute of a field is printed as "<field>.<attribute> <value>"
# by do_config.
my $config = do {
    # Counter-style field: DERIVE with a floor of zero
    my $rate = sub {
        my $label = shift;
        my %field = (label => $label, type => 'DERIVE', min => 0);
        return \%field;
    };
    # Current-value field
    my $level = sub {
        my $label = shift;
        my %field = (label => $label, type => 'GAUGE');
        return \%field;
    };

    my $graphs = {
        edge_pkts => {
            p2p_tx_pkt             => $rate->('Peer to Peer tx rate'),
            p2p_rx_pkt             => $rate->('Peer to Peer rx rate'),
            super_tx_pkt           => $rate->('Peer to Supernode tx rate'),
            super_rx_pkt           => $rate->('Peer to Supernode rx rate'),
            super_broadcast_tx_pkt => $rate->('Broadcast to Supernode tx rate'),
            super_broadcast_rx_pkt => $rate->('Broadcast to Supernode rx rate'),
            transop_tx_pkt         => $rate->('Transform tx rate'),
            transop_rx_pkt         => $rate->('Transform rx rate'),
        },
        edge_counts => {
            edges      => $level->('Current known peers'),
            supernodes => $level->('Current known supernodes'),
        },
        supernode_pkts => {
            errors_tx_pkt    => $rate->('Error rate'),
            reg_super_rx_pkt => $rate->('Connect rate'),
            reg_super_nak    => $rate->('Connect error rate'),
            forward_tx_pkt   => $rate->('Packets forwarded rate'),
            broadcast_tx_pkt => $rate->('Broadcast packet rate'),
        },
        supernode_counts => {
            edges       => $level->('Current known edges'),
            communities => $level->('Current known communities'),
        },
    };
    $graphs;
};
# Where each graph gets its data from, keyed by graph name:
#   port  - management port to query
#   read  - table whose rows are printed as "<type>_<key>" values
#   count - tables for which only the number of rows is printed
my $fetchinfo = {
    edge_pkts        => { port => 5644, read  => "packetstats" },
    edge_counts      => { port => 5644, count => [ "edges", "supernodes" ] },
    supernode_pkts   => { port => 5645, read  => "packetstats" },
    supernode_counts => { port => 5645, count => [ "edges", "communities" ] },
};
# Print the munin configuration for one graph.
#
# Arguments:
#   $rpc  - unused here; every sub command is called with the same arguments
#   $name - graph name, used as the key into $config
sub do_config {
    my ($rpc, $name) = @_;

    print("graph_title n2n $name status\n");
    print("graph_category network\n");

    my @names;
    for my $fieldname (keys(%{$config->{$name}})) {
        push(@names, $fieldname);

        my $field = $config->{$name}->{$fieldname};
        for my $key (keys(%{$field})) {
            print($fieldname.'.'.$key, " ", $field->{$key}, "\n");
        }
    }

    # Hash order is arbitrary, so sort to keep the graph order stable
    my @ordered = sort(@names);
    print("graph_order ", join(' ', @ordered), "\n");
}
# Print the current values for one graph.
#
# Arguments:
#   $rpc  - client object; its read() returns an array ref of row hashes,
#           or undef when the daemon reported an error
#   $name - graph name, used as the key into $fetchinfo
#
# For a 'read' table every row is printed as "<type>_<key>.value <val>".
# For each 'count' table the number of rows is printed; if the table
# could not be read the value is reported as "U" (unknown) instead of
# dying on the undefined result.
sub do_fetch {
    my ($rpc, $name) = @_;

    my $read_table = $fetchinfo->{$name}->{read};
    if (defined($read_table)) {
        my $db = $rpc->read($read_table);
        if (defined($db)) {
            for my $row (@{$db}) {
                # The row's type is the metric name prefix, not a metric
                my $type = delete $row->{type};
                # Sorted so repeated runs print in the same order
                for my $key (sort(keys(%{$row}))) {
                    my $metricname = $type."_".$key;
                    print($metricname, ".value ", $row->{$key}, "\n");
                }
            }
        } else {
            # The field names come from the reply, so nothing can be printed
            warn("n2n: reading '$read_table' failed\n");
        }
    }

    my $count_tables = $fetchinfo->{$name}->{count};
    if (defined($count_tables)) {
        for my $table (@{$count_tables}) {
            my $db = $rpc->read($table);
            my $count = defined($db) ? scalar(@{$db}) : 'U';
            print($table, ".value ", $count, "\n");
        }
    }
}
# Answer the munin "autoconf" query: say "yes" if pgrep finds a supernode
# or an edge process, otherwise "no" with the reason.
sub do_autoconf {
    # quick check to see if this plugin should be enabled; the edge
    # lookup only runs when no supernode was found
    my $running = `pgrep supernode` || `pgrep edge`;

    if (!$running) {
        print("no - neither edge nor supernode are running\n");
        return;
    }

    print("yes\n");
    return;
}
# Answer the munin "suggest" query: print the name of every graph whose
# port belongs to a daemon that pgrep can find.
sub do_suggest {
    my %active_port;
    $active_port{5645} = 1 if (`pgrep supernode`);
    $active_port{5644} = 1 if (`pgrep edge`);

    for my $name (keys(%{$fetchinfo})) {
        my $port = $fetchinfo->{$name}->{port};

        # Entries without a port are not real graphs; entries whose port
        # is not active belong to a daemon that is not running.
        if (defined($port) && defined($active_port{$port})) {
            print($name, "\n");
        }
    }
}
# Dispatch table: sub command name => handler.
# Every handler is called as $handler->($rpc, $name).
my $subc = {
    autoconf => \&do_autoconf,
    config   => \&do_config,
    fetch    => \&do_fetch,
    suggest  => \&do_suggest,
};
# Entry point.
#
# The sub command is $ARGV[0] (default: fetch).  The graph name is taken
# from $ARGV[1] or, failing that, from the name the plugin was started
# as, with any directory part and the "n2n_" prefix removed.
#
# Dies on an unknown sub command; otherwise returns what the handler
# returns.
sub main {
    my $name = $ARGV[1] || $0;
    # The directory part is optional so that a bare "n2n_<graph>"
    # (started from its own directory) is recognised too.
    $name =~ s%^(?:.*/)?n2n_([^/]+)%$1%;

    # An unknown name yields no port and JsonUDP falls back to its default
    my $port = $fetchinfo->{$name}->{port};
    my $rpc = JsonUDP->new($port);

    my $cmd = $ARGV[0];
    if (!defined($cmd)) {
        $cmd = 'fetch';
    }

    my $func = $subc->{$cmd};
    if (!defined($func)) {
        die("bad sub command");
    }

    return $func->($rpc, $name);
}
main();