#!/usr/bin/perl -w

# ==============================================================================
# IMIP HUB - must be catted onto the end of node.pl
# ==============================================================================

use IO::Socket;

require 'node.pl';


# ------------------------------------------------------------------------------
# globals

# Package globals.  The handlers in this file read $con, $msg_type, $my_ref,
# $your_ref, %fields and $body but never assign them.
# NOTE(review): presumably node.pl fills them in for the message being
# dispatched, and reads $debug - confirm against node.pl.
use vars qw( $debug $con $msg_type $my_ref $your_ref %fields $body );

# debugging switched on
$debug = 1;


# ------------------------------------------------------------------------------
# constants

# largest clock difference (in seconds) tolerated between a mux and this hub;
# a 'hello_mux' whose time differs by more than this is answered with an error
my $MAX_TIME_DIFF = 5;

# 'Listen' value given to the listening socket
my $MAX_LISTEN = 20;

# upper limit and default for the time-out on a forwarded request
my $MAX_FWD_TIMEOUT     = 30;
my $DEFAULT_FWD_TIMEOUT = $MAX_FWD_TIMEOUT;

# time-out set on a freshly accepted connection; cancelled once it says hello
my $HELLO_TIMEOUT = 20;

# interval of the mux ping timer; a false value (0) means no timer is set
my $PING_MUX_INTERVAL = 0; # 20


# ------------------------------------------------------------------------------
# command line arguments

# exactly one argument is accepted: the port the hub listens on
if (@ARGV != 1) {
    die "syntax: $0 listen_port";
}
my $hub_port = shift @ARGV;


# ------------------------------------------------------------------------------
# file my variables

# next value to hand out as the my_ref of a forwarded request
my $my_ref_seq = 1;

# connection indexes, filled in by the 'hello_mux' handler:
#   $con_by_deployment_customer{$deployment}{$customer} -> connection
#   $con_by_deployment{$deployment}                     -> list of connections
my (%con_by_deployment_customer, %con_by_deployment);

# forwarded requests still waiting for their response, keyed by the my_ref
# they were forwarded under
my %active_requests;

# listening socket, bound to the port given on the command line
#
# When the IO::Socket::INET constructor fails it returns undef and leaves a
# descriptive message in $@ (e.g. which step failed, or a bad service name);
# $! on its own can be generic, so report $@ and fall back to $! only when
# $@ is empty.
my $sock_listen = IO::Socket::INET->new( Proto     => 'tcp',
					 LocalPort => $hub_port,
					 Listen    => $MAX_LISTEN,
					 Reuse     => 1 )
    or die "cannot open listening socket: " . ($@ || $!);

# monitoring: start time of the hub and message counters, reported by
# send_stats()
my $start_time = time;
my ($n_requests, $n_responses, $n_errors) = (0, 0, 0);


# ------------------------------------------------------------------------------
# 'listen' connection handler

# Read handler for the 'listen' connection: accept the incoming connection and
# start the $HELLO_TIMEOUT timer on it.
set_readh('listen', sub {
    accept_connection();

    # Keep a private copy of the global $con for the timer callback, so the
    # callback acts on this connection whatever $con holds when it fires.
    # NOTE(review): assumes accept_connection() leaves the new connection in
    # $con - confirm in node.pl.
    my $accepted = $con;

    my $on_hello_timeout = sub {
        message($accepted, "timeout: time limit for connection has expired");
        abort_connection($accepted);
    };

    set_timeout($HELLO_TIMEOUT, $on_hello_timeout);
});


# ------------------------------------------------------------------------------
# 'new' connection handler

