#!/usr/bin/perl -w

# ==============================================================================
# IMIP HUB - must be catted onto the end of node.pl
# ==============================================================================

use IO::Socket;

require 'node.pl';


# ------------------------------------------------------------------------------
# globals

# Package globals declared with `use vars` (rather than `my`), so they are
# package variables and not private to this file.  As used in this file:
#   $con      - the connection the currently running handler is working on
#   $msg_type - type of the message being handled (reported by the DEFAULT handler)
#   $ref      - reference value passed to send_message() in replies to hello messages
#   %fields   - header fields of the message being handled
#   $body     - body of the message being handled
#   $debug    - not referenced in this file
# NOTE(review): nothing in this file assigns these; presumably node.pl sets
# them before it invokes a handler - confirm against node.pl.
use vars qw( $debug
	     $con
	     $msg_type
	     $ref
	     %fields
	     $body );


# ------------------------------------------------------------------------------
# constants

# NOTE(review): the timeout values are handed to set_timeout(), which is not
# defined in this file; they are presumably seconds - confirm.
my $MAX_TIME_DIFF       = 5; # largest clock skew (seconds) tolerated between a mux and this hub
my $MAX_LISTEN          = 20; # passed as Listen (backlog) to the hub's TCP listening socket
my $MAX_FWD_TIMEOUT     = 30; # cap applied to a request's 'timeout' field when forwarding
my $DEFAULT_FWD_TIMEOUT = $MAX_FWD_TIMEOUT; # used when a request has no 'timeout' field
my $HELLO_TIMEOUT       = 20; # armed on every accepted connection, cancelled once a hello is accepted
my $PING_MUX_INTERVAL   = 0; # 0 disables the mux ping timer (original note: 20)
my $CACHE_SOCKET        = '/usr/mi/var/cache.sock'; # path of the cache's UNIX-domain socket
# Regex source text (not a compiled pattern): LF, optionally preceded by CR.
my $MSG_EOL_ACCEPT      = '\015?\012';
# Regex source text for the end of a cache message: a line holding only '.'.
my $MSG_TERM_ACCEPT     = "$MSG_EOL_ACCEPT\\.$MSG_EOL_ACCEPT";


# ------------------------------------------------------------------------------
# command line arguments

# Exactly one argument - the port to listen on - must be left in @ARGV here.
# NOTE(review): the -d/-D flags named in the usage text are not parsed in this
# file; presumably node.pl consumes them first - confirm.
if (@ARGV != 1) {
    die "syntax: $0 [-d|-D] listen_port";
}
my $hub_port = shift @ARGV;


# ------------------------------------------------------------------------------
# file-scoped lexical (my) variables

# counter used to mint the my_ref attached to each forwarded request
my $my_ref_seq = 1;

# connection indexes, filled in when a mux says hello:
#   $con_by_deployment_customer{$deployment}{$customer} => connection
#   $con_by_deployment{$deployment}                     => [ connections ]
my (%con_by_deployment_customer, %con_by_deployment);

# forwarded requests still awaiting a response, keyed by the my_ref we sent
my %active_requests;

# cache queries awaiting a reply, oldest first
my @cache_requests;

# monitoring counters, reported by send_stats()
my $start_time = time;
my ($n_requests, $n_responses, $n_errors, $n_cache_hits, $n_obj_cached) = (0) x 5;

# connection to the cache (assigned during start-up)
my $con_cache;


# ------------------------------------------------------------------------------
# listening socket
# TCP socket on which muxes and monitors connect to this hub
my %listen_opts = (
    Proto     => 'tcp',
    LocalPort => $hub_port,
    Listen    => $MAX_LISTEN,
    Reuse     => 1,
);
my $sock_listen = IO::Socket::INET->new(%listen_opts);
die "cannot open listening socket: $!" unless $sock_listen;


# ------------------------------------------------------------------------------
# connect to the cache
# the hub refuses to start when the cache's UNIX-domain socket is unreachable
my $sock_cache = IO::Socket::UNIX->new( Peer => $CACHE_SOCKET );
unless ($sock_cache) {
    die "cannot connect to cache: $!";
}


# ------------------------------------------------------------------------------
# 'listen' connection handler

