#!/usr/bin/perl -w

# **TODO**
#
# Send message back to corr saying it wont send data so things can be cleaned up
# cope (somehow) with losing connection to control.pl and corr.pl
# Fork when sending data?

# Flexbuf steps
# 2024  vbs_fs -I 'v*' /mnt/vbsRFTC/
# 2025  ls /mnt/vbsRFTC/
# 2026  fusermount  -u /mnt/vbsRFTC 

use strict;

use IO::Socket;
use IO::Select;
use Getopt::Long;
use File::Basename;
use POSIX qw (floor);
use Astro::Time;

use constant RECONNECT_TIMEOUT => 5;

use RtFC;

sub process_control_command ($);
sub Shutdown();
sub open_connections ($);
sub close_connection($$);
sub chans2mode($$);

$SIG{INT} = \&Shutdown;

system("renice 19 $$\n");

my $control_host = defined($ENV{RTFC_CONTROL})
                          ? $ENV{RTFC_CONTROL}: 'localhost';
my $corr_host = defined($ENV{RTFC_CORRELATOR})
                          ? $ENV{RTFC_CORRELATOR}: 'localhost';

my $antenna_name = defined($ENV{RTFC_ANTNAME})
                          ? $ENV{RTFC_ANTNAME}: 'Test Antenna';
my $antenna_id = defined($ENV{RTFC_ANTID}) 
                          ? $ENV{RTFC_ANTID}: 'Tt';

my $flexMount = defined($ENV{RTFC_FLEXMOUNT}) ? $ENV{RTFC_FLEXMOUNT}: '/mnt/vbsRTFC';

my $defaultpath = '.';
my $swapped = 0;
my $invert = 0;
my $force_mode = undef;
my $prepack = undef;
my $nofilter = 0;
my $flexbuf = undef;
my $stream = undef;
my $thread = undef;

my %filecache = ();

GetOptions('control=s'=>\$control_host, 'correlator=s'=>\$corr_host,
	   'antname=s'=>\$antenna_name, 'antid=s'=>\$antenna_id,
	   'default=s'=>\$defaultpath, 'path=s'=>\$defaultpath,
	   'swapped'=>\$swapped,  'invert'=>\$invert, 
	   'force=s'=>\$force_mode, 'prepack=s'=>\$prepack,
	   'nofilter'=>\$nofilter, 'flexbuf=s'=>\$flexbuf,
	   'flexmount=s'=>\$flexMount, 'stream=s'=>\$stream, 
	   'thread=i'=>\$thread);

if (defined $force_mode) {
  $force_mode =~ s/O/o/g;
  $force_mode =~ s/0/o/g;
  $force_mode =~ s/X/x/g;
}

my $prepack_chandiv = 2;  # May change if other prepack modes used
if (defined $prepack) {
  $prepack =~ s/0/o/g;
  $prepack =~ s/O/o/g;
  $prepack =~ s/X/x/g;
  die "Unknow prepack mode $prepack\n" if !($prepack eq 'ox' || $prepack eq 'xo' || $prepack eq 'ooxx' || $prepack eq 'xxoo');
}

if (defined $flexbuf) {
  $flexbuf .= '*';
  $flexbuf .= $stream if (defined $stream);
  $flexbuf .= "_".$thread if (defined $thread);
}

my $sock_control;
my $sock_corr;
my $sock_corrdata;

my $sel = new IO::Select();

open_connections($sel);

my (@ready, $buf);

# Set the xterm titlebar
print "\033]0;RtFC $antenna_name\007";

if (defined $flexbuf) {
  my $status = system "fusermount  -u $flexMount";
  if ($?==-1) {
    # Failed to start
    die "Failed to run fusermount\n";
  }
  warn "Overriding \$defaultpath\n" if ($defaultpath ne '.');
  $defaultpath = $flexMount;
}

