# ==============================================================================
# IMIP COMMUNICATIONS NODE
# - foundation of a state-machine networking daemon
#
# Connections are plain hashes ({ socket, type, read_buf, write_buf, ... })
# registered with add_connection().  main_loop() multiplexes them with select,
# fires timeouts, and dispatches to the read handler registered for each
# connection's current type.  Handlers communicate through the package globals
# declared below ($con is "the connection currently being serviced").
# ==============================================================================

use strict;
use warnings;

use IO::Select;
use Fcntl;
use Errno qw(EWOULDBLOCK EAGAIN);
use Getopt::Std;
use Data::Dumper;
use List::Util qw(min max);

$| = 1; # for debugging

# ------------------------------------------------------------------------------
# globals

use vars qw(
    $debug
    $con
    $msg_type $my_ref $your_ref %fields $body
    $n_selects $n_msg_recv $n_msg_sent
);

# for monitoring
$n_selects  = 0;
$n_msg_recv = 0;
$n_msg_sent = 0;

# ------------------------------------------------------------------------------
# constants

my $MSG_EOL         = "\015\012";
my $MSG_TERM        = ".$MSG_EOL";
my $MSG_EOL_ACCEPT  = '\015?\012';
my $MSG_TERM_ACCEPT = "$MSG_EOL_ACCEPT\\.$MSG_EOL_ACCEPT";
my $MAX_CHUNK       = 62*1024;

# precompiled forms of the patterns above
my $EOL_RE    = qr/$MSG_EOL_ACCEPT/;
my $HEADER_RE = qr/^(.*?)$MSG_TERM_ACCEPT/s;   # header block up to the "." line

# ------------------------------------------------------------------------------
# options

use vars qw( $opt_d $opt_D );
getopts('dD');
$debug = $opt_D ? 2 : $opt_d ? 1 : 0;

# ------------------------------------------------------------------------------
# file my variables

# hash of connections by socket (not tied)
my %con = ();

# connection handlers, by connection type
my %readh    = ();
my %closingh = ();
my %errorh   = ();

# select objects
my $select_read  = IO::Select->new;
my $select_write = IO::Select->new;
my $select_error = IO::Select->new;

# timeouts
my %timeouts_by_key    = (); # by unique key
my %n_timeouts_by_time = (); # number of live timeouts set for a particular time
my %timeouts_by_time   = (); # time => [ timeout records ]; cancelled records
                             # stay in the list with their 'time' key deleted
my $next_timeout_time;       # earliest occupied time, undef when none
my $max_timeout_time = 0;    # latest occupied time, 0 when none

# ------------------------------------------------------------------------------
# (private) recompute $next_timeout_time / $max_timeout_time from the set of
# occupied time buckets

sub _recompute_timeout_bounds {
    if (%n_timeouts_by_time) {
        $next_timeout_time = min keys %n_timeouts_by_time;
        $max_timeout_time  = max keys %n_timeouts_by_time;
    }
    else {
        undef $next_timeout_time;
        $max_timeout_time = 0;
    }
    return;
}

# ------------------------------------------------------------------------------
# (private) true if the connection hash is still registered with the node

sub _connection_is_open {
    my ($c) = @_;
    return 0 unless ref $c and defined $c->{socket};
    my $registered = $con{ $c->{socket} };
    return (defined $registered and $registered == $c) ? 1 : 0;
}

# ------------------------------------------------------------------------------
# set a timeout
# parameters: [key] delay sub
#   key   - unique key for the timeout (defaults to the current connection)
#   delay - seconds from now
#   sub   - callback, invoked with the key when the timeout expires
# a live timeout already registered under the same key is replaced

sub set_timeout {
    my $key = @_ == 3 ? shift : $con;
    my ($delay, $sub) = @_;

    # Without this the old record would stay live in its time bucket while
    # being unreachable by key, and main_loop could never retire it.
    cancel_timeout($key) if exists $timeouts_by_key{$key};

    my $time = time + $delay;
    vmessage("TIMEOUT SET: $key $delay");

    my $timeout = { key => $key, sub => $sub, time => $time, delay => $delay };
    $timeouts_by_key{$key} = $timeout;
    push @{ $timeouts_by_time{$time} }, $timeout;

    # first timeout in this bucket: the bounds may need widening
    if ($n_timeouts_by_time{$time}++ == 0) {
        $max_timeout_time = $time if $time > $max_timeout_time;
        $next_timeout_time = $time
            if not defined $next_timeout_time or $time < $next_timeout_time;
    }
    return;
}