set_readh('listen', sub {
    accept_connection();

    # Keep a private copy of $con for the timer callback: the global will
    # refer to some other connection by the time the timer fires.
    # NOTE(review): presumably accept_connection() leaves the newly accepted
    # connection in $con - confirm against node.pl.
    my $accepted = $con;
    my $on_hello_timeout = sub {
        message($accepted, "timeout: time limit for connection has expired");
        abort_connection($accepted);
    };

    # the peer must identify itself (hello_mux / hello_mon) before this expires
    set_timeout($HELLO_TIMEOUT, $on_hello_timeout);
});


# ------------------------------------------------------------------------------
# 'new' connection handler

# Handlers for a connection that has been accepted but has not yet identified
# itself.  A valid hello turns it into a 'mux' or a 'mon' connection and
# cancels the hello timeout armed by the 'listen' handler.
set_conh('new',
    # received a 'hello_mux' message; fields read: deployment, time, customers
    hello_mux => sub {
        my ($deployment, $time) = @fields{qw(deployment time)};

        # split ' ' skips leading whitespace, so a padded customers field
        # cannot produce (and index) an empty-string customer name
        my $customer_list = defined $fields{customers} ? $fields{customers} : '';
        my @customers = split ' ', $customer_list;

        # a missing time field is treated like an unacceptable clock skew
        if (!defined $time or abs(time - $time) > $MAX_TIME_DIFF) {
            # the time difference is too great - reply with an error
            send_message('error', '<', $ref,
                         code    => 1,
                         message => 'clocks are not synchronized');
            # restart the hello timeout so the peer may try again
            reset_timeout();
            return;
        }

        $con->{deployment} = $deployment;
        $con->{customers} = \@customers;

        # index this connection - TODO: are customer_names globally unique, or only within a deployment?
        #                               can more than one server provide access to a single customer?
        # NOTE(review): a later hello for the same deployment/customer silently
        # replaces the earlier connection in the per-customer index.
        push @{ $con_by_deployment{$deployment} }, $con;
        for my $customer (@customers) {
            $con_by_deployment_customer{$deployment}{$customer} = $con;
        }

        # send a welcoming reply - TODO: could do this earlier?
        send_message('welcome', '<', $ref);

        set_connection_type('mux');

        message('connection from mux activated');

        cancel_timeout();

        # optional keep-alive timer; disabled while $PING_MUX_INTERVAL is 0
        if ($PING_MUX_INTERVAL) {
            my $con = $con;
            set_timeout($PING_MUX_INTERVAL, sub {
                message($con, "should 'ping' the mux now!");
                reset_timeout($con);
            });
        }
    },

    # received a 'hello_mon' message: the peer is a monitor
    hello_mon => sub {
        send_message('welcome', '<', $ref);
        set_connection_type('mon');

        cancel_timeout();
    },

    # unknown message
    DEFAULT => sub {
        message("unknown message type '". $msg_type ."'");
        abort_connection();
    },

    # if the connection is closed by the peer
    CLOSING => sub {
        message("new connection closed");
    },

    # if there is an error on this connection
    ERROR => sub {
        message("error on new connection");
    },
);


# ------------------------------------------------------------------------------
# 'mon' (monitor) connection handler

set_conh('mon',
    # one-shot statistics report
    stats => sub { send_stats($con) },

    # report now, then again every 'period' (5 when the field is absent or 0)
    auto => sub {
        my $monitor = $con;
        send_stats($monitor);

        my $period = $fields{period} || 5;
        my $tick = sub {
            send_stats($monitor);

            # re-arm the timer so the reports keep coming
            reset_timeout($monitor);
        };
        set_timeout($monitor, $period, $tick);
    },

    # stop the periodic reports
    auto_off => sub { cancel_timeout($con) },
);


# ------------------------------------------------------------------------------
# send current stats to the monitor

