#!/usr/bin/perl -w # ============================================================================== # IMIP COMMUNICATIONS NODE # - foundation of a state-machine networking daemon # ============================================================================== use strict; use IO::Select; use Fcntl; use Errno qw(EWOULDBLOCK); use Getopt::Std; $| = 1; # for debugging # ------------------------------------------------------------------------------ # constants my $MAX_TIME_DIFF = 5; # a time difference of 5 seconds between computers is unacceptable my $MSG_EOL = "\015\012"; my $MSG_TERM = $MSG_EOL; my $MSG_EOL_ACCEPT = '\015?\012'; my $MSG_TERM_ACCEPT = "$MSG_EOL_ACCEPT$MSG_EOL_ACCEPT"; my $MAX_CHUNK = 62*1024; # ------------------------------------------------------------------------------ # options use vars qw( $opt_d $opt_D ); getopts('dD'); my $debug = $opt_D ? 2 : $opt_d ? 1 : 0; # ------------------------------------------------------------------------------ # file my variables # hash of connections by socket (not tied) my %con = (); # connection handlers, by connection type my %readh = (); my %closingh = (); my %errorh = (); # select objects my $select_read = IO::Select->new; my $select_write = IO::Select->new; my $select_error = IO::Select->new; # timeouts my %timeouts_by_key = (); # by unique key my %n_timeouts_by_time = (); # number of timeouts set for a particular time my %timeouts_by_time = (); my $next_timeout_time; my $max_timeout_time = 0; # for monitoring my $n_selects = 0; my $n_msg_recv = 0; my $n_msg_sent = 0; # ------------------------------------------------------------------------------ # set a timeout sub set_timeout { my $key = shift; # used to be $con by default my ($delay, $sub, $param) = @_; $param = $key unless defined $param; # default parameter is the key my $time = time + $delay; vmessage(undef, "TIMEOUT SET: $key $delay"); # does not print con fileno my $timeout = { key => $key, sub => $sub, time => $time, delay => $delay, param => $param }; $timeouts_by_key{$key} = $timeout; push @{$timeouts_by_time{$time}}, $timeout; if ($n_timeouts_by_time{$time} ++ == 0) { if ($time > $max_timeout_time) { $max_timeout_time = $time; } if (not defined $next_timeout_time or $time < $next_timeout_time) { $next_timeout_time = $time; } } } # ------------------------------------------------------------------------------ # cancel a timeout sub cancel_timeout { my $key = shift; vmessage(undef, "TIMEOUT CANCELLED: $key"); my $timeout = delete $timeouts_by_key{$key}; # debug / safety check unless (defined $timeout) { message(undef, "attempted to cancel non-existant timeout $key"); return; } my $time = delete $timeout->{time}; # might want to look at a more efficient way of doing this - especially if we have any long timeouts! if (-- $n_timeouts_by_time{$time} == 0) { delete $n_timeouts_by_time{$time}; delete $timeouts_by_time{$time}; if ($time == $next_timeout_time) { while(1) { $next_timeout_time++; if ($next_timeout_time > $max_timeout_time) { undef $next_timeout_time; $max_timeout_time = 0; last; } last if $n_timeouts_by_time{$next_timeout_time}; } } if ($time == $max_timeout_time) { do { $max_timeout_time--; } until $n_timeouts_by_time{$max_timeout_time}; } } } # ------------------------------------------------------------------------------ # reset a timeout sub reset_timeout { my $key = shift; vmessage(undef, "TIMEOUT RESET: $key"); my $timeout = $timeouts_by_key{$key}; # debug / safety check unless (defined $timeout) { message(undef, "attempted to reset non-existant timeout $key"); return; } cancel_timeout($key); set_timeout($key, $timeout->{delay}, $timeout->{sub}); } # ------------------------------------------------------------------------------ # debugging - how is the timeout system? sub timeout_status { print "next: $next_timeout_time max: $max_timeout_time\n"; if (defined $next_timeout_time) { for my $t ($next_timeout_time .. $max_timeout_time) { if (exists $timeouts_by_time{$t}) { print "$t : $n_timeouts_by_time{$t} : ", join ' ', map {$_->{key}} grep {$_->{time}} @{$timeouts_by_time{$t}}; print "\n"; } } } use Data::Dumper; print "by key: ", (Dumper \%timeouts_by_key), "\n"; print "by time: ", (Dumper \%timeouts_by_time), "\n"; } # ------------------------------------------------------------------------------ # the main loop of the state-machine sub main_loop { while (1) { # check for expired timeouts: my $time = time; while (defined $next_timeout_time and $next_timeout_time <= $time) { # there are expired timeout/s for my $timeout (grep {defined $_->{time}} @{$timeouts_by_time{$next_timeout_time}}) { vmessage(undef, "TIMEOUT EXPIRED: $timeout->{key}"); # call the timeout sub my $time_check = $timeout->{time}; $timeout->{sub}->($timeout->{param}); # cancel the timeout - but not if it's already cancelled! cancel_timeout($timeout->{key}) if exists $timeout->{time}; } } # determine how long to wait on this select my $select_timeout = defined $next_timeout_time ? $next_timeout_time - time : undef; vmessage(undef, "SELECTING: ". (defined $select_timeout ? "$select_timeout sec" : 'indefinitely')); # select my ($ready_to_read, $ready_to_write, $have_errors) = IO::Select->select($select_read, $select_write, $select_error, $select_timeout); $n_selects++; my ($handle, $con); # sockets that are ready to read for $handle (@$ready_to_read) { # call the connection handler for this connection $con = $con{$handle}; $readh{$con->{type}}->($con); } # sockets that are ready to write for $handle (@$ready_to_write) { $con = $con{$handle}; if ($con->{write_buf}) { my $bytes_to_write = length $con->{write_buf}; my $bytes_written = $handle->syswrite($con->{write_buf}, $bytes_to_write); if (not defined $bytes_written) { # write error if ($! == EWOULDBLOCK) { message($con, "Write would block"); } else { message($con, "Error writing to connection: $!"); $errorh{$con->{type}}->($con) if exists $errorh{$con->{type}}; # TODO: is this necessary? } } elsif ($bytes_written == $bytes_to_write) { # wrote all the buffer $select_write->remove($handle); $con->{write_buf} = ''; } else { # wrote part of the buffer (or maybe nothing?) substr $con->{write_buf}, 0, $bytes_written, ''; } } } # sockets that have errors for $handle (@$have_errors) { $con = $con{$handle}; message($con, "Error on socket"); $errorh{$con->{type}}->($con) if exists $errorh{$con->{type}}; } } } # ------------------------------------------------------------------------------ # buffer data to be sent to a connection sub send_data { my $con = shift; my $message = shift; # die "cannot send an empty message" unless $message ne ''; $select_write->add($con->{socket}) if $con->{write_buf} eq ''; $con->{write_buf} .= $$message; } # ------------------------------------------------------------------------------ # send a message to a certain connection # the paramater spec is: connection your_ref my_ref (key1 val1)* [body] sub send_message { # print "send_message: ", join '-', @_; # print "\n"; my $con = shift; my ($msg_type, $your_ref, $my_ref) = splice @_, 0, 3; my $body = @_ % 2 ? pop : undef; my $message = "$msg_type $your_ref $my_ref$MSG_EOL"; $message .= (shift) .": ". (shift) . $MSG_EOL while @_; $message .= 'body_length: '. length($$body) . $MSG_EOL if $body; ++$n_msg_sent; $message .= $MSG_TERM; $message .= $$body if $body; send_data($con, \$message); # debug message: chomp (my $debug_msg = "SENT:\n". $message); vmessage($con, $debug_msg); } # ------------------------------------------------------------------------------ # print debugging messages sub message { return unless $debug; my $con = shift; print (((defined $con && defined $con->{socket} && defined $con->{socket}->fileno) ? $con->{socket}->fileno : '--'), ": ", shift, "\n"); } # only if verbose! sub vmessage { return unless $debug == 2; my $con = shift; print (((defined $con && defined $con->{socket} && defined $con->{socket}->fileno) ? $con->{socket}->fileno : '--'), ": ", shift, "\n"); } # ------------------------------------------------------------------------------ # add a socket connection to the node sub add_connection { my ($sock_con, $type) = @_; nonblock($sock_con); $select_read->add($sock_con); $select_error->add($sock_con); my $con = $con{$sock_con} = { socket => $sock_con, read_buf => '', write_buf => '' }; set_connection_type($con, $type); message($con, "Accepting a new connection"); return $con; } # ------------------------------------------------------------------------------ # close a connection violently sub abort_connection { my $con = shift; message($con, "Aborted connection"); close_connection($con); } # ------------------------------------------------------------------------------ # actually close the handle, and remove its connection object sub close_connection { my $con = shift; # call the 'CLOSING' handler for this connection $closingh{$con->{type}}->($con) if exists $closingh{$con->{type}}; # cancel any timeout keyed by this connection # - timeouts with other keys must look after themselves! cancel_timeout($con) if exists $timeouts_by_key{$con}; my $handle = $con->{socket}; $select_read->remove($handle); $select_write->remove($handle); $select_error->remove($handle); $handle->close(); delete $con{$handle}; } # ------------------------------------------------------------------------------ # change the type of the connection (e.g. to implement states) sub set_connection_type { my $con = shift; my $type = shift; die "type $type has no handler" unless $readh{$type}; $con->{type} = $type; } # ------------------------------------------------------------------------------ # read some data from a connection sub read_connection { my $con = shift; $con->{read_buf} = '' unless defined $con->{read_buf}; my $bytes_read = $con->{socket}->sysread($con->{read_buf}, $MAX_CHUNK, length $con->{read_buf}); if (not defined $bytes_read) { # read error if ($! == EWOULDBLOCK) { message($con, "Read would block"); } else { message($con, "Error reading from connection: $!"); $errorh{$con->{type}}->($con) if exists $errorh{$con->{type}}; # TODO: is this necessary? } } elsif ($bytes_read == 0) { # connection has closed message($con, "Connection closed"); message($con, " Read buffer was not empty:\n$con->{read_buf}") unless $con->{read_buf} eq ''; message($con, " Write buffer was not empty:\n$con->{write_buf}") unless $con->{write_buf} eq ''; close_connection($con); } return $bytes_read; } # ------------------------------------------------------------------------------ # accept a 'new' connection on a listening socket sub accept_connection { my $con = shift; my $sock_con = $con->{socket}->accept; my $new_con; if ($sock_con) { $new_con = add_connection($sock_con, 'new'); } else { if ($! == EWOULDBLOCK) { message($con, "Accept would block"); } else { message($con, "Error accepting connection: $!"); } } return $new_con; } # ------------------------------------------------------------------------------ # set the read handler, for a named type of connection # the handler is called when a connection of that type is ready to read sub set_read_handler { my ($type, $handler) = @_; $readh{$type} = $handler; } # ------------------------------------------------------------------------------ # connection handler for IMIP messages sub set_connection_handler { my ($type, %handlers) = @_; my $body_length; my $default = delete $handlers{DEFAULT} || sub { my ($con, $msg_type) = @_; message($con, "unexpected message type '$msg_type' - ignored"); }; my ($msg_type, $my_ref, $your_ref, %fields, $body); set_read_handler($type, sub { my $con = shift; # we pass '1' as an additional argument if the handler has been called from another handler # - i.e. when connection type changes - in this case, don't call read_connection again unless (shift) { return unless read_connection($con); } while(1) { unless ( $con->{type} eq $type ) { # if the type of the connection has changed, call another read handler $readh{$con->{type}}->($con, 1); return; } if (defined $body_length) { # we are expecting a body of length $body_length if ($body_length <= length $con->{read_buf}) { # the body has been read # process the message, with a body $body = substr $con->{read_buf}, 0, $body_length, ''; vmessage($con, "BODY: $body"); # note: this code repeats below ++$n_msg_recv; if (exists $handlers{$msg_type}) { $handlers{$msg_type}->($con, $my_ref, $your_ref, {%fields}, \$body); } else { $default->($con, $msg_type); } undef $body_length; } else { return } } else { # we are expecting a header if ($con->{read_buf} =~ s/^(.*?)$MSG_TERM_ACCEPT//so) { my @lines = split /$MSG_EOL_ACCEPT/so, $1; ($msg_type, $my_ref, $your_ref) = split /\s+/, shift @lines; %fields = map { split /:\s*/, $_, 2 } @lines; vmessage($con, "RECEIVED:\n". "$msg_type $my_ref $your_ref\n". (join '', map {"$_\n"} @lines). "\n"); if (not defined ($body_length = delete $fields{'body_length'})) { # process the message, without a body undef $body; # note: this code repeats above ++$n_msg_recv; if (exists $handlers{$msg_type}) { $handlers{$msg_type}->($con, $my_ref, $your_ref, {%fields}); } else { $default->($con, $msg_type); } } } else { return } } } }); $closingh{$type} = delete $handlers{CLOSING} if exists $handlers{CLOSING}; $errorh{$type} = delete $handlers{ERROR} if exists $handlers{ERROR}; } # ------------------------------------------------------------------------------ # get the general stats sub get_node_stats { return ($n_selects, $n_msg_recv, $n_msg_sent); } # ------------------------------------------------------------------------------ # install signal handlers sub install_signal_handlers { # $SIG{'__WARN__'} = sub{ # print STDERR "WARN> @_\n" if $debug; # }; # $SIG{'__DIE__'} = sub { # print STDERR "DIE> @_\n"; # if $debug; # exit 1; # }; # Other Signal handlers $SIG{HUP} = sub { print STDERR "SIGNAL> HUP received and ignored\n" if $debug; }; $SIG{ABRT} = sub { print STDERR "SIGNAL> ABRT received and ignored\n" if $debug; }; $SIG{PIPE} = sub { print STDERR "SIGNAL> PIPE received and ignored\n" if $debug; }; $SIG{INT} = sub { print STDERR "SIGNAL> INT received and closing\n" if $debug; exit 1; }; $SIG{QUIT} = sub { print STDERR "SIGNAL> QUIT received and closing\n" if $debug; exit 1; }; $SIG{TERM} = sub { print STDERR "SIGNAL> TERM received and closing\n" if $debug; exit 1; }; $SIG{USR1} = sub { $debug = not $debug; print STDERR "SIGNAL> USR1 received, debug now ". ($debug ? 'ON' : 'OFF') ."\n"; }; } # ------------------------------------------------------------------------------ # make a file handle non blocking sub nonblock { my $handle = shift; my $flags; $flags = fcntl($handle, F_GETFL, 0) || die "Can't get flags"; fcntl($handle, F_SETFL, $flags | O_NONBLOCK) || die "Can't set flags"; } # ------------------------------------------------------------------------------ # get a list of connections, of a particular type (if specified) sub get_connections { my $type = shift; if (defined $type) { return [grep {$_->{type} eq $type} values %con]; } else { return [values %con]; } } # ------------------------------------------------------------------------------ # check that the times on the local and remote hosts are synchronized, # given a 'hello_*' message with a 'time' field sub time_check { my ($con, $your_ref, $fields) = @_; if (abs(time - $fields->{time}) > $MAX_TIME_DIFF) { # the time difference is too great - reply with an error send_message($con, 'error', $your_ref, '.', code => 1, message => 'clocks are not synchronized'); # reset_timeout($con); return; } return 1; } 1