# ------------------------------------------------------------------------------
# cancel a timeout
# parameters: [key]   (defaults to the current connection)

sub cancel_timeout {
    my $key = @_ ? shift : $con;
    vmessage("TIMEOUT CANCELLED: $key");

    my $timeout = delete $timeouts_by_key{$key};

    # debug / safety check
    unless (defined $timeout) {
        message("attempted to cancel non-existant timeout $key");
        print Dumper $key if ref $key;
        return;
    }

    # deleting 'time' is what marks the record as cancelled
    my $time = delete $timeout->{time};

    # when the bucket empties, drop it and recompute the bounds from the
    # remaining buckets (cost depends on the number of buckets, not on how
    # far apart the timeouts are)
    if (--$n_timeouts_by_time{$time} == 0) {
        delete $n_timeouts_by_time{$time};
        delete $timeouts_by_time{$time};
        _recompute_timeout_bounds();
    }
    return;
}

# ------------------------------------------------------------------------------
# reset a timeout: restart it with its original delay and callback
# parameters: [key]   (defaults to the current connection)

sub reset_timeout {
    my $key = @_ ? shift : $con;
    vmessage("TIMEOUT RESET: $key");

    my $timeout = $timeouts_by_key{$key};

    # debug / safety check
    unless (defined $timeout) {
        message("attempted to reset non-existant timeout $key");
        print Dumper $key if ref $key;
        return;
    }

    cancel_timeout($key);
    set_timeout($key, $timeout->{delay}, $timeout->{sub});
    return;
}

# ------------------------------------------------------------------------------
# debugging - how is the timeout system?

sub timeout_status {
    my $next = defined $next_timeout_time ? $next_timeout_time : '';
    print "next: $next max: $max_timeout_time\n";

    for my $t (sort { $a <=> $b } keys %timeouts_by_time) {
        print "$t : $n_timeouts_by_time{$t} : ",
            join ' ', map { $_->{key} } grep { $_->{time} } @{ $timeouts_by_time{$t} };
        print "\n";
    }

    print "by key: ",  (Dumper \%timeouts_by_key),  "\n";
    print "by time: ", (Dumper \%timeouts_by_time), "\n";
    return;
}

# ------------------------------------------------------------------------------
# (private) fire every timeout whose time has arrived

sub _run_expired_timeouts {
    my $now = time;

    while (defined $next_timeout_time and $next_timeout_time <= $now) {
        my $bucket = $next_timeout_time;
        my @due = grep { defined $_->{time} } @{ $timeouts_by_time{$bucket} || [] };

        # should not happen; guards against spinning on an inconsistent bucket
        unless (@due) {
            delete $n_timeouts_by_time{$bucket};
            delete $timeouts_by_time{$bucket};
            _recompute_timeout_bounds();
            next;
        }

        for my $timeout (@due) {
            # an earlier callback in this batch may have cancelled this one
            next unless defined $timeout->{time};

            vmessage("TIMEOUT EXPIRED: $timeout->{key}");
            $timeout->{sub}->($timeout->{key});

            # cancel the timeout - but not if it's already cancelled!
            cancel_timeout($timeout->{key}) if exists $timeout->{time};
        }
    }
    return;
}

# ------------------------------------------------------------------------------
# (private) write as much of the current connection's buffer as the socket takes

sub _flush_write_buffer {
    my ($handle) = @_;

    # test length, not truth: a buffer holding just "0" must still be sent
    unless (defined $con->{write_buf} and length $con->{write_buf}) {
        $select_write->remove($handle);   # nothing to send; stop polling
        return;
    }

    my $bytes_to_write = length $con->{write_buf};
    my $bytes_written  = $handle->syswrite($con->{write_buf}, $bytes_to_write);

    if (not defined $bytes_written) {
        # write error
        if ($! == EWOULDBLOCK or $! == EAGAIN) {
            message("Write would block");
        }
        else {
            message("Error writing to connection: $!");
            $errorh{$con->{type}}->() if exists $errorh{$con->{type}}; # TODO: is this necessary?
        }
    }
    elsif ($bytes_written == $bytes_to_write) {
        # wrote all the buffer
        $select_write->remove($handle);
        $con->{write_buf} = '';
    }
    else {
        # wrote part of the buffer (or maybe nothing?)
        substr $con->{write_buf}, 0, $bytes_written, '';
    }
    return;
}

