Changeset 2572
- Timestamp:
- Aug 13, 2014, 3:51:11 PM (9 years ago)
- Location:
- extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles
- Files:
-
- 2 added
- 6 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ConnectionCache.java
r2571 r2572 29 29 import net.sf.basedb.core.DbControl; 30 30 import net.sf.basedb.core.TransactionalAction; 31 import net.sf.basedb.util.uri.ConnectionParameters;32 31 33 32 /** … … 43 42 { 44 43 45 private final ThreadLocal<Map<Connection Parameters, C>> cache;44 private final ThreadLocal<Map<ConnectionInfo, C>> cache; 46 45 private final ThreadLocal<Boolean> initialized; 47 46 … … 54 53 are equal 55 54 */ 56 public ConnectionCache(Comparator<Connection Parameters> comparator)55 public ConnectionCache(Comparator<ConnectionInfo> comparator) 57 56 { 58 57 this.cache = new TheCache<C>(comparator); … … 79 78 @return An existing connection or null if no connection has been cached 80 79 */ 81 public C getClient(Connection Parameterscp)80 public C getClient(ConnectionInfo cp) 82 81 { 83 82 if (cp == null || initialized.get() == null) … … 92 91 @return TRUE if the connection was added to the cache, FALSE if not 93 92 */ 94 public boolean setClient(Connection Parameterscp, C client)93 public boolean setClient(ConnectionInfo cp, C client) 95 94 { 96 95 if (cp == null || initialized.get() == null) return false; … … 126 125 127 126 static class TheCache<C> 128 extends ThreadLocal<Map<Connection Parameters, C>>127 extends ThreadLocal<Map<ConnectionInfo, C>> 129 128 { 130 private final Comparator<Connection Parameters> comparator;129 private final Comparator<ConnectionInfo> comparator; 131 130 132 TheCache(Comparator<Connection Parameters> comparator)131 TheCache(Comparator<ConnectionInfo> comparator) 133 132 { 134 133 this.comparator = comparator; … … 136 135 137 136 @Override 138 protected Map<Connection Parameters, C> initialValue()137 protected Map<ConnectionInfo, C> initialValue() 139 138 { 140 return new TreeMap<Connection Parameters, C>(this.comparator);139 return new TreeMap<ConnectionInfo, C>(this.comparator); 141 140 } 142 141 } -
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ftp/FTPClientWrapper.java
r2521 r2572 2 2 3 3 import it.sauronsoftware.ftp4j.FTPClient; 4 import it.sauronsoftware.ftp4j.FTPFile; 4 5 5 6 import java.io.Closeable; 6 7 import java.io.IOException; 8 import java.io.InputStream; 9 import java.io.OutputStream; 10 import java.io.PipedInputStream; 11 import java.io.PipedOutputStream; 12 import java.net.URI; 13 14 import net.sf.basedb.util.FileUtil; 15 import net.sf.basedb.xfiles.ConnectionInfo; 16 import net.sf.basedb.xfiles.XFiles; 7 17 8 18 /** … … 14 24 @since 1.0 15 25 */ 16 public class FTPClient Closeable26 public class FTPClientWrapper 17 27 implements Closeable 18 28 { 19 29 20 private final FTPClient ftp; 30 public final FTPClient ftp; 31 private int openFiles; 32 private boolean isCached; 21 33 22 public FTPClient Closeable(FTPClient ftp)34 public FTPClientWrapper(FTPClient ftp, ConnectionInfo info) 23 35 { 24 36 this.ftp = ftp; 37 this.isCached = FtpConnectionManager.CONNECTION_CACHE.setClient(info, this); 25 38 } 26 39 … … 30 43 { 31 44 if (ftp == null) return; 32 try45 if (openFiles == 0 || !isCached) 33 46 { 34 ftp.disconnect(true); 47 try 48 { 49 ftp.disconnect(true); 50 } 51 catch (Exception ex) 52 { 53 throw new IOException(ex); 54 } 35 55 } 36 catch (Exception ex)56 else 37 57 { 38 throw new IOException(ex);58 openFiles--; 39 59 } 40 60 } 61 62 public FTPFile[] list(URI uri) 63 throws Exception 64 { 65 FTPFile[] files = ftp.list(uri.getPath()); 66 openFiles++; 67 return files; 68 } 41 69 70 public InputStream download(final URI uri, final long offset) 71 { 72 final PipedInputStream in = new PipedInputStream(XFiles.DEFAULT_BUFFER_SIZE); 73 Thread download = new Thread(new Runnable() 74 { 75 @Override 76 public void run() 77 { 78 OutputStream out = null; 79 try 80 { 81 out = new PipedOutputStream(in); 82 ftp.download(uri.getPath(), out, offset, null); 83 } 84 catch (Exception ex) 85 { 86 throw new RuntimeException(ex); 87 } 88 finally 89 { 90 FileUtil.close(out); 91 } 92 } 93 }); 94 download.start(); 95 openFiles++; 96 return in; 97 } 98 42 99 } -
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ftp/FtpConnectionManager.java
r2523 r2572 27 27 import java.io.IOException; 28 28 import java.io.InputStream; 29 import java.io.OutputStream;30 import java.io.PipedInputStream;31 import java.io.PipedOutputStream;32 29 import java.net.URI; 33 30 … … 39 36 import net.sf.basedb.util.uri.ResumableConnectionManager; 40 37 import net.sf.basedb.util.uri.UriMetadata; 38 import net.sf.basedb.xfiles.ConnectionCache; 39 import net.sf.basedb.xfiles.ConnectionInfo; 41 40 import net.sf.basedb.xfiles.MultiCloseable; 42 41 import net.sf.basedb.xfiles.XFiles; … … 53 52 { 54 53 54 static ConnectionCache<FTPClientWrapper> CONNECTION_CACHE = new ConnectionCache<FTPClientWrapper>(new FtpConnectionParametersComparator()); 55 55 56 private final URI uri; 56 57 private final ConnectionParameters parameters; … … 86 87 URI uri = getURI(); 87 88 InputStream stream = null; 88 FTPClient ftp= null;89 FTPClientWrapper wrapper = null; 89 90 try 90 91 { 91 ftp = connect(uri, parameters); 92 stream = new CloseResourceInputStream(startDownload(ftp, uri.getPath(), offset), new FTPClientCloseable(ftp)); 92 wrapper = connect(uri, parameters); 93 stream = new CloseResourceInputStream( 94 wrapper.download(uri, offset), 95 new MultiCloseable(wrapper)); 93 96 } 94 97 finally … … 97 100 if (stream == null) 98 101 { 99 MultiCloseable.close( new FTPClientCloseable(ftp));102 MultiCloseable.close(wrapper); 100 103 } 101 104 } … … 112 115 URI uri = getURI(); 113 116 metadata = new UriMetadata(uri); 114 115 FTPClient ftp= null;117 118 FTPClientWrapper wrapper = null; 116 119 try 117 120 { 118 ftp= connect(uri, parameters);119 FTPFile[] files = ftp.list(uri.getPath());121 wrapper = connect(uri, parameters); 122 FTPFile[] files = wrapper.list(uri); 120 123 if (files.length > 0) 121 124 { … … 130 133 { 131 134 // Clean up 132 MultiCloseable.close( new FTPClientCloseable(ftp));135 MultiCloseable.close(wrapper); 133 136 } 134 137 } … … 138 141 139 142 140 public FTPClient connect(URI uri, ConnectionParameters parameters) 141 throws IOException 142 { 143 FTPClient ftp = null; 144 boolean connected = false; 143 public FTPClientWrapper connect(URI uri, ConnectionParameters parameters) 144 throws IOException 145 { 146 ConnectionInfo info = new ConnectionInfo(uri, parameters); 147 148 FTPClientWrapper wrapper = CONNECTION_CACHE.getClient(info); 149 boolean connected = wrapper != null && wrapper.ftp.isConnected(); 150 145 151 try 146 152 { 147 ftp = new FTPClient();153 FTPClient ftp = new FTPClient(); 148 154 // Get username and password 149 155 String username = null; … … 182 188 // Always use passive mode 183 189 ftp.setPassive(true); 190 wrapper = new FTPClientWrapper(ftp, info); 184 191 connected = true; 185 192 } … … 190 197 finally 191 198 { 192 if (!connected) MultiCloseable.close(new FTPClientCloseable(ftp)); 193 } 194 return ftp; 199 if (!connected) 200 { 201 MultiCloseable.close(wrapper); 202 wrapper = null; 203 } 204 } 205 return wrapper; 195 206 } 196 207 … … 210 221 } 211 222 212 /**213 Start downloading the file and return an InputStream for214 reading it. The download is started in a separate thread.215 */216 private InputStream startDownload(final FTPClient ftp, final String file, final long offset)217 {218 final PipedInputStream in = new PipedInputStream(XFiles.DEFAULT_BUFFER_SIZE);219 Thread download = new Thread(new Runnable()220 {221 @Override222 public void run()223 {224 try225 {226 OutputStream out = new PipedOutputStream(in);227 ftp.download(file, out, offset, null);228 }229 catch (Exception ex)230 {231 throw new RuntimeException(ex);232 }233 }234 });235 download.start();236 return in;237 }238 239 223 } -
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/ftp/FtpConnectionManagerActionFactory.java
r2521 r2572 22 22 package net.sf.basedb.xfiles.ftp; 23 23 24 import net.sf.basedb.core.DbControl; 24 25 import net.sf.basedb.core.plugin.About; 25 26 import net.sf.basedb.util.extensions.ActionFactory; … … 66 67 initFactory(context); 67 68 } 69 DbControl dc = context.getClientContext().getDbControl(); 70 if (dc != null && !dc.isClosed()) 71 { 72 // Initialize a connection cache hooked up to the current transaction 73 FtpConnectionManager.CONNECTION_CACHE.init(dc); 74 } 68 75 return new ConnectionManagerFactory[] { factory }; 69 76 } -
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/sftp/SSHClientWrapper.java
r2571 r2572 31 31 import net.schmizz.sshj.sftp.RemoteFile; 32 32 import net.schmizz.sshj.sftp.SFTPClient; 33 import net.sf.basedb. util.uri.ConnectionParameters;33 import net.sf.basedb.xfiles.ConnectionInfo; 34 34 import net.sf.basedb.xfiles.MultiCloseable; 35 35 import net.sf.basedb.xfiles.XFiles; … … 56 56 public final SSHClient ssh; 57 57 public final SFTPClient sftp; 58 private final ConnectionParameters parameters;59 58 60 59 private int openFiles; 61 60 private boolean isCached; 62 61 63 public SSHClientWrapper( URI uri, ConnectionParameters parameters)62 public SSHClientWrapper(ConnectionInfo info) 64 63 throws IOException 65 64 { 66 65 this.ssh = new SSHClient(); 67 this.isCached = SftpConnectionManager.CONNECTION_CACHE.setClient( parameters, this);68 this.ssh.addHostKeyVerifier( parameters.getSshFingerprint());66 this.isCached = SftpConnectionManager.CONNECTION_CACHE.setClient(info, this); 67 this.ssh.addHostKeyVerifier(info.parameters.getSshFingerprint()); 69 68 this.ssh.setConnectTimeout(2000); 70 int port = uri.getPort();69 int port = info.uri.getPort(); 71 70 if (port == -1) port = XFiles.DEFAULT_SSH_PORT; 72 this.ssh.connect( uri.getHost(), port);73 this.ssh.authPassword( parameters.getUsername(),parameters.getPassword());71 this.ssh.connect(info.uri.getHost(), port); 72 this.ssh.authPassword(info.parameters.getUsername(), info.parameters.getPassword()); 74 73 this.sftp = ssh.newSFTPClient(); 75 this.parameters = parameters;76 74 this.openFiles = 0; 77 75 } -
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/sftp/SftpConnectionManager.java
r2571 r2572 35 35 import net.sf.basedb.util.uri.UriMetadata; 36 36 import net.sf.basedb.xfiles.ConnectionCache; 37 import net.sf.basedb.xfiles.ConnectionInfo; 37 38 import net.sf.basedb.xfiles.MultiCloseable; 38 39 import net.sf.basedb.xfiles.XFiles; … … 143 144 throws IOException 144 145 { 145 SSHClientWrapper wrapper = CONNECTION_CACHE.getClient(parameters); 146 if (parameters == null) 147 { 148 throw new IOException("No file server specified for " + uri); 149 } 150 else if (parameters.getSshFingerprint() == null) 151 { 152 throw new IOException("No SSH fingerprint in connection parameters for "+ uri); 153 } 154 else if (parameters.getUsername() == null) 155 { 156 throw new IOException("No username in connection parameters for "+ uri); 157 } 158 else if (parameters.getPassword() == null) 159 { 160 throw new IOException("No password in connection parameters for "+ uri); 161 } 162 163 ConnectionInfo info = new ConnectionInfo(uri, parameters); 164 SSHClientWrapper wrapper = CONNECTION_CACHE.getClient(info); 146 165 boolean connected = wrapper != null && wrapper.ssh.isConnected(); 147 166 … … 150 169 if (!connected) 151 170 { 152 if (parameters == null) 153 { 154 throw new IOException("No file server specified for " + uri); 155 } 156 else if (parameters.getSshFingerprint() == null) 157 { 158 throw new IOException("No SSH fingerprint in connection parameters for "+ uri); 159 } 160 else if (parameters.getUsername() == null) 161 { 162 throw new IOException("No username in connection parameters for "+ uri); 163 } 164 else if (parameters.getPassword() == null) 165 { 166 throw new IOException("No password in connection parameters for "+ uri); 167 } 168 169 wrapper = new SSHClientWrapper(uri, parameters); 171 wrapper = new SSHClientWrapper(info); 170 172 connected = true; 171 173 } -
extensions/net.sf.basedb.xfiles/trunk/src/net/sf/basedb/xfiles/sftp/SftpConnectionParametersComparator.java
r2571 r2572 24 24 import java.util.Comparator; 25 25 26 import net.sf.basedb. util.uri.ConnectionParameters;26 import net.sf.basedb.xfiles.ConnectionInfo; 27 27 28 28 /** 29 29 Comparator implementation that compare two sets of connection 30 30 parameters for the SSH protocol. The parameters are considered 31 equal if the connect to the same host with the same username. 31 equal if the connect to the same host with the same username and 32 password. 32 33 33 34 @since 1.1 34 35 */ 35 36 public class SftpConnectionParametersComparator 36 implements Comparator<Connection Parameters>37 implements Comparator<ConnectionInfo> 37 38 { 38 39 39 40 @Override 40 public int compare(Connection Parameters cp1, ConnectionParameterscp2)41 public int compare(ConnectionInfo cp1, ConnectionInfo cp2) 41 42 { 42 int result = compare(cp1. getHost(), cp2.getHost());43 int result = compare(cp1.uri.getHost(), cp2.uri.getHost()); 43 44 if (result == 0) 44 45 { 45 result = compare(cp1.getUsername(), cp2.getUsername()); 46 result = cp2.uri.getPort() - cp1.uri.getPort(); 47 } 48 if (result == 0) 49 { 50 result = compare(cp1.parameters.getUsername(), cp2.parameters.getUsername()); 51 } 52 if (result == 0) 53 { 54 result = compare(cp1.parameters.getPassword(), cp2.parameters.getPassword()); 46 55 } 47 56 return result;
Note: See TracChangeset
for help on using the changeset viewer.