while (1) {

  my $timeout = undef;
  if (!defined $sock_control || !defined $sock_corr 
      || !defined $sock_corrdata) {
    print "Trying to reconnect\n";
    open_connections($sel);

    if (!defined $sock_control || !defined $sock_corr 
	|| !defined $sock_corrdata) {
      $timeout = RECONNECT_TIMEOUT;
    }
  }

  while(@ready = $sel->can_read($timeout)) {
    foreach my $fh (@ready) {
      my $handled = 0;
      if($fh == $sock_control) {
	# Must have data from the control process...
	$handled = 1;
	
	my $command = recv_command($sock_control,$sel,'control');

	if (ref $command) {
	  process_control_command($command);
	} else {
	  next if (! defined $command); # There was a read error
	  if ($command==-1) { # Socket was closed remotely
	    #close_connection($sock_control, $sel);
	    warn "Remote control socket closed\n";
	  } else {
	    warn "Unknown error ($command) on correlator connection\n";
	  }
	}
      } elsif ($fh == $sock_corr) {
	# Must have data from the correlator process...
	$handled = 1;

	my $command = recv_command($sock_corr,$sel,'correlator');

	if (ref $command) {
	    if ($command->type==PING) {
	      #print "Got PING\n";
	    } else {
	      printf "Cannot handle command %s from correlator\n", $command->name;
	    }
	} else {
	  next if (! defined $command); # There was a read error
	  if ($command==-1) { # Socket was closed remotely
	    #warn "Remote correlator socket closed\n";
	    #close_connection($sock_corr, $sel);
	  } else {
	    warn "Unknown error ($command) on correlator connection\n";
	  }
	}
      } elsif ($fh == $sock_corrdata) {
	# Must have data from the control process...
	$handled = 1;

	my $command = recv_command($sock_corrdata,$sel,'correlator data');

	print "sock_corrdata = $sock_corrdata\n";
	if (ref $command) {
	  printf "Cannot handle command %s from correlator data port\n", 
	    $command->name;
	} else {
	  next if (! defined $command); # There was a read error
	  if ($command==-1) { # Socket was closed remotely
	    warn "Remote correlator data socket closed\n";
	    #close_connection($sock_corrdata, $sel);
	  } else {
	    warn "Unknown error ($command) on correlator data connection\n";
	  }
	}
      } else {
	warn "Data found on unhandled socket\n";
	Shutdown();
      }

      if (!$handled) {
	warn "Did not handle message from socket $fh\n";
      }
    }
  }
  
  if (!defined $sock_control && !defined $sock_corr 
      && !defined $sock_corrdata) {
    print "Sleeping\n";
    sleep(RECONNECT_TIMEOUT);
  }
}

sub process_control_command ($) {
  my $command = shift;

  if ($command->type==OBSGRAB_COMMAND) {
    my ($day, $month, $year, $ut) = mjd2cal($command->mjd);

    printf("Grab requested for %02d/%02d/%04d %s %.1f\n",
	   $day, $month, $year, turn2str($ut, 'H', 0),
	   $command->integration);

    grab_data($command);
  } elsif ($command->type==PING) {
      #print "Got PING\n";
  } else {
    printf STDERR "Unknown command (%d) received\n", $command->name;
  }
}

sub Shutdown () {
  $sock_control->shutdown(2) if (defined $sock_control);
  $sock_corr->shutdown(2) if (defined $sock_corr);
  $sock_corrdata->shutdown(2) if (defined $sock_corrdata);
  exit;
}