# Handlers for connections of type 'new' (accepted, but not yet identified).
# 'hello_mux' turns the connection into a 'mux', 'hello_mon' into a 'mon';
# any other message type aborts the connection.
set_conh('new',
    # received a 'hello_mux' message
    #   fields read: deployment, time, customers (whitespace separated list)
    hello_mux => sub {
        my ($deployment, $time) = @fields{qw(deployment time)};

        # split ' ' (unlike /\s+/) skips leading whitespace, so a padded list
        # does not register an empty-string customer; a missing field is
        # treated as an empty list
        my @customers = defined($fields{customers})
                      ? split(' ', $fields{customers})
                      : ();

        # a missing time cannot be in sync either (it was previously compared
        # as 0, with the same outcome)
        if (!defined($time) || abs(time - $time) > $MAX_TIME_DIFF) {
            # the time difference is too great - reply with an error
            send_message('error', $your_ref, '.',
                         code    => 1,
                         message => 'clocks are not synchronized');
            reset_timeout();
            return;
        }

        # NOTE(review): a hello without a 'deployment' field is still accepted
        # and indexed under an undefined key - decide whether to reject it.
        $con->{deployment} = $deployment;
        $con->{customers}  = \@customers;

        # index this connection - TODO: are customer_names globally unique, or only within a deployment?
        #                               can more than one server provide access to a single customer?
        push @{ $con_by_deployment{$deployment} }, $con;
        for my $customer (@customers) {
            $con_by_deployment_customer{$deployment}{$customer} = $con;
        }

        # send a welcoming reply - TODO: could do this earlier?
        send_message('welcome', $your_ref, '.');

        set_connection_type('mux');

        message('connection from mux activated');

        # the peer has identified itself: drop the hello time-out
        cancel_timeout();

        if ($PING_MUX_INTERVAL) {
            # private copy of the global $con for the timer callback
            my $con = $con;
            set_timeout($PING_MUX_INTERVAL, sub {
                message($con, "should 'ping' the mux now!");
                reset_timeout($con);
            });
        }
    },

    # received a 'hello_mon' message - the peer is a monitor
    hello_mon => sub {
        send_message('welcome', $your_ref, '.');
        set_connection_type('mon');

        cancel_timeout();
    },

    # unknown message
    DEFAULT => sub {
        message("unknown message type '". $msg_type ."'");
        abort_connection();
    },

    # if the connection is closed by the peer
    CLOSING => sub {
        message("new connection closed");
    },

    # if there is an error on this connection
    ERROR => sub {
        message("error on new connection");
    },
);


# Handlers for connections of type 'mon' (peers that sent 'hello_mon').
set_conh('mon',
    # single statistics report
    stats => sub { send_stats($con) },

    # report immediately, then again from a timer whose interval is the
    # 'period' field (5 when the field is absent or false)
    auto => sub {
        # private copy of the global $con for the timer callback
        my $mon_con = $con;

        send_stats($mon_con);

        my $period = $fields{period} || 5;
        my $report_and_rearm = sub {
            send_stats($mon_con);

            reset_timeout($mon_con);
        };
        set_timeout($mon_con, $period, $report_and_rearm);
    },

    # cancel the timer on this connection
    auto_off => sub { cancel_timeout($con) },
);

# Send a 'stats' message carrying the hub's monitoring counters.
#
#   $con - connection the statistics are sent to
#
# Fields sent: up_time (seconds since $start_time), n_requests, n_responses,
# n_errors and req_per_sec (requests per second averaged over the up-time).
# Returns whatever send_message() returns.
sub send_stats {
    my $con = shift;

    my $up_time = time - $start_time;

    # time() has one-second resolution, so $up_time is 0 during the hub's
    # first second (and could go negative if the clock is stepped back).
    # Average over at least one second instead of dying with
    # "Illegal division by zero".
    my $elapsed     = $up_time > 0 ? $up_time : 1;
    my $req_per_sec = $n_requests / $elapsed;

    send_message( $con, 'stats', '.', '.',
                  up_time     => $up_time,
                  n_requests  => $n_requests,
                  n_responses => $n_responses,
                  n_errors    => $n_errors,
                  req_per_sec => $req_per_sec );
}
    
# ------------------------------------------------------------------------------
# 'mux' connection handler