# send_stats($con)
#
# Sends one 'stats' message to the given (monitor) connection, built from the
# hub's own counters plus $n_selects, $n_msg_recv and $n_msg_sent (counters not
# declared in this file - presumably maintained by node.pl).
#
# Each division is guarded individually, so a report is still sent during the
# first second of uptime and before any request has been seen; the affected
# rates are reported as 0 instead of the whole message being suppressed.
sub send_stats {
    my $con = shift;
    my $up_time = time - $start_time;

    # per-second rate of a counter; 0 while the uptime is still 0
    my $per_sec = sub { $up_time ? $_[0] / $up_time : 0 };

    my $n_msg_tot = $n_msg_recv + $n_msg_sent;

    send_message( $con, 'stats', '.', '.',
                  up_time => $up_time,
                  n_requests => $n_requests,
                  n_responses => $n_responses,
                  n_errors => $n_errors,
                  n_selects => $n_selects,
                  n_msg_recv => $n_msg_recv,
                  n_msg_sent => $n_msg_sent,
                  n_msg_tot => $n_msg_tot,
                  req_per_sec => $per_sec->($n_requests),
                  sel_per_sec => $per_sec->($n_selects),
                  msg_recv_per_sec => $per_sec->($n_msg_recv),
                  msg_sent_per_sec => $per_sec->($n_msg_sent),
                  msg_tot_per_sec => $per_sec->($n_msg_tot),
                  n_obj_cached => $n_obj_cached,
                  n_cache_hits => $n_cache_hits,
                  # no requests yet means no hits either: report 0, not a / by 0
                  frac_cache_hits => ($n_requests ? $n_cache_hits / $n_requests : 0),
                  n_active_reqs => scalar(keys %active_requests),
                  n_active_cache_reqs => scalar(@cache_requests) );
}


# ------------------------------------------------------------------------------
# 'mux' connection handler

# Handlers for a connection that has identified itself as a mux.
# NOTE(review): nothing in this file removes a mux from %con_by_deployment or
# %con_by_deployment_customer when its connection goes away; unless node.pl
# does so, requests can be routed to a dead connection - confirm.
set_conh('mux',
    # a request from another node for a remote object
    # the cache is asked first; only on a miss (or cache error) is the request
    # forwarded to the mux registered for the target deployment/customer
    request => sub {
        # Snapshot everything the deferred closures below need.  They run later,
        # from the cache read handler, when the package globals ($con,
        # $your_ref, %fields) may already describe a different message.
        my $con      = $con;
        my $your_ref = $your_ref;
        my $fields   = {%fields};

        ++$n_requests;

        # identifier is "<deployment> <customer>"; split ' ' ignores leading
        # whitespace, which split /\s+/ would turn into an empty first field
        my $identifier = defined $fields->{identifier} ? $fields->{identifier} : '';
        my ($deployment, $customer) = split ' ', $identifier;
        $deployment = '' unless defined $deployment;
        $customer   = '' unless defined $customer;

        # TODO: route simply by deployment when it doesn't matter which box it goes to
        # two-step lookup so an unknown deployment is not autovivified into the index
        my $customers_of = $con_by_deployment_customer{$deployment};
        my $server_con   = $customers_of ? $customers_of->{$customer} : undef;

        unless ($server_con) {
            # the combination of deployment and customer is unknown - reply with an error
            send_message('error', $your_ref, '.',
                         code    => 2,
                         message => "cannot reach customer $deployment:$customer");
            return;
        }

        # we might need to forward the request...
        my $forward_request = sub {
            my $fwd_my_ref = $my_ref_seq++;

            # remember where the eventual response / error has to be sent
            $active_requests{$fwd_my_ref} = { from_con     => $con,
                                              req_your_ref => $your_ref,
                                              fields       => $fields };

            # forward the request
            send_message($server_con, 'request', '.', $fwd_my_ref, %$fields);

            # TODO: this timeout is not strictly necessary - if the network doesn't break!
            # It was disabled by the original author with the remark that the
            # nested closures "break Perl"; kept here for reference.
#           my $timeout = $fields->{timeout} || $DEFAULT_FWD_TIMEOUT;
#           $timeout = $MAX_FWD_TIMEOUT if $timeout > $MAX_FWD_TIMEOUT;
#           set_timeout("fwd:$fwd_my_ref", $timeout, sub {
                # the response did not arrive in time
#               send_message( $con, 'error', $your_ref, '.',
#                             code    => 3,
#                             message => 'response did not arrive before timeout' );
#               delete $active_requests{$fwd_my_ref};
#               message($con, 'response did not arrive before timeout');
#           });
        };

        # but check the cache first!
        # TODO: timeouts!!

        my $message = <<End;
query $fields->{type} $fields->{identifier} $fields->{requester}
.
End
        send_data($con_cache, $message);
        vmessage("CACHE QUERY:\n$message");

        # replies are matched to queries purely by arrival order
        push @cache_requests, {
            responseh => sub {
                my $data = shift;
                # test for presence, not truth: a cached body of "0" is a hit
                if (defined $data and $data ne '') {
                    # the cache gave us the data!
                    # forward the response
                    vmessage("CACHE HIT");
                    ++$n_cache_hits;
                    send_message( $con, 'response', $your_ref, '.',
                                  #TODO: cache_hit => 1, ?
                                  $data );
                } else {
                    vmessage("CACHE MISS");
                    # the cache was not helpful
                    $forward_request->();
                }
            }
        };
    },

    # a response to a request
    # we forward it to the node that sent us the request (another hub, or remote mux)
    response => sub {
        ++$n_responses;

        # $my_ref is the reference we minted when forwarding the request
        my $request = delete $active_requests{$my_ref};
        unless ($request) {
            # drop expired / spurious responses
            message('received response for expired / spurious request');
            return;
        }

        # cache the response - do not need to check time here - we can assume it's in the future.
        # cache messages will not cope with binary data
        # there is no response from the cache to a store (we hope)
        if ($fields{cache_until}) {
            send_data($con_cache, <<End);
store $fields{cache_key} $fields{cache_until}
$body
.
End
            vmessage("CACHED: $fields{cache_key} $fields{cache_until}\n$body");
            ++$n_obj_cached;
        }

        # forward the response
        send_message( $request->{from_con}, 'response', $request->{req_your_ref}, '.',
                      %fields, $body);

        # TODO: this timeout is not strictly necessary - if the network doesn't break!
#       cancel_timeout("fwd:$my_ref");
    },

    # something went wrong, in response to a forwarded request
    # (at the moment, that is the only case when an 'error' is sent)
    error => sub {
        ++$n_errors;

        my $request = delete $active_requests{$my_ref};
        unless ($request) {
            # drop expired / spurious responses
            message('received error for expired / spurious request');
            return;
        }

        # TODO: try another server?

        # forward the error
        send_message($request->{from_con}, 'error', $request->{req_your_ref}, '.',
                     %fields);

        # TODO: this timeout is not strictly necessary - if the network doesn't break!
#       cancel_timeout("fwd:$my_ref");
    },
);