sub grab_data {
  my $grabcom = shift;

  my $mjd = $grabcom->mjd;
  my $mode = $grabcom->mode;

  if ($grabcom->secoffset!=0) {
    print "Using a clock offset of ", $grabcom->secoffset, " seconds\n";
  }

  my $closest_mjd = 0;
  my $closest_file = undef;

  printf "Looking for mjd $mjd\n";
  # Get a list of files in this directory
  my $status;
  my $search_start = time;

  if (defined $flexbuf) {
    system "fusermount  -u $flexMount";
    if ($?==-1) {
      # Failed to start
      die "Failed to run fusermount\n";
      return;
    }
    warn "Run: vbs_fs -n 4 -I '$flexbuf*' $flexMount\n";
    system "vbs_fs -n 4 -I '$flexbuf*' $flexMount";
    if ($?==-1) {
      # Failed to start
      warn "Failed to run vbs_fs\n";
      return;
    }
  }

  opendir(DIR, $defaultpath) || die "Could not list contents of data dir $defaultpath\n";
  while (my $datafile = readdir(DIR)) {
    next if $datafile eq '..';
    next if $datafile eq '.';

    printf("Trying $datafile\n");
    next if (! -f "$defaultpath/$datafile");

    my $filemjd;
    if ($mode eq 'LBA') {
      if ($nofilter || $datafile =~ /^.+\.lba$/) {

	# Get the time code
	if (exists $filecache{$datafile}) { # Cached?
	  $filemjd = $filecache{$datafile};
	} else {
	  open(DATA, "$defaultpath/$datafile") 
	    || die "Could not open $datafile  for reading: $!\n";
	  my $time = <DATA>;
	  chomp $time;
	  close(DATA) || die "Error with file $datafile: $!\n";
	  if ($time =~ /^TIME (\d\d\d\d)(\d\d)(\d\d):(\d\d)(\d\d)(\d\d)$/) {
	    my $year = $1;
	    my $month = $2;
	    my $day = $3;
	    my $hour = $4;
	    my $min = $5;
	    my $sec = $6;
	
	    $filemjd = cal2mjd($day, $month, $year,
			       (($sec/60+$min)/60 + $hour)/24);
	    $filecache{$datafile} = $filemjd;
	  } else {
	    warn "Unexpected time string $time\n";
	    $filemjd = 0.0;
	  }
	}
      
      } else {
	print "Ignoring $datafile\n";
	next;
      }
    } else { # Mark5/VDIF
      my $data_fullpath = "$defaultpath/$datafile";
      
      if (($nofilter || $data_fullpath =~ /^.+\.m5a$/ || $data_fullpath =~ /^.+\.m5b$/ || $data_fullpath =~ /^.+\.vdif$/) && !($data_fullpath =~ /slice/)) {
	# Get the time code

	if (exists $filecache{$datafile}) {
	  $filemjd = $filecache{$datafile};
	} else {
	  my $cmd = "m5time $data_fullpath $mode";
	  print "$cmd\n";
	  my $mjdstr = `$cmd`;
	  if ($mjdstr =~ /MJD\s*=\s*(\d+)\/(\d+):(\d+):(\d+\.\d+)/) {
	    $filemjd = $1 + (($4/60.0+$3)/60.0+$2)/24.0;
	  } else {
	    warn "Could not get Mark5 time for \"$datafile\": $mjdstr\n";
	    next;
	  }
	  $filecache{$datafile} = $filemjd;

	}
      } else {
	print "Ignoring $datafile\n";
	next;
      }
    }

    $filemjd += $grabcom->secoffset/(60*60*24);

    if ($filemjd>$closest_mjd && $filemjd<=$mjd+0.1/60/60/24) {
      $closest_mjd = $filemjd;
      $closest_file =  $datafile;
    }
  }

  closedir(DIR);

  my $search_finish = time;
      
  printf("Data search took %.1f minutes\n",
	     ($search_finish-$search_start)/60);

  if (!defined $closest_file) {
    warn "Could not find any suitable data\n";
  } else {
    print "Closest file is $closest_file\n";
    my $offset;
    if ($mode eq 'LBA') {
      $offset = floor (($mjd-$closest_mjd) * 24 * 60 *60 +0.5 );
    } else {
      $offset = floor (($mjd-$closest_mjd) * 24 * 60 *60 *1000)/1000; # Nearest msec
    }
    printf " Offset of $offset seconds\n";

    my $inputfile;
    if ($defaultpath eq ".") {
	$inputfile = $closest_file;
    } else {
	$inputfile = "$defaultpath/$closest_file";
    }

    my $integration = $grabcom->integration;
    my $chans = $grabcom->channels;

    my ($finalfile, $origchans);
    if ($mode eq 'LBA') {
      my $swapopt = '';
      if ($swapped) {
	$swapopt = '-swapped ';
      }
      my $tempfile;
      if ($invert) {
	$tempfile =  "VSIPACK-$antenna_id.tmp-preinvert";
	$finalfile = "VSIPACK-$antenna_id.tmp";
      } else {
	$tempfile = "VSIPACK-$antenna_id.tmp";
	$finalfile = $tempfile;
      }
      
      # Read header of file found
      my %header = readheaders($inputfile);
      
      die "Did not find NCHAN header item" if (!exists $header{NCHAN});
      my $nchan = $header{NCHAN};
      
      if (defined $prepack) {
	my $system = "vsipack -offset $offset ".
	  "-int $integration -mode $prepack ".
	    "-outfile PRE-${tempfile} '$inputfile'";
	print "Running: $system\n";
	$status = system $system;
	if ($status==-1) {
	  # Failed to start
	  exit(0);
	} elsif (!$status) {
	  warn "Not transfering data\n";
	  return;
	}
	
	$inputfile = "PRE-${tempfile}";
	$offset = 0;
	$nchan /= $prepack_chandiv;
      }

      my ($mode, $prepack2);
      if (defined $force_mode) {
	$mode = $force_mode;
	# What should chans be set to?
      } else {
	
	# Check not too many channels requested for what we have
	if ($chans>=2**$nchan) {
	  my $newchan = $chans & (2**$nchan-1);
	  my @ignorechan = mask2chan($chans-$newchan);
	  warn "Too many channels passed. Ignoring @ignorechan\n";
	  $chans = $newchan;
	  if ($chans==0) {
	    warn "No channels left! Not transfering anything\n";
	    return;
	  }
	}
	
	# Expand bitmask to be 8 bits for nchan<4
	if ($nchan==1) { 
	  # if we got here this must be the channel we want
	  $chans=0;
	} elsif ($nchan==2) {
	  $origchans = $chans;
	  $chans |= $chans<<2;
	}
	
	
	($mode, $prepack2) = chans2mode($chans, $nchan);
	return if (! defined $mode);
	
	if (defined $prepack2) {
	  my $system = "vsipack -offset $offset ".
	    "-int $integration -mode $prepack2 ".
	      "-outfile PRE2-${tempfile} '$inputfile'";
	  print "Running: $system\n";
	  $status = system $system;
	  if ($status==-1) {
	    # Failed to start
	    exit(0);
	  } elsif (!$status) {
	    warn "Not transfering data\n";
	    return;
	  }
	  $inputfile = "PRE2-${tempfile}";
	  $offset = 0;
	}
      }
      
      my $system = "vsipack -offset $offset ".
	"-int $integration -mode $mode $swapopt".
	  "-outfile $tempfile '$inputfile'";
      
      print "Running: $system\n";
      $status = system $system;
      if ($status==-1) {
	# Failed to start
	exit(0);
      } elsif (!$status) {
	warn "Not transfering data\n";
	return;
      }
      
      if ($invert) {
	$system = "band_invert -at $tempfile $finalfile";
	
	print "Running: $system\n";
	$status = system $system;
	if ($status==-1) {
	  # Failed to start
	  exit(0);
	} elsif (!$status) {
	  warn "Not transfering data\n";
	  return;
	}
      }
      print "Done!\n";
    } else { # Mark5/VDIF
      my $system = "m5slice $inputfile $mode $offset $integration";
      print "Running: $system\n";
      $status = system $system;
      if ($status==-1) {
	# Failed to start
	exit(0);
      } elsif ($status) {
	warn "Not transfering data\n";
	return;
      }

      $inputfile = basename($inputfile);
      if ($inputfile =~ /^(.*)\.([^\.]*)$/) {
	$finalfile = "$1-slice.$2";
      } else {
	$finalfile = $inputfile . "-slice";
      }
    }

    if (!defined $sock_corrdata) {
      warn "Correlator data socket not open. Cannot send data\n";
      return;
    }
    
    # Update time if we asked for a time offset
    if ($mode eq 'LBA' && $grabcom->secoffset!=0) {
      my ($day, $month, $year, $ut) = mjd2cal($mjd);
      my ($sign, $hour, $minute, $second) = time2hms($ut, 'H', 0);
      my $mjdtime = sprintf('%04d%02d%02d:%02d%02d%02d', $year, $month, $day, $hour, $minute, $second);
      my $system = "vsib_settime $finalfile $mjdtime";

      print "Running: $system\n";
      $status = system $system;
      if ($status==-1) {
	# Failed to start
	exit(0);
      } elsif ($status) {
	warn "Not transfering data ($status)\n";
	return;
      }
    }

    # Check the size of the file
    my @stat = stat($finalfile);
    if (@stat) {
      my $size = $stat[7];

      $origchans = $chans if (!defined $origchans);
      my $grabcom = new RtFC::Command::Transfer(DATA_TRANSFER, $grabcom->code,
						$mjd, $origchans, $size);

      $status = $grabcom->send($sock_corrdata, 'correlator data');
      Shutdown() if (!defined($status));

      # Read file in chunks and send to correlator
      open(VSIDATA, $finalfile) || die "Could not open $finalfile\n";
      my $transfer_start = time;

      my $buf;
      my $bytestogo = $size;
      my $bytestoread = 1e6;
      while ($bytestogo>0) {
	$bytestoread = $bytestogo if ($bytestogo<$bytestoread);
	$status = sysread VSIDATA, $buf, $bytestoread;
	$bytestogo -= $status;
	if ($status==0) {
	  die "Error reading local file ($bytestoread)\n";
	}

	my $sent = $sock_corrdata->send($buf);
	if (!defined $sent) {
	  die "Error sending data to correlator\n";
	} elsif ($sent!=length($buf)) {
	  die "Only sent $sent bytes!!\n";
	}
      }
      my $transfer_close = time;
      
      printf("Data transfer took %.1f minutes\n",
	     ($transfer_close-$transfer_start)/60);
      close(VSIDATA);
    } else {
      warn "Failed to stat $finalfile - something must have gone wrong\n";
    }
  }
}