# Handlers for connections of type 'mux' (peers that sent 'hello_mux').
#
# NOTE(review): nothing visible in this file removes a mux from
# %con_by_deployment / %con_by_deployment_customer when its connection goes
# away, so requests may be routed to a dead connection - consider adding a
# CLOSING handler here once node.pl's semantics for it are confirmed.
set_conh('mux',
    # a request from another node for a remote object
    # we forward it to an appropriate destination (another hub, or remote mux)
    #   fields read: identifier ("<deployment> <customer>"), timeout (optional)
    request => sub {
        ++$n_requests;

        # split ' ' (unlike /\s+/) skips leading whitespace; a missing field
        # leaves both values undefined
        my ($deployment, $customer) = defined($fields{identifier})
                                    ? split(' ', $fields{identifier})
                                    : ();

        # TODO: route simply by deployment when it doesn't matter which box it goes to
        # Check the outer key first: a plain two-level lookup would create an
        # empty index entry for every unknown deployment a peer asks for.
        my $server_con;
        if (defined($deployment) && defined($customer)
            && exists $con_by_deployment_customer{$deployment}) {
            $server_con = $con_by_deployment_customer{$deployment}{$customer};
        }

        unless ($server_con) {
            # the combination of deployment and customer is unknown - reply with an error
            my $shown_deployment = defined($deployment) ? $deployment : '';
            my $shown_customer   = defined($customer)   ? $customer   : '';
            send_message('error', $your_ref, '.',
                         code    => 2,
                         message => "cannot reach customer $shown_deployment:$shown_customer");
            return;
        }

        # remember where the request came from, under a fresh my_ref, so the
        # response / error / time-out can be routed back
        my $fwd_my_ref = $my_ref_seq ++;
        $active_requests{$fwd_my_ref} = { from_con     => $con,
                                          req_your_ref => $your_ref,
                                          fields       => {%fields} };

        # TODO: check the cache

        # forward the request
        send_message($server_con, 'request', '.', $fwd_my_ref, %fields);

        # the time-out comes from the peer: accept only a positive decimal
        # number, fall back to the default otherwise, and cap it at the maximum
        my $timeout = $fields{timeout};
        unless (defined($timeout)
                && $timeout =~ /^\s*\d+(?:\.\d+)?\s*$/
                && $timeout > 0) {
            $timeout = $DEFAULT_FWD_TIMEOUT;
        }
        $timeout = $MAX_FWD_TIMEOUT if $timeout > $MAX_FWD_TIMEOUT;

        # private copies of the globals for the timer callback
        my ($from_con, $req_your_ref) = ($con, $your_ref);
        set_timeout("fwd:$fwd_my_ref", $timeout, sub {
            # the response did not arrive in time
            send_message($from_con, 'error', $req_your_ref, '.',
                         code    => 3,
                         message => 'response did not arrive before timeout');
            delete $active_requests{$fwd_my_ref};
            message($from_con, 'response did not arrive before timeout');
        });
    },

    # a response to a request
    # we forward it to the node that sent us the request (another hub, or remote mux)
    response => sub {
        ++$n_responses;

        # deleting the entry claims the request, so a later duplicate response
        # is treated as spurious
        my $request = delete $active_requests{$my_ref};
        unless ($request) {
            # drop expired / spurious responses
            message('received response for expired / spurious request');
            return;
        }

        # TODO: cache the response (the 'cache_until' field says for how long)

        # forward the response
        send_message( $request->{from_con}, 'response', $request->{req_your_ref}, '.',
                      %fields, $body);

        cancel_timeout("fwd:$my_ref");
    },

    # something went wrong, in response to a forwarded request
    # (at the moment, that is the only case when an 'error' is sent)
    error => sub {
        ++$n_errors;

        my $request = delete $active_requests{$my_ref};
        unless ($request) {
            # drop expired / spurious responses
            message('received error for expired / spurious request');
            return;
        }

        # TODO: try another server?

        # forward the error
        send_message($request->{from_con}, 'error', $request->{req_your_ref}, '.',
                     %fields);

        cancel_timeout("fwd:$my_ref");
    },
);


# ------------------------------------------------------------------------------
# begin work

# NOTE(review): the three helpers called below are not defined in this file -
# presumably they come from node.pl.
install_signal_handlers();

# register the listening socket as a connection of type 'listen', the type the
# set_readh('listen', ...) handler above was installed for
add_connection($sock_listen, 'listen');

# start processing
# NOTE(review): presumably runs the event loop and does not return during
# normal operation - confirm in node.pl.
main_loop();
