001package com.box.sdk; 002 003import com.box.sdk.http.HttpMethod; 004import com.eclipsesource.json.Json; 005import com.eclipsesource.json.JsonObject; 006import java.io.IOException; 007import java.io.InputStream; 008import java.net.URL; 009import java.security.DigestInputStream; 010import java.security.MessageDigest; 011import java.security.NoSuchAlgorithmException; 012import java.util.ArrayList; 013import java.util.List; 014import java.util.Map; 015import java.util.concurrent.Executors; 016import java.util.concurrent.ThreadPoolExecutor; 017import java.util.concurrent.TimeUnit; 018 019/** 020 * Utility class for uploading large files. 021 */ 022public final class LargeFileUpload { 023 private static final String DIGEST_ALGORITHM_SHA1 = "SHA1"; 024 private static final int DEFAULT_CONNECTIONS = 3; 025 private static final int DEFAULT_TIMEOUT = 1; 026 private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.HOURS; 027 private static final int THREAD_POOL_WAIT_TIME_IN_MILLIS = 1000; 028 private final ThreadPoolExecutor executorService; 029 private final long timeout; 030 private final TimeUnit timeUnit; 031 032 /** 033 * Creates a LargeFileUpload object. 034 * 035 * @param nParallelConnections number of parallel http connections to use 036 * @param timeOut time to wait before killing the job 037 * @param unit time unit for the time wait value 038 */ 039 public LargeFileUpload(int nParallelConnections, long timeOut, TimeUnit unit) { 040 this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nParallelConnections); 041 this.timeout = timeOut; 042 this.timeUnit = unit; 043 } 044 045 /** 046 * Creates a LargeFileUpload object with a default number of parallel conections and timeout. 047 */ 048 public LargeFileUpload() { 049 this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(LargeFileUpload.DEFAULT_CONNECTIONS); 050 this.timeout = LargeFileUpload.DEFAULT_TIMEOUT; 051 this.timeUnit = LargeFileUpload.DEFAULT_TIMEUNIT; 052 } 053 054 private static byte[] getBytesFromStream(InputStream stream, int numBytes) { 055 056 int bytesNeeded = numBytes; 057 int offset = 0; 058 byte[] bytes = new byte[numBytes]; 059 060 while (bytesNeeded > 0) { 061 062 int bytesRead; 063 try { 064 bytesRead = stream.read(bytes, offset, bytesNeeded); 065 } catch (IOException ioe) { 066 throw new BoxAPIException("Reading data from stream failed.", ioe); 067 } 068 069 if (bytesRead == -1) { 070 throw new BoxAPIException("Stream ended while upload was progressing"); 071 } 072 073 bytesNeeded = bytesNeeded - bytesRead; 074 offset = offset + bytesRead; 075 } 076 077 return bytes; 078 } 079 080 private BoxFileUploadSession.Info createUploadSession(BoxAPIConnection boxApi, String folderId, 081 URL url, String fileName, long fileSize) { 082 083 BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST); 084 085 //Create the JSON body of the request 086 JsonObject body = new JsonObject(); 087 body.add("folder_id", folderId); 088 body.add("file_name", fileName); 089 body.add("file_size", fileSize); 090 request.setBody(body.toString()); 091 092 try (BoxJSONResponse response = request.send()) { 093 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 094 095 String sessionId = jsonObject.get("id").asString(); 096 BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId); 097 098 return session.new Info(jsonObject); 099 } 100 } 101 102 /** 103 * Uploads a new large file. 104 * 105 * @param boxApi the API connection to be used by the upload session. 106 * @param folderId the id of the folder in which the file will be uploaded. 107 * @param stream the input stream that feeds the content of the file. 108 * @param url the upload session URL. 109 * @param fileName the name of the file to be created. 110 * @param fileSize the total size of the file. 111 * @return the created file instance. 112 * @throws InterruptedException when a thread gets interupted. 113 * @throws IOException when reading a stream throws exception. 114 */ 115 public BoxFile.Info upload(BoxAPIConnection boxApi, String folderId, InputStream stream, URL url, 116 String fileName, long fileSize) throws InterruptedException, IOException { 117 //Create a upload session 118 BoxFileUploadSession.Info session = this.createUploadSession(boxApi, folderId, url, fileName, fileSize); 119 return this.uploadHelper(session, stream, fileSize, null); 120 } 121 122 /** 123 * Uploads a new large file and sets file attributes. 124 * 125 * @param boxApi the API connection to be used by the upload session. 126 * @param folderId the id of the folder in which the file will be uploaded. 127 * @param stream the input stream that feeds the content of the file. 128 * @param url the upload session URL. 129 * @param fileName the name of the file to be created. 130 * @param fileSize the total size of the file. 131 * @param fileAttributes file attributes to set 132 * @return the created file instance. 133 * @throws InterruptedException when a thread gets interupted. 134 * @throws IOException when reading a stream throws exception. 135 */ 136 public BoxFile.Info upload(BoxAPIConnection boxApi, String folderId, InputStream stream, URL url, 137 String fileName, long fileSize, Map<String, String> fileAttributes) 138 throws InterruptedException, IOException { 139 //Create a upload session 140 BoxFileUploadSession.Info session = this.createUploadSession(boxApi, folderId, url, fileName, fileSize); 141 return this.uploadHelper(session, stream, fileSize, fileAttributes); 142 } 143 144 /** 145 * Creates a new version of a large file. 146 * 147 * @param boxApi the API connection to be used by the upload session. 148 * @param stream the input stream that feeds the content of the file. 149 * @param url the upload session URL. 150 * @param fileSize the total size of the file. 151 * @return the file instance that also contains the version information. 152 * @throws InterruptedException when a thread gets interupted. 153 * @throws IOException when reading a stream throws exception. 154 */ 155 public BoxFile.Info upload(BoxAPIConnection boxApi, InputStream stream, URL url, long fileSize) 156 throws InterruptedException, IOException { 157 //creates a upload session 158 BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize); 159 return this.uploadHelper(session, stream, fileSize, null); 160 } 161 162 /** 163 * Creates a new version of a large file and sets file attributes. 164 * 165 * @param boxApi the API connection to be used by the upload session. 166 * @param stream the input stream that feeds the content of the file. 167 * @param url the upload session URL. 168 * @param fileSize the total size of the file. 169 * @param fileAttributes file attributes to set. 170 * @return the file instance that also contains the version information. 171 * @throws InterruptedException when a thread gets interupted. 172 * @throws IOException when reading a stream throws exception. 173 */ 174 public BoxFile.Info upload(BoxAPIConnection boxApi, InputStream stream, URL url, long fileSize, 175 Map<String, String> fileAttributes) 176 throws InterruptedException, IOException { 177 //creates an upload session 178 BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize); 179 return this.uploadHelper(session, stream, fileSize, fileAttributes); 180 } 181 182 private BoxFile.Info uploadHelper(BoxFileUploadSession.Info session, InputStream stream, long fileSize, 183 Map<String, String> fileAttributes) 184 throws InterruptedException { 185 //Upload parts using the upload session 186 MessageDigest digest; 187 try { 188 digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1); 189 } catch (NoSuchAlgorithmException ae) { 190 throw new BoxAPIException("Digest algorithm not found", ae); 191 } 192 DigestInputStream dis = new DigestInputStream(stream, digest); 193 List<BoxFileUploadSessionPart> parts = this.uploadParts(session, dis, fileSize); 194 195 //Creates the file hash 196 byte[] digestBytes = digest.digest(); 197 String digestStr = Base64.encode(digestBytes); 198 199 //Commit the upload session. If there is a failure, abort the commit. 200 try { 201 return session.getResource().commit(digestStr, parts, fileAttributes, null, null); 202 } catch (Exception e) { 203 session.getResource().abort(); 204 throw new BoxAPIException("Unable to commit the upload session", e); 205 } 206 } 207 208 private BoxFileUploadSession.Info createUploadSession(BoxAPIConnection boxApi, URL url, long fileSize) { 209 BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST); 210 211 //Creates the body of the request 212 JsonObject body = new JsonObject(); 213 body.add("file_size", fileSize); 214 request.setBody(body.toString()); 215 216 try (BoxJSONResponse response = request.send()) { 217 JsonObject jsonObject = Json.parse(response.getJSON()).asObject(); 218 219 String sessionId = jsonObject.get("id").asString(); 220 BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId); 221 222 return session.new Info(jsonObject); 223 } 224 } 225 226 /* 227 * Upload parts of the file. The part size is retrieved from the upload session. 228 */ 229 private List<BoxFileUploadSessionPart> uploadParts( 230 BoxFileUploadSession.Info session, InputStream stream, long fileSize 231 ) throws InterruptedException { 232 List<BoxFileUploadSessionPart> parts = new ArrayList<>(); 233 234 int partSize = session.getPartSize(); 235 long offset = 0; 236 long processed = 0; 237 int partPostion = 0; 238 //Set the Max Queue Size to 1.5x the number of processors 239 double maxQueueSizeDouble = Math.ceil(this.executorService.getMaximumPoolSize() * 1.5); 240 int maxQueueSize = Double.valueOf(maxQueueSizeDouble).intValue(); 241 while (processed < fileSize) { 242 //Waiting for any thread to finish before 243 long timeoutForWaitingInMillis = TimeUnit.MILLISECONDS.convert(this.timeout, this.timeUnit); 244 if (this.executorService.getCorePoolSize() <= this.executorService.getActiveCount()) { 245 if (timeoutForWaitingInMillis > 0) { 246 Thread.sleep(LargeFileUpload.THREAD_POOL_WAIT_TIME_IN_MILLIS); 247 timeoutForWaitingInMillis -= THREAD_POOL_WAIT_TIME_IN_MILLIS; 248 } else { 249 throw new BoxAPIException("Upload parts timedout"); 250 } 251 } 252 if (this.executorService.getQueue().size() < maxQueueSize) { 253 long diff = fileSize - processed; 254 //The size last part of the file can be lesser than the part size. 255 if (diff < (long) partSize) { 256 partSize = (int) diff; 257 } 258 parts.add(null); 259 byte[] bytes = getBytesFromStream(stream, partSize); 260 this.executorService.execute( 261 new LargeFileUploadTask(session.getResource(), bytes, offset, 262 partSize, fileSize, parts, partPostion) 263 ); 264 265 //Increase the offset and proceesed bytes to calculate the Content-Range header. 266 processed += partSize; 267 offset += partSize; 268 partPostion++; 269 } 270 } 271 this.executorService.shutdown(); 272 this.executorService.awaitTermination(this.timeout, this.timeUnit); 273 return parts; 274 } 275 276 /** 277 * Generates the Base64 encoded SHA-1 hash for content available in the stream. 278 * It can be used to calculate the hash of a file. 279 * 280 * @param stream the input stream of the file or data. 281 * @return the Base64 encoded hash string. 282 */ 283 public String generateDigest(InputStream stream) { 284 MessageDigest digest; 285 try { 286 digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1); 287 } catch (NoSuchAlgorithmException ae) { 288 throw new BoxAPIException("Digest algorithm not found", ae); 289 } 290 291 //Calcuate the digest using the stream. 292 DigestInputStream dis = new DigestInputStream(stream, digest); 293 try { 294 int value = dis.read(); 295 while (value != -1) { 296 value = dis.read(); 297 } 298 } catch (IOException ioe) { 299 throw new BoxAPIException("Reading the stream failed.", ioe); 300 } 301 302 //Get the calculated digest for the stream 303 byte[] digestBytes = digest.digest(); 304 return Base64.encode(digestBytes); 305 } 306}