Author Topic: [Java] Handling Multiple Socket Assynchronously (improved)  (Read 4040 times)

0 Members and 1 Guest are viewing this topic.

Offline iago

  • Leader
  • Administrator
  • Hero Member
  • *****
  • Posts: 17914
  • Fnord.
    • View Profile
    • SkullSecurity
[Java] Handling Multiple Socket Assynchronously (improved)
« on: January 22, 2006, 05:53:03 pm »
nonblocking2.tgz

I've made a lot of improvements to my code.  The most noteworthy one, and the reason I re-did it in the first place, is that it now does connect() assynchronously, so it returns from connect() immediately.  I have absolutely no idea why it wasn't working last time I tried, I somehow missed a really obvious function.. ohwell.  I also added timeouts to connecting, and cleaned up normal timeouts.  The select() call now times out every 1000ms (give or take), at which point each socket is polled.  If the socket's timeout time has elapsed, it runs a timeout function. 

But besides that, I re-did a bunch of code, cleaned it up and such. 

Here is the output of my tests:
Quote
iago@slayer:~/tmp/nonblocking2$ javac util/socket_manager/SocketManager.java 
iago@slayer:~/tmp/nonblocking2$ java util.socket_manager.SocketManager
Socket is connectable: JavaOp2
JavaOp2: connect failed (java.net.ConnectException: Connection refused)
Socket is connectable: JavaOp
JavaOp: connected
JavaOp: received 1310 bytes
JavaOp: received 1310 bytes
JavaOp: received 632 bytes
JavaOp: disconnected
Socket is connectable: Google
Google: connected
Google: received 552 bytes
Google: disconnected
Google4: connect failed (Timed out (no response))
Google3: connect failed (Timed out (no response))
Google2: connect failed (Timed out (no response))
Google5: connect failed (Timed out (no response))
It's doing exactly what it's supposed to be doing, JavaOp2 was a bad port, and Google2 - Google4 were filtered ports. 

Here is the code of the main function, as well as the main driver:
Quote
package util.socket_manager;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;

/** This is a fairly generic class to manage multiple outgoing connections.  The connections are identified
 * by a string, so this is somewhat like a hashtable.
 *
 * This is NOT thread-safe.
 *   
 * @author iago
 */

public class SocketManager extends Thread
{
   /** The default receive timeout length, in milliseconds */
   public final static long DEFAULT_RECEIVE_TIMEOUT = 30000;
   /** The default connect timeout length, in milliseconds */
   public final static long DEFAULT_CONNECT_TIMEOUT = 5000;
   /** The timeout of the selector */
   public final static long SELECTOR_TIMEOUT = 1000;
   
   /** The maximum size that a single packet can be */
   public final static int MAX_PACKET_SIZE = 4096;
   
   /** The current timeout for waiting for data.  Changing this should take effect immediately. */
   private long receive_timeout;
   /** The current timeout for connecting to sockets.  Changing this will take effect immediately. */
   private long connect_timeout;
   
   /** The selector used for multiplexing the sockets */
   private final Selector selector;
   
   /** Contains all completed connections (servers and clients), indexed by their unique identifiers */
   private final Hashtable <String, ConnectionData> sockets_by_identifier;
   
   /** Contains all the client connections, indexed by their SocketChannels */
   private final Hashtable <SocketChannel, ConnectionData<ClientCallback, SocketChannel>> client_sockets_by_socket;
   /** Contains all the server connecitons, indexed by their ServerSocketchannels */
   private final Hashtable <ServerSocketChannel, ConnectionData<ServerCallback, ServerSocketChannel>> server_sockets_by_socket;
   
   /** Contains all the servers that are not yet connected, indexed by their name */
   private final Hashtable<String, ConnectionData<ClientCallback, SocketChannel>> pending_sockets_by_identifier;
   /** Contains all the servers that are not yet connected, indexed by their socket */
   private final Hashtable<SocketChannel, ConnectionData<ClientCallback, SocketChannel>> pending_sockets_by_socket;
   
   /** A static version of the SocketManager that is returned by getInstance() */
   private static SocketManager staticManager = null;