sub open_connections ($) {
  my $sel = shift;

  my $command_name = new RtFC::Command::Antenna_Name($antenna_name);
  my $command_id = new RtFC::Command::Antenna_ID($antenna_id);

  # Connect to the central control process

  my $status;
  if (!defined $sock_control) {
    warn "Try and open sock_control\n";
    # How messy is your drawer?
    $sock_control = IO::Socket::INET->new(PeerAddr => $control_host,
					  PeerPort => OBSPORT,
					  ReuseAddr => 1);

    if (defined $sock_control) {
      $status = $command_name->send($sock_control, 'control process');
      Shutdown() if (! defined $status);

      $status = $command_id->send($sock_control, 'control process');
      Shutdown() if (! defined $status);

      $sel->add($sock_control);
    }

  }

  #|| die "Couldn't create socket connection to control host ".
  #  "$control_host: $@\n";

  # Connect to the correlator process

  if (!defined $sock_corr) {
    warn "Try and open sock_corr\n";

    $sock_corr = IO::Socket::INET->new(PeerAddr  => $corr_host,
				       PeerPort  => CORRPORT,
				       ReuseAddr => 1
				      );
    if (defined $sock_corr) {
      $status = $command_name->send($sock_corr, 'correlator process');
      Shutdown() if (! defined $status);

      $status = $command_id->send($sock_corr, 'correlator process');
      Shutdown() if (! defined $status);

      $sel->add($sock_corr);
    }

    #|| die "Couldn't create socket connection to correlator host ".
    #"$corr_host: $@\n";
  }

  if (defined $sock_corr && ! defined $sock_corrdata) {
    # Connect to the correlator process again, this time for the 
    # data pipe

    warn "Try and open sock_control 2\n";

    $sock_corrdata = IO::Socket::INET->new(PeerAddr  => $corr_host,
					   PeerPort  => CORRPORT,
					   ReuseAddr => 1
					   );
    if (defined $sock_corrdata) {
      $status = $command_name->send($sock_corrdata, 'correlator process');
      Shutdown() if (! defined $status);

      $status = $command_id->send($sock_corrdata, 'correlator process');
      Shutdown() if (! defined $status);

      # Set the TCP window size

      $sock_corrdata->setsockopt(SOL_SOCKET, SO_SNDBUF, 128000);

      # Redefine this second correlator pipe as a data pipe, so the remote
      # process knows tospawn off and treat the second connection
      # differently

      my $command_redef = new RtFC::Command::Redefine(OBSDATA);
      $status = $command_redef->send($sock_corrdata, 'control process');

      #|| die "Couldn't create socket connection to $corr_host: $@\n";

      $sel->add($sock_corr);
    }
  }
}