# ------------------------------------------------------------------------------
# 'cache' connection handler
# this uses a simple \n.\n terminated textual message

# Read handler for the cache connection.
#
# Every complete message in the read buffer ("<status line> EOL <data> EOL . EOL")
# is removed from the buffer and handed to the oldest pending entry of
# @cache_requests: its responseh is called exactly once per reply, with the
# data on a hit and with undef on a miss or on a cache error.
set_readh('cache', sub {
    return unless read_connection();
    while ($con->{read_buf} =~ s/^(.*?)$MSG_TERM_ACCEPT//so) {
        my $reply = $1;
        vmessage("FROM CACHE:\n$reply");

        my ($status, $data) = split /$MSG_EOL_ACCEPT/, $reply, 2;
        $status = '' unless defined $status;    # completely empty reply

        my $cache_request = shift @cache_requests;
        unless ($cache_request) {
            # nobody is waiting for this reply - do not call an undefined handler
            message("unexpected message from cache: $status");
            next;
        }

        if ($status ne '000 OK') {
            message("cache error: $status");
            # treat a cache failure as a miss so the request is still served,
            # and do not fall through to a second responseh call
            $cache_request->{responseh}->();
            next;
        }
#TODO: cancel_timeout();
        # a status line with no data after it is a miss
        $cache_request->{responseh}->(defined $data && $data ne '' ? $data : undef);
    }
});


# ------------------------------------------------------------------------------
# begin work

# install_signal_handlers, add_connection and main_loop are not defined in
# this file (presumably node.pl provides them).
install_signal_handlers();

# register the listening socket as a connection of type 'listen'
add_connection($sock_listen, 'listen');

# register the cache socket as a connection of type 'cache'; the returned value
# is kept so the mux handlers can send_data() to the cache
$con_cache = add_connection($sock_cache, 'cache');

# hand control to the event loop
# NOTE(review): presumably does not return during normal operation - confirm.
main_loop();