   /** Create a new instance of ClientManager.  It will spawn a new thread to look after arriving data.  This is
    * private, use getInstance() to retrieve an instance of it.   
    * 
    * @param identifier A way for the user/programmer to distinguish various SocketManager's.
    */
   private SocketManager(String identifier)
   {
      try
      {
         sockets_by_identifier = new Hashtable <String, ConnectionData> ();
   
         client_sockets_by_socket = new Hashtable <SocketChannel, ConnectionData<ClientCallback, SocketChannel>>();
         server_sockets_by_socket = new Hashtable <ServerSocketChannel, ConnectionData<ServerCallback, ServerSocketChannel>>();
         pending_sockets_by_socket = new Hashtable<SocketChannel, ConnectionData<ClientCallback, SocketChannel>>();
         pending_sockets_by_identifier = new Hashtable<String, ConnectionData<ClientCallback, SocketChannel>>();
         
         selector = Selector.open();

         setReceiveTimeout(DEFAULT_RECEIVE_TIMEOUT);
         setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
      }
      catch(IOException e)
      {
         e.printStackTrace();
         throw new RuntimeException("Cannot create SocketManager class");
      }
   }
   
   /** This function is guarenteed to always return the same instance, so other classes don't have to worry
    * about passing it around.  Because it is rare that there will ever be a need for more than one
    * SocketManager instance, it's logical to have a static way of accessing it.
    *
    * @return An instance of the Socket Manager.
    */
   public static SocketManager getInstance()
   {
      if(staticManager == null)
         staticManager = new SocketManager("default");
      
      return staticManager;
   }
   
   /** This function is guarenteed to always return a new instance of a SocketManager that won't be
    * used by anything else.
    *
    * @param identifier The identifier for the SocketManager (meaningless)
    * @return A new instance of a socket manager.
    */
   public static SocketManager getInstance(String identifier)
   {
      return new SocketManager(identifier);
   }
   
   /** This function will do a select operation, which will last no longer than TIMEOUT ms.  If data arrives,
    *  the data will be processed by the appropriate callbacks, followed by this function returning.
    *  Generally, this should be done in an infinite loop in the core of the program.
    * 
    *  If there is at least one connection pending, it will check if any pending connections are finished.
    *  If they are, it calls the appropriate event and adds them to the completed list.  If there are
    *  pending connections, this function will return immediately and the timeout will not be performed. 
    *  It's a compromise that I don't think I can avoid, and is worthwhile, in my opinion. 
    *
    * @throws IOException If an unexpected I/O error occurs.  If a disconnect or timeout occurs, an IOException
    * is not thrown, it is handled with the proper callback instead.
    */
   public void doSelect() throws IOException
   {
      if(selector.select(SELECTOR_TIMEOUT) == 0)
         handleTimeout();
      else
         handleSockets();
   }

   /** If a socket is readable, which means there's data waiting to be read, this function is notified.
    *
    * @param key The readable key.
    */
   protected void handleReadable(SelectionKey key)
   {
       int position;
       byte []data;
      /* Get the channel */
      SocketChannel s = (SocketChannel) key.channel();
      /* Get the connection data */
      ConnectionData <ClientCallback, SocketChannel>cd = client_sockets_by_socket.get(s);
      /* The number of bytes read this time */
      int bytesRead = -1;

      /* Reset the buffer and try to read in data.  If there's an IOException, ignore it, the socket will be
       * closed when the data fails to read */
      try
      {
         cd.getInBuffer().rewind();
         bytesRead = s.read(cd.getInBuffer());
      }
      catch(IOException e)
      {
         /* Don't do anything here, it's handled automatically if the read bytes is wrong */
      }
      
      if(bytesRead < 0)
      {
         /* If no data was read (either due to the socket dying or an IOException occurring), close the socket
          * officially. */
         close(cd.getIdentifier(), true);
      }
      else
      {
          position = cd.getInBuffer().position();
          data = new byte[position];
          cd.getInBuffer().position(0);
          cd.getInBuffer().get(data);
                    
          cd.getInBuffer().rewind();

         /* Call the callback to process the data, or whatever */
         cd.getCallback().receivedData(data);
      }

   }
   