sub close_connection($$) {
# Close a network connection
  my ($sock, $sel) = @_;

  warn "Closing socket $sock\n";
  $sel->remove($sock);

  $sock->shutdown(2);

  $_[1] = undef;

}

sub chans2mode($$) {
  my ($chans, $nchan) = @_;
  # Convert bitmask to mode option using simple if/then/else as
  # we have to check limited options anyhow. Do 16 bit separately

  my $mode = undef;
  my $prepack = undef;

  if ($nchan==8) {
    if ($chans==0x0) {
      $mode = 'xxxx';
    } elsif ($chans==0x01) {
      $prepack = 'ox';
      $mode = 'ooox';
    } elsif ($chans==0x02) {
      $prepack = 'ox';
      $mode = 'ooxo';
    } elsif ($chans==0x03) {
      $prepack = 'ox';
      $mode = 'ooxx';
    } elsif ($chans==0x04) {
      $prepack = 'ox';
      $mode = 'oxoo';
    } elsif ($chans==0x05) {
      $prepack = 'ox';
      $mode = 'oxox';
    } elsif ($chans==0x08) {
      $prepack = 'ox';
      $mode = 'xooo';
    } elsif ($chans==0x0A) {
      $prepack = 'ox';
      $mode = 'xoxo';
    } elsif ($chans==0x0C) {
      $prepack = 'ox';
      $mode = 'xxoo';
    } elsif ($chans==0x0F) {
      $prepack = 'ox';
      $mode = 'xxxx';
    } elsif ($chans==0x10) {
      $prepack = 'xo';
      $mode = 'ooox';
    } elsif ($chans==0x11) {
      $mode = 'ooox';
    } elsif ($chans==0x20) {
      $prepack = 'xo';
      $mode = 'ooxo';
    } elsif ($chans==0x22) {
      $mode = 'ooxo';
    } elsif ($chans==0x30) {
      $prepack = 'xo';
      $mode = 'ooxx';
    } elsif ($chans==0x33) {
      $mode = 'ooxx';
    } elsif ($chans==0x40) {
      $prepack = 'xo';
      $mode = 'oxoo';
    } elsif ($chans==0x44) {
      $mode = 'oxoo';
    } elsif ($chans==0x50) {
      $prepack = 'xo';
      $mode = 'oxox';
    } elsif ($chans==0x55) {
      $mode = 'oxox';
    } elsif ($chans==0x80) {
      $prepack = 'xo';
      $mode = 'xooo';
    } elsif ($chans==0x88) {
      $mode = 'xooo';
    } elsif ($chans==0xA0) {
      $prepack = 'xo';
      $mode = 'xoxo';
    } elsif ($chans==0xAA) {
      $mode = 'xoxo';
    } elsif ($chans==0xC0) {
      $prepack = 'xo';
      $mode = 'xxoo';
    } elsif ($chans==0xCC) {
      $mode = 'xxoo';
    } elsif ($chans==0xF0) {
      $prepack = 'xo';
      $mode = 'xxxx';
    } elsif ($chans==0xFF) {
      $mode = 'xxxx';
    } else {
      my @chans = mask2chan($chans);
      warn "Invalid channel selection @chans\n  Not transfering data\n";
    }
  } else { # Can treat as 4 channels
    if ($chans==0) {
      $mode = 'xxxx';
    } elsif ($chans==1) {
      $mode = 'ooox';
    } elsif ($chans==2) {
      $mode = 'ooxo';
    } elsif ($chans==3) {
      $mode = 'ooxx';
    } elsif ($chans==4) {
      $mode = 'oxoo';
    } elsif ($chans==5) {
      $mode = 'oxox';
    } elsif ($chans==8) {
      $mode = 'xooo';
    } elsif ($chans==10) {
      $mode = 'xoxo';
    } elsif ($chans==12) {
      $mode = 'xxoo';
    } elsif ($chans==15) {
      $mode = 'xxxx';
    } else {
      # Can never get here from 4 bit mode
      my @chans = mask2chan($chans);
      warn "Invalid channel selection @chans\n  Not transfering data\n";
      return;
    }
  }
 
  my @chans = mask2chan($chans);
  print "**CHANS2MODE**  Got chan selection @chans\n";
  print"                 Using \$mode = $mode\n";
  print"                 Using \$prepack = $prepack\n" if defined $prepack;
  return ($mode, $prepack);
}