# ------------------------------------------------------------------------------
# the main loop of the state-machine (never returns)

sub main_loop {
    while (1) {
        # check for expired timeouts:
        _run_expired_timeouts();

        # determine how long to wait on this select
        my $select_timeout;
        if (defined $next_timeout_time) {
            $select_timeout = $next_timeout_time - time;
            $select_timeout = 0 if $select_timeout < 0;
        }
        vmessage("SELECTING: ". (defined $select_timeout ? "$select_timeout sec" : 'indefinitely'));

        # select - returns an empty list on timeout or error
        my @ready = IO::Select->select($select_read, $select_write, $select_error, $select_timeout);
        $n_selects++;
        next unless @ready;
        my ($ready_to_read, $ready_to_write, $have_errors) = @ready;

        # In each pass below, a handle may belong to a connection that an
        # earlier handler in this same iteration has already closed: skip it.

        # sockets that are ready to read
        for my $handle (@$ready_to_read) {
            my $ready_con = $con{$handle} or next;
            # call the connection handler for this connection
            $con = $ready_con;
            $readh{$con->{type}}->();
        }

        # sockets that are ready to write
        for my $handle (@$ready_to_write) {
            my $ready_con = $con{$handle} or next;
            $con = $ready_con;
            _flush_write_buffer($handle);
        }

        # sockets that have errors
        for my $handle (@$have_errors) {
            my $ready_con = $con{$handle} or next;
            $con = $ready_con;
            message("Error on socket");
            $errorh{$con->{type}}->() if exists $errorh{$con->{type}};
        }
    }
}

# ------------------------------------------------------------------------------
# buffer data to be sent to a connection
# parameters: [connection] data
# sending an empty string is a no-op

sub send_data {
    my $con = @_ == 2 ? shift : $con;
    my $message = shift;

    # registering for write with nothing to send would make select spin
    return unless defined $message and length $message;

    $con->{write_buf} = '' unless defined $con->{write_buf};
    $select_write->add($con->{socket}) if $con->{write_buf} eq '';
    $con->{write_buf} .= $message;
    return;
}

# ------------------------------------------------------------------------------
# send a message to a certain connection
# the parameter spec is: [connection] msg_type your_ref my_ref [key1 val1]* [body]

sub send_message {
    my $con = ref $_[0] ? shift : $con;

    # after the optional connection, an odd count means "no body"
    my $body = @_ % 2 ? '' : pop;
    $body = '' unless defined $body;

    my ($msg_type, $your_ref, $my_ref) = splice @_, 0, 3;

    my $message = "$msg_type $your_ref $my_ref$MSG_EOL";
    $message .= (shift) .": ". (shift) . $MSG_EOL while @_;
    $message .= 'body_length: '. length($body) . $MSG_EOL if length $body;

    ++$n_msg_sent;
    send_data($con, $message . $MSG_TERM . $body);

    # debug message:
    chomp (my $debug_msg = "SENT:\n". $message . $MSG_TERM . $body);
    vmessage($debug_msg);
    return;
}

# ------------------------------------------------------------------------------
# print debugging messages
# parameters: [connection] text
# each line is prefixed with the connection's file number, or '--' if none

sub _log_line {
    my ($c, $text) = @_;
    my $fileno;
    $fileno = $c->{socket}->fileno
        if ref $c and exists $c->{socket} and defined $c->{socket};
    print ((defined $fileno ? $fileno : '--'), ": ", $text, "\n");
    return;
}

sub message {
    my $con = @_ == 2 ? shift : $con;
    _log_line($con, shift) if $debug;
    return;
}

# only if verbose!
sub vmessage {
    my $con = @_ == 2 ? shift : $con;
    _log_line($con, shift) if $debug == 2;
    return;
}