   /** If a socket is writable, which means that data can be sent, this function is notified.
    *
    * @param key The writable key.
    */
   protected void handleWritable(SelectionKey key)
   {
      /* Get the channel */
      SocketChannel s = (SocketChannel) key.channel();
      /* Get the connection data */
      ConnectionData <ClientCallback, SocketChannel>cd = client_sockets_by_socket.get(s);
      /* Get the next buffer we're planning to send */
      ByteBuffer writeData = cd.nextOutBuffer();

      try
      {
         if(writeData == null)
         {
            /* We only want to read now, so re-register with OP_READ */
            s.register(selector, SelectionKey.OP_READ);
         }
         else
         {
            /* We can send data, so send it and carry on. */
            s.write(writeData);
            
            /* Check if the entire buffer was sent.  If it wasn't, add it back to the list */
            if(writeData.position() != writeData.limit())
               cd.readOutBuffer(writeData);
         }
      }
      catch(IOException e)
      {
         /* If an I/O error occurs, close the socket */
         close(cd.getIdentifier(), true);
      }

   }

   /** If a socket is acceptable, which means that a connection is pending, this function is notified.
    *
    * @param key The acceptable key.
    */
   protected void handleAcceptable(SelectionKey key)
   {
      try
      {
         /* Get the channel */
         ServerSocketChannel s = (ServerSocketChannel) key.channel();
         /* Get the connection data */
         ConnectionData<ServerCallback, ServerSocketChannel> cd = server_sockets_by_socket.get(s);
   
         /* The identifier that the new Client will be registered under */
         String identifier = getClientIdentifier(cd.getIdentifier());
         /* The new socket for the client */
         SocketChannel newSocket = cd.getSocket().accept();
         /* The new callback for the client */
         ClientCallback callback = cd.getCallback().getSocketCallback(identifier);
   
         ConnectionData<ClientCallback, SocketChannel> cdNew = new ConnectionData<ClientCallback, SocketChannel>(identifier, newSocket, callback);
         
         /* Make it non-blocking and register it as readable */
         newSocket.configureBlocking(false);
         newSocket.register(selector, SelectionKey.OP_READ);
         
         /* Put it into the hashtables */
         sockets_by_identifier.put(identifier, cdNew);
         client_sockets_by_socket.put(newSocket, cdNew);
   
         /* Let the callback know that the connection has been established */
         cd.getCallback().connectionAccepted();
         /* Let the new client know that he's been connected */
         callback.connected();
         
         /* Wake up the selector, just to make sure it reads the new socket */
         selector.wakeup();
      }
      catch(IOException e)
      {
         e.printStackTrace();
      }
   }
   
   /** If a socket is acceptable, which means that it's ready to complete its connection sequence, this
    * function is called. 
    *
    * @param key The acceptable key.
    */
   protected void handleConnectable(SelectionKey key)
   {
      /* Get the channel */
      SocketChannel s = (SocketChannel) key.channel();
      /* Get the connection data */
      ConnectionData<ClientCallback, SocketChannel> cd = pending_sockets_by_socket.get(s);

      /* The identifier that the new Client will be registered under */
      String identifier = cd.getIdentifier();
      
      System.out.println("Socket is connectable: " + identifier);
      
      try
      {
         /* If it's completed, move it to the real list */
         if(s.finishConnect())
         {
            /* If there is pending data, register the connection for reading and writing.  If
             * there is no data to be sent, just register it for reading. */
            if(cd.hasPendingSend())
               s.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            else
               s.register(selector, SelectionKey.OP_READ);
            
            /* Remove the server from the pending connections */
            pending_sockets_by_socket.remove(s);
            pending_sockets_by_identifier.remove(identifier);
            
            /* Add it to the sets */
            sockets_by_identifier.put(identifier, cd);
            client_sockets_by_socket.put(s, cd);
            
            /* Note: the connected() function will almost invariably send data, so make sure the
             * socket is in a stable state before calling connected() */
            cd.getCallback().connected();
         }
      }
      catch(IOException e)
      {
         /* Remove it from the lists */
         close(identifier, false);
         /* Inform interested parties that the connect failed */
         cd.getCallback().connectFailed(e.toString());
      }
   }
   
