Changeset 2572


Ignore:
Timestamp:
Aug 13, 2014, 3:51:11 PM (9 years ago)
Author:
Nicklas Nordborg
Message:

References #617: Re-use server connections within a transaction

Implemented for the FTP protocol. Since this supports anonymous ftp without a file server the connection parameters comparison needed to be changed so that it is based on both the URI and the connection parameters.

Location:
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles
Files:
2 added
6 edited
1 moved

Legend:

Unmodified
Added
Removed
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ConnectionCache.java

    r2571 r2572  
    2929import net.sf.basedb.core.DbControl;
    3030import net.sf.basedb.core.TransactionalAction;
    31 import net.sf.basedb.util.uri.ConnectionParameters;
    3231
    3332/**
     
    4342{
    4443
    45   private final ThreadLocal<Map<ConnectionParameters, C>> cache;
     44  private final ThreadLocal<Map<ConnectionInfo, C>> cache;
    4645  private final ThreadLocal<Boolean> initialized;
    4746 
     
    5453      are equal
    5554  */
    56   public ConnectionCache(Comparator<ConnectionParameters> comparator)
     55  public ConnectionCache(Comparator<ConnectionInfo> comparator)
    5756  {
    5857    this.cache = new TheCache<C>(comparator);
     
    7978    @return An existing connection or null if no connection has been cached
    8079  */
    81   public C getClient(ConnectionParameters cp)
     80  public C getClient(ConnectionInfo cp)
    8281  {
    8382    if (cp == null || initialized.get() == null)
     
    9291    @return TRUE if the connection was added to the cache, FALSE if not
    9392  */
    94   public boolean setClient(ConnectionParameters cp, C client)
     93  public boolean setClient(ConnectionInfo cp, C client)
    9594  {
    9695    if (cp == null || initialized.get() == null) return false;
     
    126125 
    127126  static class TheCache<C>
    128     extends ThreadLocal<Map<ConnectionParameters, C>>
     127    extends ThreadLocal<Map<ConnectionInfo, C>>
    129128  {
    130     private final Comparator<ConnectionParameters> comparator;
     129    private final Comparator<ConnectionInfo> comparator;
    131130   
    132     TheCache(Comparator<ConnectionParameters> comparator)
     131    TheCache(Comparator<ConnectionInfo> comparator)
    133132    {
    134133      this.comparator = comparator;
     
    136135   
    137136    @Override
    138     protected Map<ConnectionParameters, C> initialValue()
     137    protected Map<ConnectionInfo, C> initialValue()
    139138    {
    140       return new TreeMap<ConnectionParameters, C>(this.comparator);
     139      return new TreeMap<ConnectionInfo, C>(this.comparator);
    141140    }
    142141  }
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ftp/FTPClientWrapper.java

    r2521 r2572  
    22
    33import it.sauronsoftware.ftp4j.FTPClient;
     4import it.sauronsoftware.ftp4j.FTPFile;
    45
    56import java.io.Closeable;
    67import java.io.IOException;
     8import java.io.InputStream;
     9import java.io.OutputStream;
     10import java.io.PipedInputStream;
     11import java.io.PipedOutputStream;
     12import java.net.URI;
     13
     14import net.sf.basedb.util.FileUtil;
     15import net.sf.basedb.xfiles.ConnectionInfo;
     16import net.sf.basedb.xfiles.XFiles;
    717
    818/**
     
    1424  @since 1.0
    1525*/
    16 public class FTPClientCloseable
     26public class FTPClientWrapper
    1727  implements Closeable
    1828{
    1929 
    20   private final FTPClient ftp;
     30  public final FTPClient ftp;
     31  private int openFiles;
     32  private boolean isCached;
    2133 
    22   public FTPClientCloseable(FTPClient ftp)
     34  public FTPClientWrapper(FTPClient ftp, ConnectionInfo info)
    2335  {
    2436    this.ftp = ftp;
     37    this.isCached = FtpConnectionManager.CONNECTION_CACHE.setClient(info, this);
    2538  }
    2639
     
    3043  {
    3144    if (ftp == null) return;
    32     try
     45    if (openFiles == 0 || !isCached)
    3346    {
    34       ftp.disconnect(true);
     47      try
     48      {
     49        ftp.disconnect(true);
     50      }
     51      catch (Exception ex)
     52      {
     53        throw new IOException(ex);
     54      }
    3555    }
    36     catch (Exception ex)
     56    else
    3757    {
    38       throw new IOException(ex);
     58      openFiles--;
    3959    }
    4060  }
     61 
     62  public FTPFile[] list(URI uri)
     63    throws Exception
     64  {
     65    FTPFile[] files = ftp.list(uri.getPath());
     66    openFiles++;
     67    return files;
     68  }
    4169
     70  public InputStream download(final URI uri, final long offset)
     71  {
     72    final PipedInputStream in = new PipedInputStream(XFiles.DEFAULT_BUFFER_SIZE);
     73    Thread download = new Thread(new Runnable()
     74    {
     75      @Override
     76      public void run()
     77      {
     78        OutputStream out = null;
     79        try
     80        {
     81          out = new PipedOutputStream(in);
     82          ftp.download(uri.getPath(), out, offset, null);
     83        }
     84        catch (Exception ex)
     85        {
     86          throw new RuntimeException(ex);
     87        }
     88        finally
     89        {
     90          FileUtil.close(out);
     91        }
     92      }
     93    });
     94    download.start();
     95    openFiles++;
     96    return in;
     97  }
     98 
    4299}
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ftp/FtpConnectionManager.java

    r2523 r2572  
    2727import java.io.IOException;
    2828import java.io.InputStream;
    29 import java.io.OutputStream;
    30 import java.io.PipedInputStream;
    31 import java.io.PipedOutputStream;
    3229import java.net.URI;
    3330
     
    3936import net.sf.basedb.util.uri.ResumableConnectionManager;
    4037import net.sf.basedb.util.uri.UriMetadata;
     38import net.sf.basedb.xfiles.ConnectionCache;
     39import net.sf.basedb.xfiles.ConnectionInfo;
    4140import net.sf.basedb.xfiles.MultiCloseable;
    4241import net.sf.basedb.xfiles.XFiles;
     
    5352{
    5453
     54  static ConnectionCache<FTPClientWrapper> CONNECTION_CACHE = new ConnectionCache<FTPClientWrapper>(new FtpConnectionParametersComparator());
     55
    5556  private final URI uri;
    5657  private final ConnectionParameters parameters;
     
    8687    URI uri = getURI();
    8788    InputStream stream = null;
    88     FTPClient ftp = null;
     89    FTPClientWrapper wrapper = null;
    8990    try
    9091    {
    91       ftp = connect(uri, parameters);
    92       stream = new CloseResourceInputStream(startDownload(ftp, uri.getPath(), offset), new FTPClientCloseable(ftp));
     92      wrapper = connect(uri, parameters);
     93      stream = new CloseResourceInputStream(
     94          wrapper.download(uri, offset),
     95          new MultiCloseable(wrapper));
    9396    }
    9497    finally
     
    97100      if (stream == null)
    98101      {
    99         MultiCloseable.close(new FTPClientCloseable(ftp));
     102        MultiCloseable.close(wrapper);
    100103      }
    101104    }
     
    112115      URI uri = getURI();
    113116      metadata = new UriMetadata(uri);
    114      
    115       FTPClient ftp = null;
     117
     118      FTPClientWrapper wrapper = null;
    116119      try
    117120      {
    118         ftp = connect(uri, parameters);
    119         FTPFile[] files = ftp.list(uri.getPath());
     121        wrapper = connect(uri, parameters);
     122        FTPFile[] files = wrapper.list(uri);
    120123        if (files.length > 0)
    121124        {
     
    130133      {
    131134        // Clean up
    132         MultiCloseable.close(new FTPClientCloseable(ftp));
     135        MultiCloseable.close(wrapper);
    133136      }
    134137    }
     
    138141 
    139142 
    140   public FTPClient connect(URI uri, ConnectionParameters parameters)
    141     throws IOException
    142   {
    143     FTPClient ftp = null;
    144     boolean connected = false;
     143  public FTPClientWrapper connect(URI uri, ConnectionParameters parameters)
     144    throws IOException
     145  {
     146    ConnectionInfo info = new ConnectionInfo(uri, parameters);
     147
     148    FTPClientWrapper wrapper = CONNECTION_CACHE.getClient(info);
     149    boolean connected = wrapper != null && wrapper.ftp.isConnected();
     150
    145151    try
    146152    {
    147       ftp = new FTPClient();
     153      FTPClient ftp = new FTPClient();
    148154      // Get username and password
    149155      String username = null;
     
    182188      // Always use passive mode
    183189      ftp.setPassive(true);
     190      wrapper = new FTPClientWrapper(ftp, info);
    184191      connected = true;
    185192    }
     
    190197    finally
    191198    {
    192       if (!connected) MultiCloseable.close(new FTPClientCloseable(ftp));
    193     }
    194     return ftp;
     199      if (!connected)
     200      {
     201        MultiCloseable.close(wrapper);
     202        wrapper = null;
     203      }
     204    }
     205    return wrapper;
    195206  }
    196207 
     
    210221  }
    211222 
    212   /**
    213     Start downloading the file and return an InputStream for
    214     reading it. The download is started in a separate thread.
    215   */
    216   private InputStream startDownload(final FTPClient ftp, final String file, final long offset)
    217   {
    218     final PipedInputStream in = new PipedInputStream(XFiles.DEFAULT_BUFFER_SIZE);
    219     Thread download = new Thread(new Runnable()
    220     {
    221       @Override
    222       public void run()
    223       {
    224         try
    225         {
    226           OutputStream out = new PipedOutputStream(in);
    227           ftp.download(file, out, offset, null);
    228         }
    229         catch (Exception ex)
    230         {
    231           throw new RuntimeException(ex);
    232         }
    233       }
    234     });
    235     download.start();
    236     return in;
    237   }
    238  
    239223}
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ftp/FtpConnectionManagerActionFactory.java

    r2521 r2572  
    2222package net.sf.basedb.xfiles.ftp;
    2323
     24import net.sf.basedb.core.DbControl;
    2425import net.sf.basedb.core.plugin.About;
    2526import net.sf.basedb.util.extensions.ActionFactory;
     
    6667      initFactory(context);
    6768    }
     69    DbControl dc = context.getClientContext().getDbControl();
     70    if (dc != null && !dc.isClosed())
     71    {
     72      // Initialize a connection cache hooked up to the current transaction
     73      FtpConnectionManager.CONNECTION_CACHE.init(dc);
     74    }
    6875    return new ConnectionManagerFactory[] { factory };
    6976  }
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/sftp/SSHClientWrapper.java

    r2571 r2572  
    3131import net.schmizz.sshj.sftp.RemoteFile;
    3232import net.schmizz.sshj.sftp.SFTPClient;
    33 import net.sf.basedb.util.uri.ConnectionParameters;
     33import net.sf.basedb.xfiles.ConnectionInfo;
    3434import net.sf.basedb.xfiles.MultiCloseable;
    3535import net.sf.basedb.xfiles.XFiles;
     
    5656  public final SSHClient ssh;
    5757  public final SFTPClient sftp;
    58   private final ConnectionParameters parameters;
    5958 
    6059  private int openFiles;
    6160  private boolean isCached;
    6261 
    63   public SSHClientWrapper(URI uri, ConnectionParameters parameters)
     62  public SSHClientWrapper(ConnectionInfo info)
    6463    throws IOException
    6564  {
    6665    this.ssh = new SSHClient();
    67     this.isCached = SftpConnectionManager.CONNECTION_CACHE.setClient(parameters, this);
    68     this.ssh.addHostKeyVerifier(parameters.getSshFingerprint());
     66    this.isCached = SftpConnectionManager.CONNECTION_CACHE.setClient(info, this);
     67    this.ssh.addHostKeyVerifier(info.parameters.getSshFingerprint());
    6968    this.ssh.setConnectTimeout(2000);
    70     int port = uri.getPort();
     69    int port = info.uri.getPort();
    7170    if (port == -1) port = XFiles.DEFAULT_SSH_PORT;
    72     this.ssh.connect(uri.getHost(), port);
    73     this.ssh.authPassword(parameters.getUsername(), parameters.getPassword());
     71    this.ssh.connect(info.uri.getHost(), port);
     72    this.ssh.authPassword(info.parameters.getUsername(), info.parameters.getPassword());
    7473    this.sftp = ssh.newSFTPClient();
    75     this.parameters = parameters;
    7674    this.openFiles = 0;
    7775  }
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/sftp/SftpConnectionManager.java

    r2571 r2572  
    3535import net.sf.basedb.util.uri.UriMetadata;
    3636import net.sf.basedb.xfiles.ConnectionCache;
     37import net.sf.basedb.xfiles.ConnectionInfo;
    3738import net.sf.basedb.xfiles.MultiCloseable;
    3839import net.sf.basedb.xfiles.XFiles;
     
    143144    throws IOException
    144145  {
    145     SSHClientWrapper wrapper = CONNECTION_CACHE.getClient(parameters);
     146    if (parameters == null)
     147    {
     148      throw new IOException("No file server specified for " + uri);
     149    }
     150    else if (parameters.getSshFingerprint() == null)
     151    {
     152      throw new IOException("No SSH fingerprint in connection parameters for "+ uri);
     153    }
     154    else if (parameters.getUsername() == null)
     155    {
     156      throw new IOException("No username in connection parameters for "+ uri);
     157    }
     158    else if (parameters.getPassword() == null)
     159    {
     160      throw new IOException("No password in connection parameters for "+ uri);
     161    }
     162   
     163    ConnectionInfo info = new ConnectionInfo(uri, parameters);
     164    SSHClientWrapper wrapper = CONNECTION_CACHE.getClient(info);
    146165    boolean connected = wrapper != null && wrapper.ssh.isConnected();
    147166   
     
    150169      if (!connected)
    151170      {
    152         if (parameters == null)
    153         {
    154           throw new IOException("No file server specified for " + uri);
    155         }
    156         else if (parameters.getSshFingerprint() == null)
    157         {
    158           throw new IOException("No SSH fingerprint in connection parameters for "+ uri);
    159         }
    160         else if (parameters.getUsername() == null)
    161         {
    162           throw new IOException("No username in connection parameters for "+ uri);
    163         }
    164         else if (parameters.getPassword() == null)
    165         {
    166           throw new IOException("No password in connection parameters for "+ uri);
    167         }
    168        
    169         wrapper = new SSHClientWrapper(uri, parameters);
     171        wrapper = new SSHClientWrapper(info);
    170172        connected = true;
    171173      }
  • extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/sftp/SftpConnectionParametersComparator.java

    r2571 r2572  
    2424import java.util.Comparator;
    2525
    26 import net.sf.basedb.util.uri.ConnectionParameters;
     26import net.sf.basedb.xfiles.ConnectionInfo;
    2727
    2828/**
    2929  Comparator implementation that compare two sets of connection
    3030  parameters for the SSH protocol. The parameters are considered
    31   equal if the connect to the same host with the same username.
     31  equal if the connect to the same host with the same username and
     32  password.
    3233
    3334  @since 1.1
    3435*/
    3536public class SftpConnectionParametersComparator
    36   implements Comparator<ConnectionParameters>
     37  implements Comparator<ConnectionInfo>
    3738{
    3839
    3940  @Override
    40   public int compare(ConnectionParameters cp1, ConnectionParameters cp2)
     41  public int compare(ConnectionInfo cp1, ConnectionInfo cp2)
    4142  {
    42     int result = compare(cp1.getHost(), cp2.getHost());
     43    int result = compare(cp1.uri.getHost(), cp2.uri.getHost());
    4344    if (result == 0)
    4445    {
    45       result = compare(cp1.getUsername(), cp2.getUsername());
     46      result = cp2.uri.getPort() - cp1.uri.getPort();
     47    }
     48    if (result == 0)
     49    {
     50      result = compare(cp1.parameters.getUsername(), cp2.parameters.getUsername());
     51    }
     52    if (result == 0)
     53    {
     54      result = compare(cp1.parameters.getPassword(), cp2.parameters.getPassword());
    4655    }
    4756    return result;
Note: See TracChangeset for help on using the changeset viewer.