# ------------------------------------------------------------------------------
# add a socket connection to the node
# parameters: socket type
# makes the socket non-blocking, registers it for read/error select, and makes
# the new connection the current one; returns the connection hash

sub add_connection {
    my ($sock_con, $type) = @_;

    nonblock($sock_con);
    $select_read->add($sock_con);
    $select_error->add($sock_con);

    $con = $con{$sock_con} = { socket => $sock_con, read_buf => '', write_buf => '' };
    set_connection_type($con, $type);

    message("Accepting a new connection");
    return $con;
}

# ------------------------------------------------------------------------------
# close a connection violently
# parameters: [connection]

sub abort_connection {
    my $con = shift || $con;
    message($con, "Aborted connection");
    close_connection($con);
    return;
}

# ------------------------------------------------------------------------------
# actually close the handle, and remove its connection object
# parameters: connection  (becomes the current connection)
# closing a connection that is no longer registered is a no-op

sub close_connection {
    $con = shift;

    # already closed: don't run the CLOSING handler or close the handle twice
    return unless _connection_is_open($con);

    # call the 'CLOSING' handler for this connection
    $closingh{$con->{type}}->() if exists $closingh{$con->{type}};

    # cancel any timeout keyed by this connection
    # - timeouts with other keys must look after themselves!
    cancel_timeout($con) if exists $timeouts_by_key{$con};

    my $handle = $con->{socket};
    $select_read->remove($handle);
    $select_write->remove($handle);
    $select_error->remove($handle);
    $handle->close();
    delete $con{$handle};
    return;
}

# ------------------------------------------------------------------------------
# change the type of the connection (e.g. to implement states)
# parameters: [connection] type
# dies if no read handler has been registered for the type

sub set_connection_type {
    my $con = @_ == 2 ? shift : $con;
    my $type = shift;

    die "type $type has no handler" unless $readh{$type};
    $con->{type} = $type;
}

# ------------------------------------------------------------------------------
# read some data from the current connection, appending it to its read buffer
# returns the number of bytes read: undef on error / would-block, and 0 when
# the peer has closed (in which case the connection is closed here)

sub read_connection {
    $con->{read_buf} = '' unless defined $con->{read_buf};

    my $bytes_read = $con->{socket}->sysread($con->{read_buf}, $MAX_CHUNK, length $con->{read_buf});

    if (not defined $bytes_read) {
        # read error
        if ($! == EWOULDBLOCK or $! == EAGAIN) {
            message("Read would block");
        }
        else {
            message("Error reading from connection: $!");
            $errorh{$con->{type}}->() if exists $errorh{$con->{type}}; # TODO: is this necessary?
        }
    }
    elsif ($bytes_read == 0) {
        # connection has closed
        message("Connection closed");
        message(" Read buffer was not empty:\n$con->{read_buf}") unless $con->{read_buf} eq '';
        message(" Write buffer was not empty:\n$con->{write_buf}") unless $con->{write_buf} eq '';
        close_connection($con);
    }

    return $bytes_read;
}

# ------------------------------------------------------------------------------
# accept a 'new' connection on a listening socket (the current connection)

sub accept_connection {
    my $sock_con = $con->{socket}->accept;

    if ($sock_con) {
        add_connection($sock_con, 'new');
    }
    elsif ($! == EWOULDBLOCK or $! == EAGAIN) {
        message("Accept would block");
    }
    else {
        message("Error accepting connection: $!");
    }
    return;
}

# ------------------------------------------------------------------------------
# set the read handler, for a named type of connection
# the handler is called when a connection of that type is ready to read

sub set_readh {
    my ($type, $handler) = @_;
    $readh{$type} = $handler;
}

# ------------------------------------------------------------------------------
# connection handler for IMIP messages
# parameters: type [msg_type handler]*
#   special keys: DEFAULT (unknown message types), CLOSING, ERROR
#
# message handlers are called with the message type as their only argument,
# with $con, $msg_type, $my_ref, $your_ref, %fields and $body set for the
# message ($body is undef when the message had no body_length field)

