001package com.box.sdk;
002
003import com.box.sdk.http.HttpMethod;
004import com.eclipsesource.json.Json;
005import com.eclipsesource.json.JsonObject;
006import java.io.IOException;
007import java.io.InputStream;
008import java.net.URL;
009import java.security.DigestInputStream;
010import java.security.MessageDigest;
011import java.security.NoSuchAlgorithmException;
012import java.util.ArrayList;
013import java.util.List;
014import java.util.Map;
015import java.util.concurrent.Executors;
016import java.util.concurrent.ThreadPoolExecutor;
017import java.util.concurrent.TimeUnit;
018
019/**
020 * Utility class for uploading large files.
021 */
022public final class LargeFileUpload {
023    private static final String DIGEST_ALGORITHM_SHA1 = "SHA1";
024    private static final int DEFAULT_CONNECTIONS = 3;
025    private static final int DEFAULT_TIMEOUT = 1;
026    private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.HOURS;
027    private static final int THREAD_POOL_WAIT_TIME_IN_MILLIS = 1000;
028    private final ThreadPoolExecutor executorService;
029    private final long timeout;
030    private final TimeUnit timeUnit;
031
032    /**
033     * Creates a LargeFileUpload object.
034     *
035     * @param nParallelConnections number of parallel http connections to use
036     * @param timeOut              time to wait before killing the job
037     * @param unit                 time unit for the time wait value
038     */
039    public LargeFileUpload(int nParallelConnections, long timeOut, TimeUnit unit) {
040        this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nParallelConnections);
041        this.timeout = timeOut;
042        this.timeUnit = unit;
043    }
044
045    /**
046     * Creates a LargeFileUpload object with a default number of parallel conections and timeout.
047     */
048    public LargeFileUpload() {
049        this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(LargeFileUpload.DEFAULT_CONNECTIONS);
050        this.timeout = LargeFileUpload.DEFAULT_TIMEOUT;
051        this.timeUnit = LargeFileUpload.DEFAULT_TIMEUNIT;
052    }
053
054    private static byte[] getBytesFromStream(InputStream stream, int numBytes) {
055
056        int bytesNeeded = numBytes;
057        int offset = 0;
058        byte[] bytes = new byte[numBytes];
059
060        while (bytesNeeded > 0) {
061
062            int bytesRead;
063            try {
064                bytesRead = stream.read(bytes, offset, bytesNeeded);
065            } catch (IOException ioe) {
066                throw new BoxAPIException("Reading data from stream failed.", ioe);
067            }
068
069            if (bytesRead == -1) {
070                throw new BoxAPIException("Stream ended while upload was progressing");
071            }
072
073            bytesNeeded = bytesNeeded - bytesRead;
074            offset = offset + bytesRead;
075        }
076
077        return bytes;
078    }
079
080    private BoxFileUploadSession.Info createUploadSession(BoxAPIConnection boxApi, String folderId,
081                                                          URL url, String fileName, long fileSize) {
082
083        BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST);
084
085        //Create the JSON body of the request
086        JsonObject body = new JsonObject();
087        body.add("folder_id", folderId);
088        body.add("file_name", fileName);
089        body.add("file_size", fileSize);
090        request.setBody(body.toString());
091
092        try (BoxJSONResponse response = request.send()) {
093            JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
094
095            String sessionId = jsonObject.get("id").asString();
096            BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId);
097
098            return session.new Info(jsonObject);
099        }
100    }
101
102    /**
103     * Uploads a new large file.
104     *
105     * @param boxApi   the API connection to be used by the upload session.
106     * @param folderId the id of the folder in which the file will be uploaded.
107     * @param stream   the input stream that feeds the content of the file.
108     * @param url      the upload session URL.
109     * @param fileName the name of the file to be created.
110     * @param fileSize the total size of the file.
111     * @return the created file instance.
112     * @throws InterruptedException when a thread gets interupted.
113     * @throws IOException          when reading a stream throws exception.
114     */
115    public BoxFile.Info upload(BoxAPIConnection boxApi, String folderId, InputStream stream, URL url,
116                               String fileName, long fileSize) throws InterruptedException, IOException {
117        //Create a upload session
118        BoxFileUploadSession.Info session = this.createUploadSession(boxApi, folderId, url, fileName, fileSize);
119        return this.uploadHelper(session, stream, fileSize, null);
120    }
121
122    /**
123     * Uploads a new large file and sets file attributes.
124     *
125     * @param boxApi         the API connection to be used by the upload session.
126     * @param folderId       the id of the folder in which the file will be uploaded.
127     * @param stream         the input stream that feeds the content of the file.
128     * @param url            the upload session URL.
129     * @param fileName       the name of the file to be created.
130     * @param fileSize       the total size of the file.
131     * @param fileAttributes file attributes to set
132     * @return the created file instance.
133     * @throws InterruptedException when a thread gets interupted.
134     * @throws IOException          when reading a stream throws exception.
135     */
136    public BoxFile.Info upload(BoxAPIConnection boxApi, String folderId, InputStream stream, URL url,
137                               String fileName, long fileSize, Map<String, String> fileAttributes)
138        throws InterruptedException, IOException {
139        //Create a upload session
140        BoxFileUploadSession.Info session = this.createUploadSession(boxApi, folderId, url, fileName, fileSize);
141        return this.uploadHelper(session, stream, fileSize, fileAttributes);
142    }
143
144    /**
145     * Creates a new version of a large file.
146     *
147     * @param boxApi   the API connection to be used by the upload session.
148     * @param stream   the input stream that feeds the content of the file.
149     * @param url      the upload session URL.
150     * @param fileSize the total size of the file.
151     * @return the file instance that also contains the version information.
152     * @throws InterruptedException when a thread gets interupted.
153     * @throws IOException          when reading a stream throws exception.
154     */
155    public BoxFile.Info upload(BoxAPIConnection boxApi, InputStream stream, URL url, long fileSize)
156        throws InterruptedException, IOException {
157        //creates a upload session
158        BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize);
159        return this.uploadHelper(session, stream, fileSize, null);
160    }
161
162    /**
163     * Creates a new version of a large file and sets file attributes.
164     *
165     * @param boxApi         the API connection to be used by the upload session.
166     * @param stream         the input stream that feeds the content of the file.
167     * @param url            the upload session URL.
168     * @param fileSize       the total size of the file.
169     * @param fileAttributes file attributes to set.
170     * @return the file instance that also contains the version information.
171     * @throws InterruptedException when a thread gets interupted.
172     * @throws IOException          when reading a stream throws exception.
173     */
174    public BoxFile.Info upload(BoxAPIConnection boxApi, InputStream stream, URL url, long fileSize,
175                               Map<String, String> fileAttributes)
176        throws InterruptedException, IOException {
177        //creates an upload session
178        BoxFileUploadSession.Info session = this.createUploadSession(boxApi, url, fileSize);
179        return this.uploadHelper(session, stream, fileSize, fileAttributes);
180    }
181
182    private BoxFile.Info uploadHelper(BoxFileUploadSession.Info session, InputStream stream, long fileSize,
183                                      Map<String, String> fileAttributes)
184        throws InterruptedException {
185        //Upload parts using the upload session
186        MessageDigest digest;
187        try {
188            digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1);
189        } catch (NoSuchAlgorithmException ae) {
190            throw new BoxAPIException("Digest algorithm not found", ae);
191        }
192        DigestInputStream dis = new DigestInputStream(stream, digest);
193        List<BoxFileUploadSessionPart> parts = this.uploadParts(session, dis, fileSize);
194
195        //Creates the file hash
196        byte[] digestBytes = digest.digest();
197        String digestStr = Base64.encode(digestBytes);
198
199        //Commit the upload session. If there is a failure, abort the commit.
200        try {
201            return session.getResource().commit(digestStr, parts, fileAttributes, null, null);
202        } catch (Exception e) {
203            session.getResource().abort();
204            throw new BoxAPIException("Unable to commit the upload session", e);
205        }
206    }
207
208    private BoxFileUploadSession.Info createUploadSession(BoxAPIConnection boxApi, URL url, long fileSize) {
209        BoxJSONRequest request = new BoxJSONRequest(boxApi, url, HttpMethod.POST);
210
211        //Creates the body of the request
212        JsonObject body = new JsonObject();
213        body.add("file_size", fileSize);
214        request.setBody(body.toString());
215
216        try (BoxJSONResponse response = request.send()) {
217            JsonObject jsonObject = Json.parse(response.getJSON()).asObject();
218
219            String sessionId = jsonObject.get("id").asString();
220            BoxFileUploadSession session = new BoxFileUploadSession(boxApi, sessionId);
221
222            return session.new Info(jsonObject);
223        }
224    }
225
226    /*
227     * Upload parts of the file. The part size is retrieved from the upload session.
228     */
229    private List<BoxFileUploadSessionPart> uploadParts(
230        BoxFileUploadSession.Info session, InputStream stream, long fileSize
231    ) throws InterruptedException {
232        List<BoxFileUploadSessionPart> parts = new ArrayList<>();
233
234        int partSize = session.getPartSize();
235        long offset = 0;
236        long processed = 0;
237        int partPostion = 0;
238        //Set the Max Queue Size to 1.5x the number of processors
239        double maxQueueSizeDouble = Math.ceil(this.executorService.getMaximumPoolSize() * 1.5);
240        int maxQueueSize = Double.valueOf(maxQueueSizeDouble).intValue();
241        while (processed < fileSize) {
242            //Waiting for any thread to finish before
243            long timeoutForWaitingInMillis = TimeUnit.MILLISECONDS.convert(this.timeout, this.timeUnit);
244            if (this.executorService.getCorePoolSize() <= this.executorService.getActiveCount()) {
245                if (timeoutForWaitingInMillis > 0) {
246                    Thread.sleep(LargeFileUpload.THREAD_POOL_WAIT_TIME_IN_MILLIS);
247                    timeoutForWaitingInMillis -= THREAD_POOL_WAIT_TIME_IN_MILLIS;
248                } else {
249                    throw new BoxAPIException("Upload parts timedout");
250                }
251            }
252            if (this.executorService.getQueue().size() < maxQueueSize) {
253                long diff = fileSize - processed;
254                //The size last part of the file can be lesser than the part size.
255                if (diff < (long) partSize) {
256                    partSize = (int) diff;
257                }
258                parts.add(null);
259                byte[] bytes = getBytesFromStream(stream, partSize);
260                this.executorService.execute(
261                    new LargeFileUploadTask(session.getResource(), bytes, offset,
262                        partSize, fileSize, parts, partPostion)
263                );
264
265                //Increase the offset and proceesed bytes to calculate the Content-Range header.
266                processed += partSize;
267                offset += partSize;
268                partPostion++;
269            }
270        }
271        this.executorService.shutdown();
272        this.executorService.awaitTermination(this.timeout, this.timeUnit);
273        return parts;
274    }
275
276    /**
277     * Generates the Base64 encoded SHA-1 hash for content available in the stream.
278     * It can be used to calculate the hash of a file.
279     *
280     * @param stream the input stream of the file or data.
281     * @return the Base64 encoded hash string.
282     */
283    public String generateDigest(InputStream stream) {
284        MessageDigest digest;
285        try {
286            digest = MessageDigest.getInstance(DIGEST_ALGORITHM_SHA1);
287        } catch (NoSuchAlgorithmException ae) {
288            throw new BoxAPIException("Digest algorithm not found", ae);
289        }
290
291        //Calcuate the digest using the stream.
292        DigestInputStream dis = new DigestInputStream(stream, digest);
293        try {
294            int value = dis.read();
295            while (value != -1) {
296                value = dis.read();
297            }
298        } catch (IOException ioe) {
299            throw new BoxAPIException("Reading the stream failed.", ioe);
300        }
301
302        //Get the calculated digest for the stream
303        byte[] digestBytes = digest.digest();
304        return Base64.encode(digestBytes);
305    }
306}