   /** This function is called when select() returns, which indicates that something can be done with at least
    * one of the selector's keys.  This function figures out what has to be done, and calls the functions that
    * do the appropriate actions.
    */
   protected void handleSockets()
   {
      Iterator <SelectionKey>it;
      SelectionKey key;
      
      /* Get list of selection keys with pending events */
      it = selector.selectedKeys().iterator();

      /* Process each key */
      while (it.hasNext())
      {

         /* Get the selection key */
         key = it.next();
         /* Remove it from the list to indicate that it is being processed */
         it.remove();

         
         if (key.isReadable())
         {
            handleReadable(key);
         }
         else if (key.isWritable())
         {
            handleWritable(key);
         }
         else if (key.isAcceptable())
         {
            handleAcceptable(key);
         }
         else if (key.isConnectable())
         {
            handleConnectable(key);
         }
         
      }
   }

   /** A timeout occurs every second, and this function handles it.  We have to let each new and
    * pending connection know that another second has passed, and each of the connections have to
    * handle the timeout themselves. */
   protected void handleTimeout()
   {
      Enumeration<String> e;
      String element;
      ConnectionData cd;
      ConnectionData<ClientCallback, SocketChannel> cdClient;
      
      /* Loop through the connected sockets, and let them know that a timeout has happened. */
      for(e = sockets_by_identifier.keys(); e.hasMoreElements(); )
      {
         element = e.nextElement();
         cd = sockets_by_identifier.get(element);
         cd.poll(receive_timeout);
      }
      
      /* Loop through each pending sockets, and let them know a timeout has happened. */
      for(e = pending_sockets_by_identifier.keys(); e.hasMoreElements(); )
      {
         element = e.nextElement();
         cdClient = pending_sockets_by_identifier.get(element);
         
         if(cdClient.connectPoll(connect_timeout - SELECTOR_TIMEOUT) == false)
         {
            close(element, false);
            cdClient.getCallback().connectFailed("Timed out (no response)");
         }
      }
   }
   
   /** Remove/close the specified connection from the list, if possible, and connect to the host, storing
    *  the connection.  This function returns as soon as the address is resolved (timeouts might take some
    *  time), and the connection isn't completed until doSelect() is called at least once after the
    *  connection is made.  If data is sent after calling this but before the connection is completed, the
    *  data is queued and is all sent when the connection is established.   
    *   
    *  @throws IOException If the connect failed.
    *  @return An array of bytes corresponding to the address connected to, just in case the program needs
    *   to know the actual address. 
    */
   public byte []connect(String host, int port, String identifier, ClientCallback callback) throws IOException
   {
      SocketChannel s;
      ConnectionData <ClientCallback, SocketChannel>cd;
      InetAddress []addresses;
      int choice;
      byte []address = null;
      
      /* Close it, just in case it already exists */
      close(identifier, true);

      /* Open the new channel */
      s = SocketChannel.open();
      /* Turn off blocking */
      s.configureBlocking(false);
      
      /* Create a new ConnectionData structure */
      cd = new ConnectionData <ClientCallback, SocketChannel>(identifier, s, callback);
      
      try
      {
         /* Resolve the address to all possible addresses */
         addresses = InetAddress.getAllByName(host);
         
         /* Check if the address was successfully resolved */
         if(addresses.length > 0)
         {
            /* Pick an address randomly */
            choice = (int)(Math.random() * addresses.length);
            /* Connect to the remote socket */
            s.connect(new InetSocketAddress(addresses[choice].getHostAddress(), port));
            /* Add the connection to our list */
            pending_sockets_by_socket.put(s, cd);
            pending_sockets_by_identifier.put(identifier, cd);
            /* Prepare to return the actual address connected to, just in case */
            address = addresses[choice].getAddress();
            /* Register this connection with the selector */
            s.register(selector, SelectionKey.OP_CONNECT);
         }
         else
         {
            /* Inform the callback that the address resolution failed */
            callback.connectFailed("Couldn't find host '" + host + "'");
         }
      }
      catch(IOException e)
      {
         /* Inform the callback that the address resolution failed */
         callback.connectFailed("Couldn't resolve address '" + host + "' (" + e + ")");
      }
      
      return address;
   }
   