sub set_conh {
    my ($type, %handlers) = @_;

    my $default = delete $handlers{DEFAULT} || sub {
        my $shown = defined $msg_type ? $msg_type : '';
        message("unexpected message type '$shown' - ignored");
    };

    $closingh{$type} = delete $handlers{CLOSING} if exists $handlers{CLOSING};
    $errorh{$type}   = delete $handlers{ERROR}   if exists $handlers{ERROR};

    my $dispatch = sub {
        my $handler = (defined $msg_type && $handlers{$msg_type}) || $default;
        ++$n_msg_recv;
        $handler->($msg_type);
    };

    $readh{$type} = sub {
        # we pass '1' as an argument if the handler has been called from another handler
        # - i.e. when connection type changes - in this case, don't call read_connection again
        my ($chained) = @_;
        unless ($chained) {
            return unless read_connection();
        }

        # Message handlers may close this connection or make another one
        # current; pin ours so buffered messages are never processed against
        # the wrong (or a dead) connection.
        my $self = $con;

        while (1) {
            return unless _connection_is_open($self);
            $con = $self;

            unless ($con->{type} eq $type) {
                # if the type of the connection has changed, call another read handler
                $readh{$con->{type}}->(1);
                return;
            }

            # A header announcing a body is parked on the connection itself,
            # so a half-received body on one connection cannot affect another
            # connection of the same type.
            if (my $pending = $con->{pending_msg}) {
                # wait until the whole body has arrived
                return if $pending->{body_length} > length $con->{read_buf};

                delete $con->{pending_msg};
                ($msg_type, $my_ref, $your_ref) = @{$pending}{qw(msg_type my_ref your_ref)};
                %fields = %{ $pending->{fields} };
                $body = substr $con->{read_buf}, 0, $pending->{body_length}, '';
                vmessage("BODY: $body");
                $dispatch->();
                next;
            }

            # we are expecting a header
            return unless $con->{read_buf} =~ s/$HEADER_RE//;
            my $header = $1;

            my @lines = split $EOL_RE, $header;
            my $first = shift @lines;
            ($msg_type, $my_ref, $your_ref) = split /\s+/, (defined $first ? $first : '');

            # one field per line; a line without a colon gets an undef value
            # instead of shifting every later key/value pair
            %fields = ();
            for my $line (@lines) {
                my ($name, $value) = split /:\s*/, $line, 2;
                next unless defined $name and length $name;
                $fields{$name} = $value;
            }

            vmessage("RECEIVED:\n".
                join(' ', map { defined $_ ? $_ : '' } $msg_type, $my_ref, $your_ref) . "\n".
                (join '', map {"$_\n"} @lines).
                ".");

            my $announced = delete $fields{'body_length'};
            if (defined $announced) {
                $announced =~ s/\A\s+//;
                $announced =~ s/\s+\z//;
                # the value comes from the peer; a negative or non-numeric
                # length must never reach substr
                unless ($announced =~ /\A\d+\z/) {
                    message("invalid body_length '$announced' - treated as 0");
                    $announced = 0;
                }
                $con->{pending_msg} = {
                    body_length => $announced,
                    msg_type    => $msg_type,
                    my_ref      => $my_ref,
                    your_ref    => $your_ref,
                    fields      => { %fields },
                };
                next;
            }

            # process the message, without a body
            undef $body;
            $dispatch->();
        }
    };
    return;
}

# ------------------------------------------------------------------------------
# install signal handlers

sub install_signal_handlers {
    $SIG{'__WARN__'} = sub { print "WARN> @_\n"; };
    $SIG{'__DIE__'}  = sub {
        # a die inside eval (or while still compiling) is not fatal:
        # let it propagate to whoever is catching it
        return if !defined $^S or $^S;
        print "DIE> @_\n";
        exit 1;
    };
    return;
}

# ------------------------------------------------------------------------------
# make a file handle non blocking
# dies if the flags cannot be read or set

sub nonblock {
    my $socket = shift;

    # fcntl returns "0 but true" for a zero result, so || only fires on failure
    my $flags = fcntl($socket, F_GETFL, 0) || die "Can't get flags: $!";
    my $result = fcntl($socket, F_SETFL, $flags | O_NONBLOCK) || die "Can't set flags: $!";
    return $result;
}

1;