   /** Listens for new incoming connections, adding the listening socket to the list of servers.
    *
    * @param port The port to litsen on.  Remember, on Linux, this has to be >=1024 unless you're root.   
    * @param identifier The name to give the server, for the purposes of identification.
    * @param callback The callback to inform when something happens.
    * @throws IOException If the port couldn't be opened for listening. 
    */
   public void listen(int port, String identifier, ServerCallback callback) throws IOException
   {
      ServerSocketChannel s = ServerSocketChannel.open();
      ConnectionData<ServerCallback, ServerSocketChannel> cd;
      
      s.configureBlocking(false);
      s.socket().bind(new InetSocketAddress(port));
      s.register(selector, SelectionKey.OP_ACCEPT);
      
      cd = new ConnectionData<ServerCallback, ServerSocketChannel>(identifier, s, callback);
      
      sockets_by_identifier.put(identifier, cd);
      server_sockets_by_socket.put(s, cd);
   }
   
   /** Remove/close the specified connection, if possible.  If it fails, no exception is thrown, it's just
    * ignored.
    * @param identifier The sting that identifies the connection.
    * @param doCallback If this is true, the callback.disconnect() functions is called.  Otherwise, it's not.
    *  If the socket never actually fully connects, I set doCallback to false and call callback.connectFailed()
    *  manually.
    */
   public void close(String identifier, boolean doCallback)
   {
      ConnectionData cd;
      
      /* Get the socket from the hashtable, if possible */
      cd = sockets_by_identifier.get(identifier);

      if(cd == null)
         cd = pending_sockets_by_identifier.get(identifier);
      
      /* If the socket exists, close it */
      if(cd != null)
      {
         try
         {
            cd.getSocket().close();
         }
         catch(IOException e)
         {
         }

         if(doCallback)
            cd.getCallback().disconnected();
         
         /* Remove the socket from the list (whether it's a client or a server) */
         client_sockets_by_socket.remove(cd.getSocket());
         server_sockets_by_socket.remove(cd.getSocket());
         
         /* Remove the identifier from the list */
         sockets_by_identifier.remove(identifier);
         
         /* Remove it from the pending list, if possible */
         pending_sockets_by_identifier.remove(identifier);
         pending_sockets_by_socket.remove(cd.getSocket());
      }


      /* Wake up the selector to make sure it's removed properly */
      selector.wakeup();
   }

   /** Writes the requested bytes to the socket whenever possible.  The data will be queued, and when the
    * socket becomes writable (probably immediately), it is sent.   
    *
    * @param identifier The socket identifier to send the data over.
    * @param bytes The data to send.
    * @throws IOException If something goes terribly wrong.  Isn't thrown if the socket is disconnected.
    * @throws IllegalArgumentException If the identifier isn't found in the list. 
    */
   public void send(String identifier, byte []bytes)
   {
      ConnectionData cd;
      boolean register = true;

      /* First, try to get the identifier from the active sockets */
      cd = sockets_by_identifier.get(identifier);

      /* It's not an active connection.  Try getting it from a pending connection instead. */
      if(cd == null)
      {
         cd = pending_sockets_by_identifier.get(identifier);
         
         /* If we get it from the pending connections, we don't want it to register the socket, that would
          * break stuff. */
         register = false;
      }
      
      if(cd != null)
      {
         try
         {
            /* Add the data to the buffer */
            cd.addOutBuffer(ByteBuffer.wrap(bytes));
      
            /* If it was found in the active connections, register the socket for writing */
            if(register)
            {
               /* Let it know that there is data that should be sent out now */
               cd.getSocket().register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
               /* Poke it so it sees the data */
               selector.wakeup();
            }
         }
         catch(ClosedChannelException e)
         {
            close(identifier, true);
         }
      }
      else
      {
         throw new IllegalArgumentException("No such identifier: " + identifier);
      }
   }
   
   /** Generates a client identifier from a server name.  This can be changed to suit the application, but
    * I personally just tag the next available number on the end.
    * WARNING: the identifier number can be recycled, when a socket is closed.
    *
    * @param serverIdentifier The identifier for the server that the new name might be based on.
    * @return A unique name that can be used to identify the socket.
    */
   protected String getClientIdentifier(String serverIdentifier)
   {
      String identifier;
      int i = 0;
      
      do
      {
         identifier = serverIdentifier + "-client-" + i;
         i++;
      }
      while(sockets_by_identifier.get(identifier) != null);
      
      return identifier;
   }
   
   /** Set a new timeout value for the selector.  The first timeout will occur in milliseconds time
    * from when this function is called, and call the timeout() function on the callback. 
    *
    * @param milliseconds The number of milliseconds until the next and each subsequent timeout.
    */
   public void setReceiveTimeout(long milliseconds)
   {
      this.receive_timeout = milliseconds;
      selector.wakeup();
   }

   /** Set the new timeout value for the connections.  If a socket goes "milliseconds" long without connecting,
    * it's removed and the error callback is called.  This takes effect instantly, on all pending connections
    * and all subsequent connections.
    *
    * @param milliseconds The number of milliseconds until a socket dies. 
    */
   public void setConnectTimeout(long milliseconds)
   {
      this.connect_timeout = milliseconds;
   }
   
   public static void main(String []args) throws Exception
   {
      SocketManager.getInstance().connect("www.javaop.com", 80, "JavaOp", SocketManager.getInstance().new Tester("JavaOp"));
      SocketManager.getInstance().send("JavaOp", "GET / HTTP/1.0\r\n\r\n".getBytes());
      
      SocketManager.getInstance().connect("www.google.ca", 80, "Google", SocketManager.getInstance().new Tester("Google"));
      //SocketManager.getInstance().connect("www.yahoo.com", 80, "Yahoo", SocketManager.getInstance().new Tester("Yahoo"));
      
      SocketManager.getInstance().connect("192.168.1.3", 6112, "JavaOp2", SocketManager.getInstance().new Tester("JavaOp2"));
      SocketManager.getInstance().connect("www.google.ca", 81, "Google2", SocketManager.getInstance().new Tester("Google2"));
      
      SocketManager.getInstance().connect("www.google.ca", 83, "Google3", SocketManager.getInstance().new Tester("Google3"));
      SocketManager.getInstance().connect("www.google.ca", 84, "Google4", SocketManager.getInstance().new Tester("Google4"));
      SocketManager.getInstance().connect("www.google.ca", 85, "Google5", SocketManager.getInstance().new Tester("Google5"));
      //SocketManager.getInstance().connect("www.yeehoopppppyd.com", 80, "Yahoo2", SocketManager.getInstance().new Tester("Yahoo2"));
   
      
      
      while(true)
         SocketManager.getInstance().doSelect();
   }
   
   private class Tester implements ClientCallback
   {
      private final String name;
      
      public Tester(String name)
      {
         this.name = name;
      }
      
      public void connected()
      {
         if(name.equals("JavaOp") == false)
            SocketManager.getInstance().send(name, "GET / HTTP/1.0\r\n\r\n".getBytes());
         System.out.println(name + ": connected");
         
      }

      public void receivedData(byte[] data)
      {
         System.out.println(name + ": received " + data.length + " bytes");
         
      }

      public void connectFailed(String reason)
      {
         System.out.println(name + ": connect failed (" + reason + ")");
         
      }

      public void disconnected()
      {
         System.out.println(name + ": disconnected");
      }

      public void timeout()
      {
         System.out.println(name + ": timeout");
      }
      
   }
}


/** This local class manages data for a single connection.  The purpose is to keep all the data together. */
class ConnectionData <CallbackType extends SocketCallback, SocketType extends AbstractSelectableChannel>
{
   private final String identifier;
   private final SocketType s;
   private final CallbackType callback;
   private final ByteBuffer inBuffer;
   private final LinkedList <ByteBuffer> outBuffers;
   
   /** Stores the time that the socket was created */
   private final long createTime;
   /** When the connect times out, it's given one final chance, just in case.  This is set to true when it's
    * on its final chance. */
   private boolean lastChance;
   /** Stores the time that the last data was sent, or when the last timeout occurred. */
   private long sendTime;
   
   public ConnectionData (String identifier, SocketType s, CallbackType callback)
   {
      this.identifier = identifier;
      this.s = s;
      this.callback = callback;
      this.inBuffer = ByteBuffer.allocate(SocketManager.MAX_PACKET_SIZE);
      
      outBuffers = new LinkedList <ByteBuffer>();
      
      this.createTime = System.currentTimeMillis();
      this.lastChance = false;
      this.sendTime = System.currentTimeMillis();
   }
   
   public String getIdentifier()
   {
      return identifier;
   }
   
   public SocketType getSocket()
   {
      return s;
   }
   
   public CallbackType getCallback()
   {
      return callback;
   }
   
   public ByteBuffer getInBuffer()
   {
      return inBuffer;
   }
   
   /** Adds a buffer that will be sent out.  The buffer is rewound before adding. */
   public void addOutBuffer(ByteBuffer buffer)
   {
      synchronized(this)
      {
         buffer.rewind();
         outBuffers.addLast(buffer);
      }
   }
   
   /** Re-adds a buffer who didn't manager to send all its data.  It is put at the front so it has the
    * first chance to send next time, and it obviously isn't rewound.
    */
   public void readOutBuffer(ByteBuffer buffer)
   {
      synchronized(this)
      {
         outBuffers.addFirst(buffer);
      }
   }
   
   public boolean hasPendingSend()
   {
      return outBuffers.size() > 0;
   }
   
   public ByteBuffer nextOutBuffer()
   {
      synchronized(this)
      {
         if(outBuffers.size() == 0)
            return null;
         
         return outBuffers.removeFirst();
      }
   }
   
   public long getConnectTime()
   {
      return createTime;
   }
   
   /** This is called periodically while the socket is connected. */
   public void poll(long timeout)
   {
      long elapsedTime = System.currentTimeMillis() - sendTime;
      
      if(elapsedTime > timeout)
      {
         callback.timeout();
         sendTime = System.currentTimeMillis();
      }
   }

   /** This is called periodically while the socket is connecting.  If it times out, a flag is set so it
    * remembers.  If it times out again, without connecting, false is returned.  There are two results from
    * this behaviour:
    * 1. If something takes longer than usual to process as the same time as a connection, the connect still
    *    has a second chance to connect if it times out as a result. 
    * 2. The timeout will actually be timeout + the selector's timeout. 
    *
    * @param timeout The number of milliseconds before a timeout occurs.
    * @return true if the connect is still going forward as planned, or false if the connect has timed out.
    */
   public boolean connectPoll(long timeout)
   {
      long elapsedTime = System.currentTimeMillis() - createTime;
      
//      System.out.println(identifier + ": " + elapsedTime + "ms elapsed");
      
      if(lastChance)
         return false;
      
      if(elapsedTime > timeout)
         lastChance = true;
      
      return true;
   }
   
   public String toString()
   {
      return identifier + " <" + s.toString() + ">";
   }
}


To run it, you'll need a few classes.  You can get them here:
http://www.javaop.com/~iago/nonblocking2.tgz
« Last Edit: January 22, 2006, 06:09:28 pm by iago »

Offline Ender

  • x86
  • Hero Member
  • *****
  • Posts: 2390
    • View Profile
Re: [Java] Handling Multiple Socket Assynchronously (improved)
« Reply #1 on: April 23, 2006, 10:34:52 pm »
iago, your IO package is awesome. I've been using it extensively lately and it's like, "import... implement... poof... the IO is done." I also like the idea of the callback system, which has worked better for me than the Observable-Observer pattern as relying on the observer's single-method interface is very painstaking. The only change I've made is making the making SocketManager abstract and adding some abstract methods for thread handling, this way I can handle the subclass as a SocketManager when I provide the implementation for the abstract methods. To preserve the option of being able to instantiate the SocketManager I provided a getDefaultInstance() method which returns an anonymous SocketManager with empty implementations of its abstract methods.

Offline Joe

  • B&
  • x86
  • Hero Member
  • *****
  • Posts: 10319
  • In Soviet Russia, text read you!
    • View Profile
    • Github
Re: [Java] Handling Multiple Socket Assynchronously (improved)
« Reply #2 on: April 23, 2006, 11:12:50 pm »
I have to agree with Ender. It's easy to use, once I got it figured it out (it was quite the maze!).
I'd personally do as Joe suggests

You might be right about that, Joe.


Offline iago

  • Leader
  • Administrator
  • Hero Member
  • *****
  • Posts: 17914
  • Fnord.
    • View Profile
    • SkullSecurity
Re: [Java] Handling Multiple Socket Assynchronously (improved)
« Reply #3 on: April 24, 2006, 11:12:02 am »
Thanks!  :-)

Incidentally, I tried mofidying this to also handle UDP.  I spend a few hours (over the course of a couple days) on it, then got pissed off and decided to stick with TCP.  </3 UDP.